nebula增量例行方案
提前准备好数据,建立中间表,mysql和nebula表对表更新。
tag_firm
逻辑
@startuml database nebula的tag_firm file 爬虫数据 database 中间表nebula_tag_firm queue kafka file data_pump database utn_ic.ic database mongo database 融合库
mongo --> utn_ic.ic 爬虫数据 --> 融合库 utn_ic.ic --> data_pump: 存量 data_pump --> 中间表nebula_tag_firm: 存量入表 中间表nebula_tag_firm --> nebula的tag_firm: binlog更新 utn_ic.ic --> kafka: 增量写kafka kafka --> 中间表nebula_tag_firm: 例行入表 融合库 --> 中间表nebula_tag_firm: 存量入表后例行 @enduml
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的fid字段不为null时,例行同步的程序中才将对应记录入nebula
nebula结构
企业实体的结合,类型为tag,用fid唯一标识vid(f-company_name_digest),属性包含营业执照信息,表名tag_firm
CREATE TAG `tag_firm` (
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id(工商digest)",
`company_name` string NULL COMMENT "企业姓名",
`credit_no` string NULL COMMENT "统一信用代码",
`company_code` string NULL COMMENT "企业注册号",
`org_code` string NULL COMMENT "组织机构代码",
`tax_code` string NULL COMMENT "纳税人识别号",
`capital` string NULL COMMENT "注册资本",
`real_capital` string NULL COMMENT "实缴资本",
`establish_date` datetime NULL COMMENT "成立日期",
`issue_date` datetime NULL COMMENT "核准日期",
`revoke_date` datetime NULL COMMENT "注销或吊销日期",
`company_status` string NULL COMMENT "登记状态",
`company_type` string NULL COMMENT "企业类型",
`company_major_type` string NULL COMMENT "归类后的企业类型",
`n_company_status` string NULL COMMENT "登记状态(格式化)",
`operation_startdate` string NULL COMMENT "营业起始时间",
`operation_enddate` string NULL COMMENT "营业终止时间",
`authority` string NULL COMMENT "登记机关",
`company_address` string NULL COMMENT "注册地址",
`company_industry` string NULL COMMENT "所属行业",
`province_short` string NULL COMMENT "省份简称",
`city` string NULL COMMENT "城市",
`district` string NULL COMMENT "地区",
`insurance_amount` int NULL COMMENT "参保人数",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "企业实体tag";
中间表表结构
CREATE TABLE `nebula_tag_firm` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest的拼接,对应nebula中的vid,是否入nebula标志,为null时表示不入nebula',
`company_name_digest` char(32) DEFAULT NULL COMMENT 'company唯一键',
`company_name` varchar(255) DEFAULT NULL COMMENT ' 公司名称',
`credit_no` varchar(50) DEFAULT NULL COMMENT '统一社会信用码',
`company_code` varchar(50) DEFAULT NULL COMMENT '公司注册码',
`org_code` varchar(50) DEFAULT NULL COMMENT '组织机构代码',
`tax_code` varchar(50) DEFAULT NULL COMMENT '纳税人识别号',
`n_company_status` varchar(16) DEFAULT NULL COMMENT '登记状态(格式化)',
`capital` varchar(255) DEFAULT NULL COMMENT '注册资本',
`real_capital` varchar(255) DEFAULT NULL COMMENT '实缴资本',
`establish_date` datetime DEFAULT NULL COMMENT '成立日期',
`issue_date` datetime DEFAULT NULL COMMENT '核准日期',
`revoke_date` datetime DEFAULT NULL COMMENT '注销或吊销日期',
`company_status` varchar(50) DEFAULT NULL COMMENT '登记状态',
`company_type` varchar(255) DEFAULT NULL COMMENT '企业类型',
`operation_startdate` varchar(50) DEFAULT NULL COMMENT '营业期限起始时间',
`operation_enddate` varchar(50) DEFAULT NULL COMMENT '营业期限终止时间',
`company_address` varchar(512) DEFAULT NULL COMMENT '公司注册地址',
`authority` varchar(255) DEFAULT NULL COMMENT '登记机关',
`company_industry` varchar(255) DEFAULT NULL COMMENT '所属行业',
`province_short` varchar(32) DEFAULT NULL COMMENT '省份简称',
`city` varchar(32) DEFAULT NULL COMMENT '地区',
`district` varchar(32) DEFAULT NULL COMMENT '城市',
`insurance_amount` int DEFAULT NULL COMMENT '参保人数',
`is_deleted` smallint(1) DEFAULT 0 COMMENT '0:保留,1:删除',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_digest` (`company_name_digest`),
KEY `idx_company_name` (`company_name`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的tag_firm的数据中间表';
涉及到的数据源
tb_company
通过binlog更新
tb_company | nebula_tag_company | 备注 |
---|---|---|
company_name_digest | company_name_digest | |
company_name_digest | fid | fid='f'+company_name_digest |
utn_ic.ic
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.ic and deleted=0
utn_ic.ic | nebula_tag_company | 备注 |
---|---|---|
company_name_digest | company_name_digest | |
company_name | company_name | |
company_code | company_code | |
credit_no | credit_no | |
org_code | org_code | |
tax_code | tax_code | |
establish_date | establish_date | |
company_status | company_status | |
company_type | company_type | |
authority | authority | |
issue_date | issue_date | |
operation_startdate | operation_startdate | |
operation_enddate | operation_enddate | |
capital | capital | |
real_capital | real_capital | |
company_address | company_address | |
cancel_date | revoke_date | 两个字段值合一为revoke_date |
revoke_date | revoke_date | 两个字段值合一为revoke_date |
n_company_status | n_company_status | |
province_short | province_short | |
city | city | |
district | district | |
company_major_type | company_major_type | |
industries | company_industry | 取最高一级行业划分 |
insurance_amount | insurance_amount |
tag_person
逻辑
@startuml database nebula的tag_person file 爬虫数据 database 中间表nebula_tag_person file tb_company_employee file tb_company_legalperson file tb_company_partner file tb_person database 融合库
爬虫数据 --> 融合库 融合库 --> tb_company_employee 融合库 --> tb_company_legalperson 融合库 --> tb_company_partner 融合库 --> tb_person tb_company_employee --> 中间表nebula_tag_person: count(pcid) as ac_employee_num,group by ppid tb_company_legalperson --> 中间表nebula_tag_person: count(pcid) as ac_legalperson_num,group by lp_ppid tb_company_partner --> 中间表nebula_tag_person: count(pcid) as ac_partner_num,group by partner_ppid tb_person --> 中间表nebula_tag_person: 存量入表后例行 中间表nebula_tag_person --> nebula的tag_person: 存量入表后例行 @enduml
从融合库中提前将数据准备至nebula中间表,所需字段通过binlog增量入中间表;将ppid作为唯一键,由于其他表中有的ppid表tb_person里一定有,所以用ac_partner_num、ac_employee_num、ac_legalperson_num三个字段都不为null当做是否更新nebula的标志。
nebula结构
人员的集合,类型为tag,用pid唯一标识vid(p-ppid),表名 tag_person
CREATE TAG `tag_person` (
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
`person_name` string NULL COMMENT "人员姓名",
`ac_num` int64 NULL COMMENT "关联企业数",
`ac_legalperson_num` int64 NULL COMMENT "担任法定代表人数",
`ac_partner_num` int64 NULL COMMENT "对外投资数",
`ac_employee_num` int64 NULL COMMENT "任职数",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "人员实体tag";
中间表表结构
CREATE TABLE `nebula_tag_person` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`pid` char(33) NOT NULL COMMENT '=p和ppid的拼接,对应nebula中的vid',
`ppid` char(32) NOT NULL COMMENT '人员唯一id',
`person_name` varchar(50) DEFAULT NULL COMMENT '人员姓名',
`ac_legalperson_num` int(9) DEFAULT NULL COMMENT '担任法定代表人数',
`ac_num` int(9) DEFAULT NULL COMMENT '关联企业数',
`ac_partner_num` int(9) DEFAULT NULL COMMENT '对外投资数',
`ac_employee_num` int(9) DEFAULT NULL COMMENT '任职数',
`is_deleted` smallint(1) DEFAULT 0 COMMENT '0:保留,1:删除',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_ppid` (`ppid`),
KEY `idx_name` (`person_name`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的tag_person的数据中间表';
涉及到的数据源
tb_person
通过binlog更新
tb_person | nebula_tag_person | 备注 |
---|---|---|
ppid | ppid、pid | pid='p'+ppid |
person_name | person_name | |
ac_num | ac_num | |
update_time | update_time |
tb_company_employee
通过binlog更新is_history=0
tb_company_employee | nebula_tag_person | 备注 |
---|---|---|
ppid | ppid | |
pcid | ac_employee_num | count(pcid) as ac_employee_num,group by ppid |
tb_company_partner
通过binlog更新is_history=0
tb_company_partner | nebula_tag_person | 备注 |
---|---|---|
partner_ppid | ppid | |
pcid | ac_partner_num | count(pcid) as ac_partner_num,group by partner_ppid |
tb_company_legalperson
通过binlog更新is_history=0
tb_company_legalperson | nebula_tag_person | 备注 |
---|---|---|
lp_ppid | ppid | |
pcid | ac_legalperson_num | count(pcid) as ac_legalperson_num,group by lp_ppid |
edge_serve
逻辑
@startuml database nebula的edge_serve file 爬虫数据 database 中间表nebula_edge_serve queue kafka file data_pump database utn_ic.company_employee database mongo database 融合库
mongo --> utn_ic.company_employee 爬虫数据 --> 融合库 utn_ic.company_employee --> data_pump: 存量 data_pump --> 中间表nebula_edge_serve: 存量入表 中间表nebula_edge_serve --> nebula的edge_serve: binlog更新 utn_ic.company_employee --> kafka: 增量写kafka kafka --> 中间表nebula_edge_serve: 例行入表 融合库 --> 中间表nebula_edge_serve: 存量入表后例行 @enduml
从mongo和融合库中提前将数据准备至nebula中间表,mongo所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest和employee_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
nebula结构
企业主要人员与企业之间任职关系的集合,类型为edge,用<pid,edge_serve,(Rank),fid>唯一标识,边类型为edge_serve
CREATE EDGE `edge_serve` (
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`position` string NULL COMMENT "职务",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "任职edge";
中间表表结构
CREATE TABLE `nebula_edge_serve` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`pid` char(33) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
`fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
`ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
`employee_name` varchar(255) NOT NULL COMMENT '员工名称',
`position` varchar(64) DEFAULT NULL COMMENT '职务',
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_digest` (`company_name_digest`, `employee_name`),
KEY `idx_position` (`position`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_serve的数据中间表';
涉及到的数据源
tb_company_enployee
通过binlog更新
tb_company_enployee | nebula_edge_serve | 备注 |
---|---|---|
ppid | ppid | |
company_name_digest | company_name_digest | |
employee_name | employee_name | |
company_name_digest | fid | fid='f'+company_name_digest |
ppid | pid | pid='p'+ppid |
utn_ic.company_employee
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee
utn_ic.company_employee | nebula_edge_serve | 备注 |
---|---|---|
company_name_digest | company_name_digest | |
employee_name | employee_name | |
position | position | |
is_history | is_history |
edge_invest_h
逻辑
@startuml database nebula的edge_invest_h file 爬虫数据 database 中间表nebula_edge_invest_h queue kafka file data_pump database utn_ic.company_partner_new database mongo database 融合库
mongo --> utn_ic.company_partner_new 爬虫数据 --> 融合库 utn_ic.company_partner_new --> data_pump: 存量 data_pump --> 中间表nebula_edge_invest_h: 存量入表 中间表nebula_edge_invest_h --> nebula的edge_invest_h: binlog更新 utn_ic.company_partner_new --> kafka: 增量写kafka kafka --> 中间表nebula_edge_invest_h: 例行入表 融合库 --> 中间表nebula_edge_invest_h: 存量入表后例行 @enduml
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
nebula结构
自然人作为企业股东,与企业间的参股关系的集合,类型为edge,用<ppid,edge_invest_h,(Rank),fid>唯一标识,边类型为edge_invest_h
CREATE EDGE `edge_invest_h` (
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`stock_proportion` string NULL COMMENT "持股比例",
`stock_capital` string NULL COMMENT "应缴金额",
`stock_realcapital` string NULL COMMENT "实缴金额",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "人员参股edge";
中间表表结构
CREATE TABLE `nebula_edge_invest_h` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`pid` char(33) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
`fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
`ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
`partner_name` varchar(255) NOT NULL COMMENT '股东名称',
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
`stock_capital` varchar(64) DEFAULT NULL COMMENT '应缴金额',
`stock_proportion` double DEFAULT NULL COMMENT '持股比例',
`stock_realcapital` varchar(64) DEFAULT NULL COMMENT '实缴金额',
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_id` (`company_name_digest`, `partner_name`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_invest_h的数据中间表';
涉及到的数据源
tb_company_partner
通过binlog更新
tb_company_partner | nebula_edge_invest_h | 备注 |
---|---|---|
company_name_digest | company_name_digest | |
partner_ppid | ppid | |
company_name_digest | fid | fid='f'+company_name_digest |
partner_ppid | pid | pid='p'+ppid |
partner_name | partner_name |
utn_ic.company_partner_new
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee and deleted = 0 and is_personal = 1
company_partner | nebula_edge_invest_h | 备注 |
---|---|---|
company_name_digest | company_name_digest | |
stock_name | partner_name | |
stock_capital | stock_capital | |
stock_proportion | stock_proportion | |
stock_realcapital | stock_realcapital | |
is_history | is_history |
edge_invest_c
逻辑
@startuml database nebula的edge_invest_c file 爬虫数据 database 中间表nebula_edge_invest_c queue kafka file data_pump database utn_ic.company_partner_new database mongo database 融合库
mongo --> utn_ic.company_partner_new 爬虫数据 --> 融合库 utn_ic.company_partner_new --> data_pump: 存量 data_pump --> 中间表nebula_edge_invest_c: 存量入表 中间表nebula_edge_invest_c --> nebula的edge_invest_c: binlog更新 utn_ic.company_partner_new --> kafka: 增量写kafka kafka --> 中间表nebula_edge_invest_c: 例行入表 融合库 --> 中间表nebula_edge_invest_c: 存量入表后例行 @enduml
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的s_fid和e_fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
nebula结构
企业作为企业股东,与企业间的参股关系的集合,类型为edge,用<s_fid,invest_c,(Rank),e_fid>唯一标识,表名为edge_invest_c
CREATE EDGE `edge_invest_c` (
`partner_company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`stock_proportion` string NULL COMMENT "持股比例",
`stock_capital` string NULL COMMENT "应缴金额",
`stock_realcapital` string NULL COMMENT "实缴金额",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "公司参股edge";
中间表表结构
CREATE TABLE `nebula_edge_invest_c` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`s_fid` varchar(50) DEFAULT NULL COMMENT '=f和partner_company_name_digest拼接,对应nebula中的s_fid',
`e_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的e_fid',
`partner_company_name_digest` char(32) DEFAULT NULL COMMENT '股东企业唯一id',
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
`stock_capital` varchar(64) DEFAULT NULL COMMENT '应缴金额',
`stock_proportion` double DEFAULT NULL COMMENT '持股比例',
`stock_realcapital` varchar(64) DEFAULT NULL COMMENT '实缴金额',
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_digest` (`company_name_digest`, `partner_company_name_digest`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_invest_c的数据中间表';
涉及到的数据源
tb_company_partner
通过binlog更新
tb_company_partner | nebula_edge_invest_h | 备注 |
---|---|---|
company_name_digest | company_name_digest | |
partner_company_name_digest | partner_company_name_digest | |
company_name_digest | e_fid | e_fid='f'+company_name_digest |
partner_company_name_digest | s_fid | s_fid='f'+partner_company_name_digest |
utn_ic.company_partner_new
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee and deleted = 0 and is_personal = 0
company_partner | nebula_edge_invest_h | 备注 |
---|---|---|
company_name_digest | company_name_digest | |
stock_name_digest | partner_company_name_digest | |
stock_capital | stock_capital | |
stock_proportion | stock_proportion | |
stock_realcapital | stock_realcapital | |
is_history | is_history |
edge_own
逻辑
@startuml database nebula的edge_own file 爬虫数据 database 融合库
爬虫数据 --> 融合库 融合库 --> nebula的edge_own: binlog更新 @enduml
直接通过监控融合库tb_company_legalperson表的binlog更新
nebula结构
自然人作为企业法定代表人,与企业间关系的集合,类型为edge,用<ppid, edge_own, (Rank), fid>唯一标识,表名为edge_own
CREATE EDGE `edge_own` (
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "人员法定代表人edge";
涉及到的数据源
tb_company_legalperson
通过binlog更新,lp_ppid is not null and company_name_digest is not null
tb_company_legalperson | 备注 |
---|---|
lp_ppid | |
company_name_digest | |
is_history | |
update_time |
edge_own_c
逻辑
@startuml database nebula的edge_own_c file 爬虫数据 database 融合库
爬虫数据 --> 融合库 融合库 --> nebula的edge_own_c: binlog更新 @enduml
直接通过监控融合库tb_company_legalperson表的binlog更新
nebula结构
CREATE EDGE `edge_own_c` (
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`lp_company_name_digest` fixed_string(32) NOT NULL COMMENT "法人企业唯一id",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "公司法定代表人edge";
涉及到的数据源
tb_company_legalperson
通过binlog更新,lp_company_name_digest is not null and company_name_digest is not null
tb_company_legalperson | 备注 |
---|---|
lp_company_name_digest | |
company_name_digest | |
is_history | |
update_time |
edge_branch
逻辑
@startuml database nebula的edge_branch database 中间表nebula_edge_branch queue kafka file data_pump database utn_ic.company_branch database mongo
mongo --> utn_ic.company_branch utn_ic.company_branch --> data_pump: 存量 data_pump --> 中间表nebula_edge_branch: 存量入表 中间表nebula_edge_branch --> nebula的edge_branch: binlog更新 utn_ic.company_branch --> kafka: 增量写kafka kafka --> 中间表nebula_edge_branch: 例行入表 @enduml
从mongo提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表
nebula结构
企业与分支机构企业间关系的集合,类型为edge,用<s_fid, edge_branch, (Rank), e_fid>唯一标识,表名为edge_branch
CREATE EDGE `edge_branch` (
`company_name_digest` fixed_string(32) NOT NULL COMMENT "母公司企业唯一id",
`branch_company_name_digest` fixed_string(32) NOT NULL COMMENT "分支企业唯一id",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "公司分支机构edge";
中间表表结构
CREATE TABLE `nebula_edge_branch` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`s_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的s_fid',
`e_fid` varchar(50) DEFAULT NULL COMMENT '=f和branch_company_name_digest拼接,对应nebula中的e_fid',
`branch_company_name_digest` char(32) DEFAULT NULL COMMENT '分支机构唯一id',
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_digest` (`company_name_digest`, `branch_company_name_digest`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_branch的数据中间表';
涉及到的数据源
utn_ic.company_branch
company_branch | nebula_edge_branch | 备注 |
---|---|---|
branch_name_digest | branch_company_name_digest | |
company_name_digest | company_name_digest | |
n_company_status | is_history | (case n_company_status when '正常' then 0 else 1 end) as is_history |
branch_name_digest | e_fid | e_fid='f'+branch_name_digest |
company_name_digest | s_fid | s_fid='f'+company_name_digest |