Skip to content

GitLab

  • Projects
  • Groups
  • Snippets
  • Help
    • Loading...
  • Help
    • Help
    • Support
    • Community forum
    • Submit feedback
    • Contribute to GitLab
  • Sign in / Register
K
kb
  • Project overview
    • Project overview
    • Details
    • Activity
    • Releases
  • Repository
    • Repository
    • Files
    • Commits
    • Branches
    • Tags
    • Contributors
    • Graph
    • Compare
  • Issues 2
    • Issues 2
    • List
    • Boards
    • Labels
    • Service Desk
    • Milestones
  • Merge requests 0
    • Merge requests 0
  • Operations
    • Operations
    • Incidents
  • Analytics
    • Analytics
    • Repository
    • Value Stream
  • Wiki
    • Wiki
  • Members
    • Members
  • Activity
  • Graph
  • Create a new issue
  • Commits
  • Issue Boards
Collapse sidebar
  • granite
  • kb
  • Wiki
    • Data_stream
    • Equity_penetration
  • update_nebula

update_nebula · Changes

Page history
update: nebula增量例行 authored Jan 07, 2022 by 李子健's avatar 李子健
Hide whitespace changes
Inline Side-by-side
Showing with 89 additions and 26 deletions
+89 -26
  • data_stream/equity_penetration/update_nebula.md data_stream/equity_penetration/update_nebula.md +89 -26
  • No files found.
