... | ... | @@ -5,6 +5,7 @@ |
|
|
## tag_firm
|
|
|
|
|
|
### 逻辑
|
|
|
|
|
|
```plantuml
|
|
|
@startuml
|
|
|
database nebula的tag_firm
|
... | ... | @@ -26,6 +27,7 @@ kafka --> 中间表nebula_tag_firm: 例行入表 |
|
|
融合库 --> 中间表nebula_tag_firm: 存量入表后例行
|
|
|
@enduml
|
|
|
```
|
|
|
|
|
|
```
|
|
|
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的fid字段不为null时,例行同步的程序中才将对应记录入nebula
|
|
|
```
|
... | ... | @@ -108,7 +110,7 @@ CREATE TABLE `nebula_tag_firm` ( |
|
|
#### tb_company
|
|
|
|
|
|
```
|
|
|
通过binlog更新
|
|
|
先存量入表,后续通过binlog更新
|
|
|
```
|
|
|
|
|
|
| tb_company | nebula_tag_company | 备注 |
|
... | ... | @@ -153,6 +155,7 @@ CREATE TABLE `nebula_tag_firm` ( |
|
|
## tag_person
|
|
|
|
|
|
### 逻辑
|
|
|
|
|
|
```plantuml
|
|
|
@startuml
|
|
|
database nebula的tag_person
|
... | ... | @@ -176,6 +179,7 @@ tb_person --> 中间表nebula_tag_person: 存量入表后例行 |
|
|
中间表nebula_tag_person --> nebula的tag_person: 存量入表后例行
|
|
|
@enduml
|
|
|
```
|
|
|
|
|
|
```
|
|
|
从融合库中提前将数据准备至nebula中间表,所需字段通过binlog增量入中间表;将ppid作为唯一键,由于其他表中有的ppid表tb_person里一定有,所以用ac_partner_num、ac_employee_num、ac_legalperson_num三个字段都不为null当做是否更新nebula的标志。
|
|
|
```
|
... | ... | @@ -223,7 +227,7 @@ CREATE TABLE `nebula_tag_person` ( |
|
|
#### tb_person
|
|
|
|
|
|
```
|
|
|
通过binlog更新
|
|
|
先存量入表,后续通过binlog更新
|
|
|
```
|
|
|
|
|
|
| tb_person | nebula_tag_person | 备注 |
|
... | ... | @@ -236,7 +240,7 @@ CREATE TABLE `nebula_tag_person` ( |
|
|
#### tb_company_employee
|
|
|
|
|
|
```
|
|
|
通过binlog更新is_history=0
|
|
|
先存量入表,后续通过binlog更新is_history=0
|
|
|
```
|
|
|
|
|
|
| tb_company_employee | nebula_tag_person | 备注 |
|
... | ... | @@ -247,7 +251,7 @@ CREATE TABLE `nebula_tag_person` ( |
|
|
#### tb_company_partner
|
|
|
|
|
|
```
|
|
|
通过binlog更新is_history=0
|
|
|
先存量入表,后续通过binlog更新is_history=0
|
|
|
```
|
|
|
|
|
|
| tb_company_partner | nebula_tag_person | 备注 |
|
... | ... | @@ -258,7 +262,7 @@ CREATE TABLE `nebula_tag_person` ( |
|
|
#### tb_company_legalperson
|
|
|
|
|
|
```
|
|
|
通过binlog更新is_history=0
|
|
|
先存量入表,后续通过binlog更新is_history=0
|
|
|
```
|
|
|
|
|
|
| tb_company_legalperson | nebula_tag_person | 备注 |
|
... | ... | @@ -269,6 +273,7 @@ CREATE TABLE `nebula_tag_person` ( |
|
|
## edge_serve
|
|
|
|
|
|
### 逻辑
|
|
|
|
|
|
```plantuml
|
|
|
@startuml
|
|
|
database nebula的edge_serve
|
... | ... | @@ -290,6 +295,7 @@ kafka --> 中间表nebula_edge_serve: 例行入表 |
|
|
融合库 --> 中间表nebula_edge_serve: 存量入表后例行
|
|
|
@enduml
|
|
|
```
|
|
|
|
|
|
```
|
|
|
从mongo和融合库中提前将数据准备至nebula中间表,mongo所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest和employee_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
|
|
|
```
|
... | ... | @@ -332,7 +338,7 @@ CREATE TABLE `nebula_edge_serve` ( |
|
|
#### tb_company_enployee
|
|
|
|
|
|
```
|
|
|
通过binlog更新
|
|
|
先存量入表,后续通过binlog更新
|
|
|
```
|
|
|
|
|
|
| tb_company_enployee | nebula_edge_serve | 备注 |
|
... | ... | @@ -359,6 +365,7 @@ CREATE TABLE `nebula_edge_serve` ( |
|
|
## edge_invest_h
|
|
|
|
|
|
### 逻辑
|
|
|
|
|
|
```plantuml
|
|
|
@startuml
|
|
|
database nebula的edge_invest_h
|
... | ... | @@ -380,6 +387,7 @@ kafka --> 中间表nebula_edge_invest_h: 例行入表 |
|
|
融合库 --> 中间表nebula_edge_invest_h: 存量入表后例行
|
|
|
@enduml
|
|
|
```
|
|
|
|
|
|
```
|
|
|
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
|
|
|
```
|
... | ... | @@ -425,7 +433,7 @@ CREATE TABLE `nebula_edge_invest_h` ( |
|
|
#### tb_company_partner
|
|
|
|
|
|
```
|
|
|
通过binlog更新
|
|
|
先存量入表,后续通过binlog更新
|
|
|
```
|
|
|
|
|
|
| tb_company_partner | nebula_edge_invest_h | 备注 |
|
... | ... | @@ -454,6 +462,7 @@ CREATE TABLE `nebula_edge_invest_h` ( |
|
|
## edge_invest_c
|
|
|
|
|
|
### 逻辑
|
|
|
|
|
|
```plantuml
|
|
|
@startuml
|
|
|
database nebula的edge_invest_c
|
... | ... | @@ -475,6 +484,7 @@ kafka --> 中间表nebula_edge_invest_c: 例行入表 |
|
|
融合库 --> 中间表nebula_edge_invest_c: 存量入表后例行
|
|
|
@enduml
|
|
|
```
|
|
|
|
|
|
```
|
|
|
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的s_fid和e_fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
|
|
|
```
|
... | ... | @@ -519,7 +529,7 @@ CREATE TABLE `nebula_edge_invest_c` ( |
|
|
#### tb_company_partner
|
|
|
|
|
|
```
|
|
|
通过binlog更新
|
|
|
先存量入表,后续通过binlog更新
|
|
|
```
|
|
|
|
|
|
| tb_company_partner | nebula_edge_invest_h | 备注 |
|
... | ... | @@ -547,24 +557,28 @@ CREATE TABLE `nebula_edge_invest_c` ( |
|
|
## edge_own
|
|
|
|
|
|
### 逻辑
|
|
|
|
|
|
```plantuml
|
|
|
@startuml
|
|
|
database nebula的edge_own
|
|
|
file 爬虫数据
|
|
|
database 融合库
|
|
|
database 中间表
|
|
|
|
|
|
爬虫数据 --> 融合库
|
|
|
融合库 --> nebula的edge_own: binlog更新
|
|
|
融合库 --> 中间表: binlog更新
|
|
|
中间表 --> nebula的edge_own: binlog更新
|
|
|
@enduml
|
|
|
```
|
|
|
|
|
|
```
|
|
|
直接通过监控融合库tb_company_legalperson表的binlog更新
|
|
|
先存量入中间表,然后通过监控融合库tb_company_legalperson表的binlog更新中间表,后续通过中间表binlog更新nebula
|
|
|
```
|
|
|
|
|
|
### nebula结构
|
|
|
|
|
|
```sql
|
|
|
自然人作为企业法定代表人,与企业间关系的集合,类型为edge,用<ppid, edge_own, (Rank), fid>唯一标识,表名为edge_own
|
|
|
自然人作为企业法定代表人,与企业间关系的集合,类型为edge,用<pid, edge_own, (Rank), fid>唯一标识,表名为edge_own
|
|
|
CREATE EDGE `edge_own` (
|
|
|
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
|
|
|
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
|
... | ... | @@ -572,6 +586,24 @@ CREATE EDGE `edge_own` ( |
|
|
) ttl_duration = 0, ttl_col = "", comment = "人员法定代表人edge";
|
|
|
```
|
|
|
|
|
|
### 中间表表结构
|
|
|
|
|
|
```sql
|
|
|
CREATE TABLE `nebula_edge_own` (
|
|
|
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
|
|
|
`pid` varchar(50) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
|
|
|
`fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
|
|
|
`ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
|
|
|
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
|
|
|
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
|
|
|
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
|
|
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
|
|
|
PRIMARY KEY (`id`),
|
|
|
UNIQUE KEY `idx_name_id` (`company_name_digest`, `ppid`),
|
|
|
KEY `idx_update_time` (`update_time`)
|
|
|
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_own的数据中间表';
|
|
|
```
|
|
|
|
|
|
### 涉及到的数据源
|
|
|
|
|
|
#### tb_company_legalperson
|
... | ... | @@ -580,33 +612,39 @@ CREATE EDGE `edge_own` ( |
|
|
通过binlog更新,lp_ppid is not null and company_name_digest is not null
|
|
|
```
|
|
|
|
|
|
| tb_company_legalperson | 备注 |
|
|
|
| ---------------------- | ---- |
|
|
|
| lp_ppid | |
|
|
|
| company_name_digest | |
|
|
|
| is_history | |
|
|
|
| update_time | |
|
|
|
| tb_company_legalperson | nebula_edge_own | 备注 |
|
|
|
| ---------------------- | ------------------- | --------------------------- |
|
|
|
| lp_ppid | ppid | |
|
|
|
| company_name_digest | company_name_digest | |
|
|
|
| is_history | is_history | |
|
|
|
| lp_ppid | pid | pid='p'+lp_ppid |
|
|
|
| company_name_digest | fid | fid='f'+company_name_digest |
|
|
|
|
|
|
## edge_own_c
|
|
|
|
|
|
### 逻辑
|
|
|
|
|
|
```plantuml
|
|
|
@startuml
|
|
|
database nebula的edge_own_c
|
|
|
file 爬虫数据
|
|
|
database 融合库
|
|
|
database 中间表
|
|
|
|
|
|
爬虫数据 --> 融合库
|
|
|
融合库 --> nebula的edge_own_c: binlog更新
|
|
|
融合库 --> 中间表: binlog更新
|
|
|
中间表 --> nebula的edge_own_c: binlog更新
|
|
|
@enduml
|
|
|
```
|
|
|
|
|
|
```
|
|
|
直接通过监控融合库tb_company_legalperson表的binlog更新
|
|
|
先存量入中间表,然后通过监控融合库tb_company_legalperson表的binlog更新中间表,后续通过中间表binlog更新nebula
|
|
|
```
|
|
|
|
|
|
### nebula结构
|
|
|
|
|
|
```sql
|
|
|
企业作为企业法定代表人,与企业间关系的集合,类型为edge,用<s_fid,edge_own_c,(Rank),e_fid>唯一标识,表名为edge_own_c
|
|
|
CREATE EDGE `edge_own_c` (
|
|
|
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
|
|
|
`lp_company_name_digest` fixed_string(32) NOT NULL COMMENT "法人企业唯一id",
|
... | ... | @@ -614,6 +652,24 @@ CREATE EDGE `edge_own_c` ( |
|
|
) ttl_duration = 0, ttl_col = "", comment = "公司法定代表人edge";
|
|
|
```
|
|
|
|
|
|
### 中间表表结构
|
|
|
|
|
|
```sql
|
|
|
CREATE TABLE `nebula_edge_own_c` (
|
|
|
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
|
|
|
`s_fid` varchar(50) DEFAULT NULL COMMENT '=f和lp_company_name_digest拼接,对应nebula中的s_fid',
|
|
|
`e_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的e_fid',
|
|
|
`lp_company_name_digest` char(32) DEFAULT NULL COMMENT '法人企业唯一id',
|
|
|
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
|
|
|
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
|
|
|
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
|
|
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
|
|
|
PRIMARY KEY (`id`),
|
|
|
UNIQUE KEY `idx_name_digest` (`company_name_digest`, `branch_company_name_digest`),
|
|
|
KEY `idx_update_time` (`update_time`)
|
|
|
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_ow_c的数据中间表';
|
|
|
```
|
|
|
|
|
|
### 涉及到的数据源
|
|
|
|
|
|
#### tb_company_legalperson
|
... | ... | @@ -622,16 +678,18 @@ CREATE EDGE `edge_own_c` ( |
|
|
通过binlog更新,lp_company_name_digest is not null and company_name_digest is not null
|
|
|
```
|
|
|
|
|
|
| tb_company_legalperson | 备注 |
|
|
|
| ---------------------- | ---- |
|
|
|
| lp_company_name_digest | |
|
|
|
| company_name_digest | |
|
|
|
| is_history | |
|
|
|
| update_time | |
|
|
|
| tb_company_legalperson | nebula_edge_own_c | 备注 |
|
|
|
| ---------------------- | ---------------------- | -------------------------------- |
|
|
|
| lp_company_name_digest | lp_company_name_digest | |
|
|
|
| company_name_digest | company_name_digest | |
|
|
|
| is_history | is_history | |
|
|
|
| lp_company_name_digest | s_fid | s_fid='f'+lp_company_name_digest |
|
|
|
| company_name_digest | e_fid | e_fid='f'+company_name_digest |
|
|
|
|
|
|
## edge_branch
|
|
|
|
|
|
### 逻辑
|
|
|
|
|
|
```plantuml
|
|
|
@startuml
|
|
|
database nebula的edge_branch
|
... | ... | @@ -649,8 +707,9 @@ utn_ic.company_branch --> kafka: 增量写kafka |
|
|
kafka --> 中间表nebula_edge_branch: 例行入表
|
|
|
@enduml
|
|
|
```
|
|
|
|
|
|
```
|
|
|
从mongo提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表
|
|
|
从mongo提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表,监控中间表的binlog更新nebula
|
|
|
```
|
|
|
|
|
|
### nebula结构
|
... | ... | @@ -686,6 +745,10 @@ CREATE TABLE `nebula_edge_branch` ( |
|
|
|
|
|
#### utn_ic.company_branch
|
|
|
|
|
|
```
|
|
|
存量入表后,通过kafka增量入表
|
|
|
```
|
|
|
|
|
|
| company_branch | nebula_edge_branch | 备注 |
|
|
|
| ------------------- | -------------------------- | ------------------------------------------------------------ |
|
|
|
| branch_name_digest | branch_company_name_digest | |
|
... | ... | |