nebula增量例行方案
提前准备好数据,建立中间表,mysql和nebula表对表更新。
tag_firm
逻辑
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的fid字段不为null时,例行同步的程序中才将对应记录入nebula
nebula结构
企业实体的结合,类型为tag,用fid唯一标识vid(f-company_name_digest),属性包含营业执照信息,表名tag_firm
CREATE TAG `tag_firm` ( 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id(工商digest)", 
  `company_name` string NULL COMMENT "企业姓名", 
  `credit_no` string NULL COMMENT "统一信用代码", 
  `company_code` string NULL COMMENT "企业注册号", 
  `org_code` string NULL COMMENT "组织机构代码", 
  `tax_code` string NULL COMMENT "纳税人识别号", 
  `capital` string NULL COMMENT "注册资本", 
  `real_capital` string NULL COMMENT "实缴资本", 
  `establish_date` datetime NULL COMMENT "成立日期", 
  `issue_date` datetime NULL COMMENT "核准日期", 
  `revoke_date` datetime NULL COMMENT "注销或吊销日期", 
  `company_status` string NULL COMMENT "登记状态", 
  `company_type` string NULL COMMENT "企业类型",
  `company_major_type` string NULL COMMENT "归类后的企业类型",
  `n_company_status` string NULL COMMENT "登记状态(格式化)", 
  `operation_startdate` string NULL COMMENT "营业起始时间", 
  `operation_enddate` string NULL COMMENT "营业终止时间", 
  `authority` string NULL COMMENT "登记机关", 
  `company_address` string NULL COMMENT "注册地址", 
  `company_industry` string NULL COMMENT "所属行业", 
  `province_short` string NULL COMMENT "省份简称", 
  `city` string NULL COMMENT "城市", 
  `district` string NULL COMMENT "地区", 
  `insurance_amount` int NULL COMMENT "参保人数",
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "企业实体tag";
中间表表结构
CREATE TABLE `nebula_tag_firm` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest的拼接,对应nebula中的vid,是否入nebula标志,为null时表示不入nebula',
  `company_name_digest` char(32) DEFAULT NULL COMMENT 'company唯一键',
  `company_name` varchar(255) DEFAULT NULL COMMENT ' 公司名称',
  `credit_no` varchar(50) DEFAULT NULL COMMENT '统一社会信用码',
  `company_code` varchar(50) DEFAULT NULL COMMENT '公司注册码',
  `org_code` varchar(50) DEFAULT NULL COMMENT '组织机构代码',
  `tax_code` varchar(50) DEFAULT NULL COMMENT '纳税人识别号',
  `n_company_status` varchar(16) DEFAULT NULL COMMENT '登记状态(格式化)',
  `capital` varchar(255) DEFAULT NULL COMMENT '注册资本',
  `real_capital` varchar(255) DEFAULT NULL COMMENT '实缴资本',
  `establish_date` datetime DEFAULT NULL COMMENT '成立日期',
  `issue_date` datetime DEFAULT NULL COMMENT '核准日期',
  `revoke_date` datetime DEFAULT NULL COMMENT '注销或吊销日期',
  `company_status` varchar(50) DEFAULT NULL COMMENT '登记状态',
  `company_type` varchar(255) DEFAULT NULL COMMENT '企业类型',
  `operation_startdate` varchar(50) DEFAULT NULL COMMENT '营业期限起始时间',
  `operation_enddate` varchar(50) DEFAULT NULL COMMENT '营业期限终止时间',
  `company_address` varchar(512) DEFAULT NULL COMMENT '公司注册地址',
  `authority` varchar(255) DEFAULT NULL COMMENT '登记机关',
  `company_industry` varchar(255) DEFAULT NULL COMMENT '所属行业',
  `province_short` varchar(32) DEFAULT NULL COMMENT '省份简称',
  `city` varchar(32) DEFAULT NULL COMMENT '地区',
  `district` varchar(32) DEFAULT NULL COMMENT '城市',
  `insurance_amount` int DEFAULT NULL COMMENT '参保人数',
  `is_deleted` smallint(1) DEFAULT 0 COMMENT '0:保留,1:删除',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_digest` (`company_name_digest`),
  KEY `idx_company_name` (`company_name`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的tag_firm的数据中间表';
涉及到的数据源
tb_company
先存量入表,后续通过binlog更新
| tb_company | nebula_tag_company | 备注 | 
| company_name_digest | company_name_digest |  | 
| company_name_digest | fid | fid='f'+company_name_digest | 
utn_ic.ic
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.ic and deleted=0
| utn_ic.ic | nebula_tag_company | 备注 | 
| company_name_digest | company_name_digest |  | 
| company_name | company_name |  | 
| company_code | company_code |  | 
| credit_no | credit_no |  | 
| org_code | org_code |  | 
| tax_code | tax_code |  | 
| establish_date | establish_date |  | 
| company_status | company_status |  | 
| company_type | company_type |  | 
| authority | authority |  | 
| issue_date | issue_date |  | 
| operation_startdate | operation_startdate |  | 
| operation_enddate | operation_enddate |  | 
| capital | capital |  | 
| real_capital | real_capital |  | 
| company_address | company_address |  | 
| cancel_date | revoke_date | 两个字段值合一为revoke_date | 
| revoke_date | revoke_date | 两个字段值合一为revoke_date | 
| n_company_status | n_company_status |  | 
| province_short | province_short |  | 
| city | city |  | 
| district | district |  | 
| company_major_type | company_major_type |  | 
| industries | company_industry | 取最高一级行业划分 | 
| insurance_amount | insurance_amount |  | 
tag_person
逻辑
从融合库中提前将数据准备至nebula中间表,所需字段通过binlog增量入中间表;将ppid作为唯一键,由于其他表中有的ppid表tb_person里一定有,所以用ac_partner_num、ac_employee_num、ac_legalperson_num三个字段都不为null当做是否更新nebula的标志。
nebula结构
人员的集合,类型为tag,用pid唯一标识vid(p-ppid),表名 tag_person
CREATE TAG `tag_person` ( 
  `ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id", 
  `person_name` string NULL COMMENT "人员姓名", 
  `ac_num` int64 NULL COMMENT "关联企业数", 
  `ac_legalperson_num` int64 NULL COMMENT "担任法定代表人数", 
  `ac_partner_num` int64 NULL COMMENT "对外投资数", 
  `ac_employee_num` int64 NULL COMMENT "任职数", 
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "人员实体tag";
中间表表结构
CREATE TABLE `nebula_tag_person` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `pid` char(33) NOT NULL COMMENT '=p和ppid的拼接,对应nebula中的vid',
  `ppid` char(32) NOT NULL COMMENT '人员唯一id',
  `person_name` varchar(50) DEFAULT NULL COMMENT '人员姓名',
  `ac_legalperson_num` int(9) DEFAULT NULL COMMENT '担任法定代表人数',
  `ac_num` int(9) DEFAULT NULL COMMENT '关联企业数',
  `ac_partner_num` int(9) DEFAULT NULL COMMENT '对外投资数',
  `ac_employee_num` int(9) DEFAULT NULL COMMENT '任职数',
  `is_deleted` smallint(1) DEFAULT 0 COMMENT '0:保留,1:删除',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_ppid` (`ppid`),
  KEY `idx_name` (`person_name`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的tag_person的数据中间表';
涉及到的数据源
tb_person
先存量入表,后续通过binlog更新
| tb_person | nebula_tag_person | 备注 | 
| ppid | ppid、pid | pid='p'+ppid | 
| person_name | person_name |  | 
| ac_num | ac_num |  | 
| update_time | update_time |  | 
tb_company_employee
先存量入表,后续通过binlog更新is_history=0
| tb_company_employee | nebula_tag_person | 备注 | 
| ppid | ppid |  | 
| pcid | ac_employee_num | count(pcid) as ac_employee_num,group by ppid | 
tb_company_partner
先存量入表,后续通过binlog更新is_history=0
| tb_company_partner | nebula_tag_person | 备注 | 
| partner_ppid | ppid |  | 
| pcid | ac_partner_num | count(pcid) as ac_partner_num,group by partner_ppid | 
tb_company_legalperson
先存量入表,后续通过binlog更新is_history=0
| tb_company_legalperson | nebula_tag_person | 备注 | 
| lp_ppid | ppid |  | 
| pcid | ac_legalperson_num | count(pcid) as ac_legalperson_num,group by lp_ppid | 
edge_serve
逻辑
从mongo和融合库中提前将数据准备至nebula中间表,mongo所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest和employee_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
nebula结构
企业主要人员与企业之间任职关系的集合,类型为edge,用<pid,edge_serve,(Rank),fid>唯一标识,边类型为edge_serve
CREATE EDGE `edge_serve` ( 
  `ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id", 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", 
  `position` string NULL COMMENT "职务", 
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "任职edge";
中间表表结构
CREATE TABLE `nebula_edge_serve` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `pid` char(33) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
  `fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
  `ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
  `company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
  `employee_name` varchar(255) NOT NULL COMMENT '员工名称',
  `position` varchar(64) DEFAULT NULL COMMENT '职务',
  `is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_digest` (`company_name_digest`, `employee_name`),
  KEY `idx_position` (`position`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_serve的数据中间表';
涉及到的数据源
tb_company_enployee
先存量入表,后续通过binlog更新
| tb_company_enployee | nebula_edge_serve | 备注 | 
| ppid | ppid |  | 
| company_name_digest | company_name_digest |  | 
| employee_name | employee_name |  | 
| company_name_digest | fid | fid='f'+company_name_digest | 
| ppid | pid | pid='p'+ppid | 
utn_ic.company_employee
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee
| utn_ic.company_employee | nebula_edge_serve | 备注 | 
| company_name_digest | company_name_digest |  | 
| employee_name | employee_name |  | 
| position | position |  | 
| is_history | is_history |  | 
edge_invest_h
逻辑
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
nebula结构
自然人作为企业股东,与企业间的参股关系的集合,类型为edge,用<ppid,edge_invest_h,(Rank),fid>唯一标识,边类型为edge_invest_h
CREATE EDGE `edge_invest_h` ( 
  `ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id", 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", 
  `stock_proportion` string NULL COMMENT "持股比例", 
  `stock_capital` string NULL COMMENT "应缴金额", 
  `stock_realcapital` string NULL COMMENT "实缴金额", 
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "人员参股edge";
中间表表结构
CREATE TABLE `nebula_edge_invest_h` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `pid` char(33) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
  `fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
  `ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
  `partner_name` varchar(255) NOT NULL COMMENT '股东名称',
  `company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
  `stock_capital` varchar(64) DEFAULT NULL COMMENT '应缴金额',
  `stock_proportion` double DEFAULT NULL COMMENT '持股比例',
  `stock_realcapital` varchar(64) DEFAULT NULL COMMENT '实缴金额',
  `is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_id` (`company_name_digest`, `partner_name`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_invest_h的数据中间表';
涉及到的数据源
tb_company_partner
先存量入表,后续通过binlog更新
| tb_company_partner | nebula_edge_invest_h | 备注 | 
| company_name_digest | company_name_digest |  | 
| partner_ppid | ppid |  | 
| company_name_digest | fid | fid='f'+company_name_digest | 
| partner_ppid | pid | pid='p'+ppid | 
| partner_name | partner_name |  | 
utn_ic.company_partner_new
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee and deleted = 0 and is_personal = 1
| company_partner | nebula_edge_invest_h | 备注 | 
| company_name_digest | company_name_digest |  | 
| stock_name | partner_name |  | 
| stock_capital | stock_capital |  | 
| stock_proportion | stock_proportion |  | 
| stock_realcapital | stock_realcapital |  | 
| is_history | is_history |  | 
edge_invest_c
逻辑
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的s_fid和e_fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
nebula结构
企业作为企业股东,与企业间的参股关系的集合,类型为edge,用<s_fid,invest_c,(Rank),e_fid>唯一标识,表名为edge_invest_c
CREATE EDGE `edge_invest_c` ( 
  `partner_company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", 
  `stock_proportion` string NULL COMMENT "持股比例", 
  `stock_capital` string NULL COMMENT "应缴金额", 
  `stock_realcapital` string NULL COMMENT "实缴金额", 
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "公司参股edge";
中间表表结构
CREATE TABLE `nebula_edge_invest_c` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `s_fid` varchar(50) DEFAULT NULL COMMENT '=f和partner_company_name_digest拼接,对应nebula中的s_fid',
  `e_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的e_fid',
  `partner_company_name_digest` char(32) DEFAULT NULL COMMENT '股东企业唯一id',
  `company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
  `stock_capital` varchar(64) DEFAULT NULL COMMENT '应缴金额',
  `stock_proportion` double DEFAULT NULL COMMENT '持股比例',
  `stock_realcapital` varchar(64) DEFAULT NULL COMMENT '实缴金额',
  `is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_digest` (`company_name_digest`, `partner_company_name_digest`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_invest_c的数据中间表';
涉及到的数据源
tb_company_partner
先存量入表,后续通过binlog更新
| tb_company_partner | nebula_edge_invest_h | 备注 | 
| company_name_digest | company_name_digest |  | 
| partner_company_name_digest | partner_company_name_digest |  | 
| company_name_digest | e_fid | e_fid='f'+company_name_digest | 
| partner_company_name_digest | s_fid | s_fid='f'+partner_company_name_digest | 
utn_ic.company_partner_new
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee and deleted = 0 and is_personal = 0
| company_partner | nebula_edge_invest_h | 备注 | 
| company_name_digest | company_name_digest |  | 
| stock_name_digest | partner_company_name_digest |  | 
| stock_capital | stock_capital |  | 
| stock_proportion | stock_proportion |  | 
| stock_realcapital | stock_realcapital |  | 
| is_history | is_history |  | 
edge_own
逻辑
先存量入中间表,然后通过监控融合库tb_company_legalperson表的binlog更新中间表,后续通过中间表binlog更新nebula
nebula结构
自然人作为企业法定代表人,与企业间关系的集合,类型为edge,用<pid, edge_own, (Rank), fid>唯一标识,表名为edge_own
CREATE EDGE `edge_own` ( 
  `ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id", 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", 
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "人员法定代表人edge";
中间表表结构
CREATE TABLE `nebula_edge_own` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `pid` varchar(50) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
  `fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
  `ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
  `company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
  `is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_id` (`company_name_digest`, `ppid`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_own的数据中间表';
涉及到的数据源
tb_company_legalperson
通过binlog更新,lp_ppid is not null and company_name_digest is not null
| tb_company_legalperson | nebula_edge_own | 备注 | 
| lp_ppid | ppid |  | 
| company_name_digest | company_name_digest |  | 
| is_history | is_history |  | 
| lp_ppid | pid | pid='p'+lp_ppid | 
| company_name_digest | fid | fid='f'+company_name_digest | 
edge_own_c
逻辑
先存量入中间表,然后通过监控融合库tb_company_legalperson表的binlog更新中间表,后续通过中间表binlog更新nebula
nebula结构
企业作为企业法定代表人,与企业间关系的集合,类型为edge,用<s_fid,edge_own_c,(Rank),e_fid>唯一标识,表名为edge_own_c
CREATE EDGE `edge_own_c` ( 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", 
  `lp_company_name_digest` fixed_string(32) NOT NULL COMMENT "法人企业唯一id", 
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "公司法定代表人edge";
中间表表结构
CREATE TABLE `nebula_edge_own_c` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `s_fid` varchar(50) DEFAULT NULL COMMENT '=f和lp_company_name_digest拼接,对应nebula中的s_fid',
  `e_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的e_fid',
  `lp_company_name_digest` char(32) DEFAULT NULL COMMENT '法人企业唯一id',
  `company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
  `is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_digest` (`company_name_digest`, `branch_company_name_digest`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_ow_c的数据中间表';
涉及到的数据源
tb_company_legalperson
通过binlog更新,lp_company_name_digest is not null and company_name_digest is not null
| tb_company_legalperson | nebula_edge_own_c | 备注 | 
| lp_company_name_digest | lp_company_name_digest |  | 
| company_name_digest | company_name_digest |  | 
| is_history | is_history |  | 
| lp_company_name_digest | s_fid | s_fid='f'+lp_company_name_digest | 
| company_name_digest | e_fid | e_fid='f'+company_name_digest | 
edge_branch
逻辑
从mongo提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表,监控中间表的binlog更新nebula
nebula结构
企业与分支机构企业间关系的集合,类型为edge,用<s_fid, edge_branch, (Rank), e_fid>唯一标识,表名为edge_branch
CREATE EDGE `edge_branch` ( 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "母公司企业唯一id", 
  `branch_company_name_digest` fixed_string(32) NOT NULL COMMENT "分支企业唯一id", 
  `update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "公司分支机构edge";
中间表表结构
CREATE TABLE `nebula_edge_branch` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `s_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的s_fid',
  `e_fid` varchar(50) DEFAULT NULL COMMENT '=f和branch_company_name_digest拼接,对应nebula中的e_fid',
  `branch_company_name_digest` char(32) DEFAULT NULL COMMENT '分支机构唯一id',
  `company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
  `is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_digest` (`company_name_digest`, `branch_company_name_digest`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_branch的数据中间表';
涉及到的数据源
utn_ic.company_branch
存量入表后,通过kafka增量入表
| company_branch | nebula_edge_branch | 备注 | 
| branch_name_digest | branch_company_name_digest |  | 
| company_name_digest | company_name_digest |  | 
| n_company_status | is_history | (case n_company_status when '正常' then 0 else 1 end) as is_history | 
| branch_name_digest | e_fid | e_fid='f'+branch_name_digest | 
| company_name_digest | s_fid | s_fid='f'+company_name_digest |