data_stream/equity_penetration/update_nebula.md
View page @ 94f880b2
......@@ -5,6 +5,7 @@
## tag_firm
### 逻辑
```plantuml
@startuml
database nebula的tag_firm
......@@ -26,6 +27,7 @@ kafka --> 中间表nebula_tag_firm: 例行入表
融合库 --> 中间表nebula_tag_firm: 存量入表后例行
@enduml
```
```
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的fid字段不为null时,例行同步的程序中才将对应记录入nebula
```
......@@ -108,7 +110,7 @@ CREATE TABLE `nebula_tag_firm` (
#### tb_company
```
通过binlog更新
先存量入表,后续通过binlog更新
```
| tb_company | nebula_tag_company | 备注 |
......@@ -153,6 +155,7 @@ CREATE TABLE `nebula_tag_firm` (
## tag_person
### 逻辑
```plantuml
@startuml
database nebula的tag_person
......@@ -176,6 +179,7 @@ tb_person --> 中间表nebula_tag_person: 存量入表后例行
中间表nebula_tag_person --> nebula的tag_person: 存量入表后例行
@enduml
```
```
从融合库中提前将数据准备至nebula中间表,所需字段通过binlog增量入中间表;将ppid作为唯一键,由于其他表中有的ppid表tb_person里一定有,所以用ac_partner_num、ac_employee_num、ac_legalperson_num三个字段都不为null当做是否更新nebula的标志。
```
......@@ -223,7 +227,7 @@ CREATE TABLE `nebula_tag_person` (
#### tb_person
```
通过binlog更新
先存量入表,后续通过binlog更新
```
| tb_person | nebula_tag_person | 备注 |
......@@ -236,7 +240,7 @@ CREATE TABLE `nebula_tag_person` (
#### tb_company_employee
```
通过binlog更新is_history=0
先存量入表,后续通过binlog更新is_history=0
```
| tb_company_employee | nebula_tag_person | 备注 |
......@@ -247,7 +251,7 @@ CREATE TABLE `nebula_tag_person` (
#### tb_company_partner
```
通过binlog更新is_history=0
先存量入表,后续通过binlog更新is_history=0
```
| tb_company_partner | nebula_tag_person | 备注 |
......@@ -258,7 +262,7 @@ CREATE TABLE `nebula_tag_person` (
#### tb_company_legalperson
```
通过binlog更新is_history=0
先存量入表,后续通过binlog更新is_history=0
```
| tb_company_legalperson | nebula_tag_person | 备注 |
......@@ -269,6 +273,7 @@ CREATE TABLE `nebula_tag_person` (
## edge_serve
### 逻辑
```plantuml
@startuml
database nebula的edge_serve
......@@ -290,6 +295,7 @@ kafka --> 中间表nebula_edge_serve: 例行入表
融合库 --> 中间表nebula_edge_serve: 存量入表后例行
@enduml
```
```
从mongo和融合库中提前将数据准备至nebula中间表,mongo所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest和employee_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
```
......@@ -332,7 +338,7 @@ CREATE TABLE `nebula_edge_serve` (
#### tb_company_enployee
```
通过binlog更新
先存量入表,后续通过binlog更新
```
| tb_company_enployee | nebula_edge_serve | 备注 |
......@@ -359,6 +365,7 @@ CREATE TABLE `nebula_edge_serve` (
## edge_invest_h
### 逻辑
```plantuml
@startuml
database nebula的edge_invest_h
......@@ -380,6 +387,7 @@ kafka --> 中间表nebula_edge_invest_h: 例行入表
融合库 --> 中间表nebula_edge_invest_h: 存量入表后例行
@enduml
```
```
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
```
......@@ -425,7 +433,7 @@ CREATE TABLE `nebula_edge_invest_h` (
#### tb_company_partner
```
通过binlog更新
先存量入表,后续通过binlog更新
```
| tb_company_partner | nebula_edge_invest_h | 备注 |
......@@ -454,6 +462,7 @@ CREATE TABLE `nebula_edge_invest_h` (
## edge_invest_c
### 逻辑
```plantuml
@startuml
database nebula的edge_invest_c
......@@ -475,6 +484,7 @@ kafka --> 中间表nebula_edge_invest_c: 例行入表
融合库 --> 中间表nebula_edge_invest_c: 存量入表后例行
@enduml
```
```
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的s_fid和e_fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
```
......@@ -519,7 +529,7 @@ CREATE TABLE `nebula_edge_invest_c` (
#### tb_company_partner
```
通过binlog更新
先存量入表,后续通过binlog更新
```
| tb_company_partner | nebula_edge_invest_h | 备注 |
......@@ -547,24 +557,28 @@ CREATE TABLE `nebula_edge_invest_c` (
## edge_own
### 逻辑
```plantuml
@startuml
database nebula的edge_own
file 爬虫数据
database 融合库
database 中间表
爬虫数据 --> 融合库
融合库 --> nebula的edge_own: binlog更新
融合库 --> 中间表: binlog更新
中间表 --> nebula的edge_own: binlog更新
@enduml
```
```
直接通过监控融合库tb_company_legalperson表的binlog更新
先存量入中间表,然后通过监控融合库tb_company_legalperson表的binlog更新中间表,后续通过中间表binlog更新nebula
```
### nebula结构
```sql
自然人作为企业法定代表人,与企业间关系的集合,类型为edge,用<ppid, edge_own, (Rank), fid>唯一标识,表名为edge_own
自然人作为企业法定代表人,与企业间关系的集合,类型为edge,用<pid, edge_own, (Rank), fid>唯一标识,表名为edge_own
CREATE EDGE `edge_own` (
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
......@@ -572,6 +586,24 @@ CREATE EDGE `edge_own` (
) ttl_duration = 0, ttl_col = "", comment = "人员法定代表人edge";
```
### 中间表表结构
```sql
CREATE TABLE `nebula_edge_own` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`pid` varchar(50) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
`fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
`ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_id` (`company_name_digest`, `ppid`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_own的数据中间表';
```
### 涉及到的数据源
#### tb_company_legalperson
......@@ -580,33 +612,39 @@ CREATE EDGE `edge_own` (
通过binlog更新,lp_ppid is not null and company_name_digest is not null
```
| tb_company_legalperson | 备注 |
| ---------------------- | ---- |
| lp_ppid | |
| company_name_digest | |
| is_history | |
| update_time | |
| tb_company_legalperson | nebula_edge_own | 备注 |
| ---------------------- | ------------------- | --------------------------- |
| lp_ppid | ppid | |
| company_name_digest | company_name_digest | |
| is_history | is_history | |
| lp_ppid | pid | pid='p'+lp_ppid |
| company_name_digest | fid | fid='f'+company_name_digest |
## edge_own_c
### 逻辑
```plantuml
@startuml
database nebula的edge_own_c
file 爬虫数据
database 融合库
database 中间表
爬虫数据 --> 融合库
融合库 --> nebula的edge_own_c: binlog更新
融合库 --> 中间表: binlog更新
中间表 --> nebula的edge_own_c: binlog更新
@enduml
```
```
直接通过监控融合库tb_company_legalperson表的binlog更新
先存量入中间表,然后通过监控融合库tb_company_legalperson表的binlog更新中间表,后续通过中间表binlog更新nebula
```
### nebula结构
```sql
企业作为企业法定代表人,与企业间关系的集合,类型为edge,用<s_fid,edge_own_c,(Rank),e_fid>唯一标识,表名为edge_own_c
CREATE EDGE `edge_own_c` (
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`lp_company_name_digest` fixed_string(32) NOT NULL COMMENT "法人企业唯一id",
......@@ -614,6 +652,24 @@ CREATE EDGE `edge_own_c` (
) ttl_duration = 0, ttl_col = "", comment = "公司法定代表人edge";
```
### 中间表表结构
```sql
CREATE TABLE `nebula_edge_own_c` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`s_fid` varchar(50) DEFAULT NULL COMMENT '=f和lp_company_name_digest拼接,对应nebula中的s_fid',
`e_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的e_fid',
`lp_company_name_digest` char(32) DEFAULT NULL COMMENT '法人企业唯一id',
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_digest` (`company_name_digest`, `branch_company_name_digest`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_ow_c的数据中间表';
```
### 涉及到的数据源
#### tb_company_legalperson
......@@ -622,16 +678,18 @@ CREATE EDGE `edge_own_c` (
通过binlog更新,lp_company_name_digest is not null and company_name_digest is not null
```
| tb_company_legalperson | 备注 |
| ---------------------- | ---- |
| lp_company_name_digest | |
| company_name_digest | |
| is_history | |
| update_time | |
| tb_company_legalperson | nebula_edge_own_c | 备注 |
| ---------------------- | ---------------------- | -------------------------------- |
| lp_company_name_digest | lp_company_name_digest | |
| company_name_digest | company_name_digest | |
| is_history | is_history | |
| lp_company_name_digest | s_fid | s_fid='f'+lp_company_name_digest |
| company_name_digest | e_fid | e_fid='f'+company_name_digest |
## edge_branch
### 逻辑
```plantuml
@startuml
database nebula的edge_branch
......@@ -649,8 +707,9 @@ utn_ic.company_branch --> kafka: 增量写kafka
kafka --> 中间表nebula_edge_branch: 例行入表
@enduml
```
```
从mongo提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表
从mongo提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表,监控中间表的binlog更新nebula
```
### nebula结构
......@@ -686,6 +745,10 @@ CREATE TABLE `nebula_edge_branch` (
#### utn_ic.company_branch
```
存量入表后,通过kafka增量入表
```
| company_branch | nebula_edge_branch | 备注 |
| ------------------- | -------------------------- | ------------------------------------------------------------ |
| branch_name_digest | branch_company_name_digest | |
......
Clone repository
  • README
  • basic_guidelines
  • basic_guidelines
    • basic_guidelines
    • dev_guide
    • project_build
    • 开发流程
  • best_practice
  • best_practice
    • AlterTable
    • RDS
    • azkaban
    • create_table
    • design
    • elasticsearch
    • elasticsearch
      • ES运维
    • logstash
View All Pages