nebula增量例行方案
提前准备好数据,建立中间表,mysql和nebula表对表更新。
tag_firm
逻辑
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的fid字段不为null时,例行同步的程序中才将对应记录入nebula
nebula结构
企业实体的结合,类型为tag,用fid唯一标识vid(f-company_name_digest),属性包含营业执照信息,表名tag_firm
CREATE TAG `tag_firm` (
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id(工商digest)",
`company_name` string NULL COMMENT "企业姓名",
`credit_no` string NULL COMMENT "统一信用代码",
`company_code` string NULL COMMENT "企业注册号",
`org_code` string NULL COMMENT "组织机构代码",
`tax_code` string NULL COMMENT "纳税人识别号",
`capital` string NULL COMMENT "注册资本",
`real_capital` string NULL COMMENT "实缴资本",
`establish_date` datetime NULL COMMENT "成立日期",
`issue_date` datetime NULL COMMENT "核准日期",
`revoke_date` datetime NULL COMMENT "注销或吊销日期",
`company_status` string NULL COMMENT "登记状态",
`company_type` string NULL COMMENT "企业类型",
`company_major_type` string NULL COMMENT "归类后的企业类型",
`n_company_status` string NULL COMMENT "登记状态(格式化)",
`operation_startdate` string NULL COMMENT "营业起始时间",
`operation_enddate` string NULL COMMENT "营业终止时间",
`authority` string NULL COMMENT "登记机关",
`company_address` string NULL COMMENT "注册地址",
`company_industry` string NULL COMMENT "所属行业",
`province_short` string NULL COMMENT "省份简称",
`city` string NULL COMMENT "城市",
`district` string NULL COMMENT "地区",
`insurance_amount` int NULL COMMENT "参保人数",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "企业实体tag";
中间表表结构
CREATE TABLE `nebula_tag_firm` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest的拼接,对应nebula中的vid,是否入nebula标志,为null时表示不入nebula',
`company_name_digest` char(32) DEFAULT NULL COMMENT 'company唯一键',
`company_name` varchar(255) DEFAULT NULL COMMENT ' 公司名称',
`credit_no` varchar(50) DEFAULT NULL COMMENT '统一社会信用码',
`company_code` varchar(50) DEFAULT NULL COMMENT '公司注册码',
`org_code` varchar(50) DEFAULT NULL COMMENT '组织机构代码',
`tax_code` varchar(50) DEFAULT NULL COMMENT '纳税人识别号',
`n_company_status` varchar(16) DEFAULT NULL COMMENT '登记状态(格式化)',
`capital` varchar(255) DEFAULT NULL COMMENT '注册资本',
`real_capital` varchar(255) DEFAULT NULL COMMENT '实缴资本',
`establish_date` datetime DEFAULT NULL COMMENT '成立日期',
`issue_date` datetime DEFAULT NULL COMMENT '核准日期',
`revoke_date` datetime DEFAULT NULL COMMENT '注销或吊销日期',
`company_status` varchar(50) DEFAULT NULL COMMENT '登记状态',
`company_type` varchar(255) DEFAULT NULL COMMENT '企业类型',
`operation_startdate` varchar(50) DEFAULT NULL COMMENT '营业期限起始时间',
`operation_enddate` varchar(50) DEFAULT NULL COMMENT '营业期限终止时间',
`company_address` varchar(512) DEFAULT NULL COMMENT '公司注册地址',
`authority` varchar(255) DEFAULT NULL COMMENT '登记机关',
`company_industry` varchar(255) DEFAULT NULL COMMENT '所属行业',
`province_short` varchar(32) DEFAULT NULL COMMENT '省份简称',
`city` varchar(32) DEFAULT NULL COMMENT '地区',
`district` varchar(32) DEFAULT NULL COMMENT '城市',
`insurance_amount` int DEFAULT NULL COMMENT '参保人数',
`is_deleted` smallint(1) DEFAULT 0 COMMENT '0:保留,1:删除',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_digest` (`company_name_digest`),
KEY `idx_company_name` (`company_name`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的tag_firm的数据中间表';
涉及到的数据源
tb_company
通过binlog更新
tb_company |
nebula_tag_company |
备注 |
company_name_digest |
company_name_digest |
|
company_name_digest |
fid |
fid='f'+company_name_digest |
utn_ic.ic
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.ic and deleted=0
utn_ic.ic |
nebula_tag_company |
备注 |
company_name_digest |
company_name_digest |
|
company_name |
company_name |
|
company_code |
company_code |
|
credit_no |
credit_no |
|
org_code |
org_code |
|
tax_code |
tax_code |
|
establish_date |
establish_date |
|
company_status |
company_status |
|
company_type |
company_type |
|
authority |
authority |
|
issue_date |
issue_date |
|
operation_startdate |
operation_startdate |
|
operation_enddate |
operation_enddate |
|
capital |
capital |
|
real_capital |
real_capital |
|
company_address |
company_address |
|
cancel_date |
revoke_date |
两个字段值合一为revoke_date |
revoke_date |
revoke_date |
两个字段值合一为revoke_date |
n_company_status |
n_company_status |
|
province_short |
province_short |
|
city |
city |
|
district |
district |
|
company_major_type |
company_major_type |
|
industries |
company_industry |
取最高一级行业划分 |
insurance_amount |
insurance_amount |
|
tag_person
逻辑
从融合库中提前将数据准备至nebula中间表,所需字段通过binlog增量入中间表;将ppid作为唯一键,由于其他表中有的ppid表tb_person里一定有,所以用ac_partner_num、ac_employee_num、ac_legalperson_num三个字段都不为null当做是否更新nebula的标志。
nebula结构
人员的集合,类型为tag,用pid唯一标识vid(p-ppid),表名 tag_person
CREATE TAG `tag_person` (
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
`person_name` string NULL COMMENT "人员姓名",
`ac_num` int64 NULL COMMENT "关联企业数",
`ac_legalperson_num` int64 NULL COMMENT "担任法定代表人数",
`ac_partner_num` int64 NULL COMMENT "对外投资数",
`ac_employee_num` int64 NULL COMMENT "任职数",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "人员实体tag";
中间表表结构
CREATE TABLE `nebula_tag_person` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`pid` char(33) NOT NULL COMMENT '=p和ppid的拼接,对应nebula中的vid',
`ppid` char(32) NOT NULL COMMENT '人员唯一id',
`person_name` varchar(50) DEFAULT NULL COMMENT '人员姓名',
`ac_legalperson_num` int(9) DEFAULT NULL COMMENT '担任法定代表人数',
`ac_num` int(9) DEFAULT NULL COMMENT '关联企业数',
`ac_partner_num` int(9) DEFAULT NULL COMMENT '对外投资数',
`ac_employee_num` int(9) DEFAULT NULL COMMENT '任职数',
`is_deleted` smallint(1) DEFAULT 0 COMMENT '0:保留,1:删除',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_ppid` (`ppid`),
KEY `idx_name` (`person_name`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的tag_person的数据中间表';
涉及到的数据源
tb_person
通过binlog更新
tb_person |
nebula_tag_person |
备注 |
ppid |
ppid、pid |
pid='p'+ppid |
person_name |
person_name |
|
ac_num |
ac_num |
|
update_time |
update_time |
|
tb_company_employee
通过binlog更新is_history=0
tb_company_employee |
nebula_tag_person |
备注 |
ppid |
ppid |
|
pcid |
ac_employee_num |
count(pcid) as ac_employee_num,group by ppid |
tb_company_partner
通过binlog更新is_history=0
tb_company_partner |
nebula_tag_person |
备注 |
partner_ppid |
ppid |
|
pcid |
ac_partner_num |
count(pcid) as ac_partner_num,group by partner_ppid |
tb_company_legalperson
通过binlog更新is_history=0
tb_company_legalperson |
nebula_tag_person |
备注 |
lp_ppid |
ppid |
|
pcid |
ac_legalperson_num |
count(pcid) as ac_legalperson_num,group by lp_ppid |
edge_serve
逻辑
从mongo和融合库中提前将数据准备至nebula中间表,mongo所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest和employee_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
nebula结构
企业主要人员与企业之间任职关系的集合,类型为edge,用<pid,edge_serve,(Rank),fid>唯一标识,边类型为edge_serve
CREATE EDGE `edge_serve` (
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`position` string NULL COMMENT "职务",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "任职edge";
中间表表结构
CREATE TABLE `nebula_edge_serve` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`pid` char(33) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
`fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
`ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
`employee_name` varchar(255) NOT NULL COMMENT '员工名称',
`position` varchar(64) DEFAULT NULL COMMENT '职务',
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_digest` (`company_name_digest`, `employee_name`),
KEY `idx_position` (`position`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_serve的数据中间表';
涉及到的数据源
tb_company_enployee
通过binlog更新
tb_company_enployee |
nebula_edge_serve |
备注 |
ppid |
ppid |
|
company_name_digest |
company_name_digest |
|
employee_name |
employee_name |
|
company_name_digest |
fid |
fid='f'+company_name_digest |
ppid |
pid |
pid='p'+ppid |
utn_ic.company_employee
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee
utn_ic.company_employee |
nebula_edge_serve |
备注 |
company_name_digest |
company_name_digest |
|
employee_name |
employee_name |
|
position |
position |
|
is_history |
is_history |
|
edge_invest_h
逻辑
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
nebula结构
自然人作为企业股东,与企业间的参股关系的集合,类型为edge,用<ppid,edge_invest_h,(Rank),fid>唯一标识,边类型为edge_invest_h
CREATE EDGE `edge_invest_h` (
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`stock_proportion` string NULL COMMENT "持股比例",
`stock_capital` string NULL COMMENT "应缴金额",
`stock_realcapital` string NULL COMMENT "实缴金额",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "人员参股edge";
中间表表结构
CREATE TABLE `nebula_edge_invest_h` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`pid` char(33) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
`fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
`ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
`partner_name` varchar(255) NOT NULL COMMENT '股东名称',
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
`stock_capital` varchar(64) DEFAULT NULL COMMENT '应缴金额',
`stock_proportion` double DEFAULT NULL COMMENT '持股比例',
`stock_realcapital` varchar(64) DEFAULT NULL COMMENT '实缴金额',
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_id` (`company_name_digest`, `partner_name`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_invest_h的数据中间表';
涉及到的数据源
tb_company_partner
通过binlog更新
tb_company_partner |
nebula_edge_invest_h |
备注 |
company_name_digest |
company_name_digest |
|
partner_ppid |
ppid |
|
company_name_digest |
fid |
fid='f'+company_name_digest |
partner_ppid |
pid |
pid='p'+ppid |
partner_name |
partner_name |
|
utn_ic.company_partner_new
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee and deleted = 0 and is_personal = 1
company_partner |
nebula_edge_invest_h |
备注 |
company_name_digest |
company_name_digest |
|
stock_name |
partner_name |
|
stock_capital |
stock_capital |
|
stock_proportion |
stock_proportion |
|
stock_realcapital |
stock_realcapital |
|
is_history |
is_history |
|
edge_invest_c
逻辑
从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的s_fid和e_fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula
nebula结构
企业作为企业股东,与企业间的参股关系的集合,类型为edge,用<s_fid,invest_c,(Rank),e_fid>唯一标识,表名为edge_invest_c
CREATE EDGE `edge_invest_c` (
`partner_company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`stock_proportion` string NULL COMMENT "持股比例",
`stock_capital` string NULL COMMENT "应缴金额",
`stock_realcapital` string NULL COMMENT "实缴金额",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "公司参股edge";
中间表表结构
CREATE TABLE `nebula_edge_invest_c` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`s_fid` varchar(50) DEFAULT NULL COMMENT '=f和partner_company_name_digest拼接,对应nebula中的s_fid',
`e_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的e_fid',
`partner_company_name_digest` char(32) DEFAULT NULL COMMENT '股东企业唯一id',
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
`stock_capital` varchar(64) DEFAULT NULL COMMENT '应缴金额',
`stock_proportion` double DEFAULT NULL COMMENT '持股比例',
`stock_realcapital` varchar(64) DEFAULT NULL COMMENT '实缴金额',
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_digest` (`company_name_digest`, `partner_company_name_digest`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_invest_c的数据中间表';
涉及到的数据源
tb_company_partner
通过binlog更新
tb_company_partner |
nebula_edge_invest_h |
备注 |
company_name_digest |
company_name_digest |
|
partner_company_name_digest |
partner_company_name_digest |
|
company_name_digest |
e_fid |
e_fid='f'+company_name_digest |
partner_company_name_digest |
s_fid |
s_fid='f'+partner_company_name_digest |
utn_ic.company_partner_new
通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee and deleted = 0 and is_personal = 0
company_partner |
nebula_edge_invest_h |
备注 |
company_name_digest |
company_name_digest |
|
stock_name_digest |
partner_company_name_digest |
|
stock_capital |
stock_capital |
|
stock_proportion |
stock_proportion |
|
stock_realcapital |
stock_realcapital |
|
is_history |
is_history |
|
edge_own
逻辑
直接通过监控融合库tb_company_legalperson表的binlog更新
nebula结构
自然人作为企业法定代表人,与企业间关系的集合,类型为edge,用<ppid, edge_own, (Rank), fid>唯一标识,表名为edge_own
CREATE EDGE `edge_own` (
`ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id",
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "人员法定代表人edge";
涉及到的数据源
tb_company_legalperson
通过binlog更新,lp_ppid is not null and company_name_digest is not null
tb_company_legalperson |
备注 |
lp_ppid |
|
company_name_digest |
|
is_history |
|
update_time |
|
edge_own_c
逻辑
直接通过监控融合库tb_company_legalperson表的binlog更新
nebula结构
CREATE EDGE `edge_own_c` (
`company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id",
`lp_company_name_digest` fixed_string(32) NOT NULL COMMENT "法人企业唯一id",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "公司法定代表人edge";
涉及到的数据源
tb_company_legalperson
通过binlog更新,lp_company_name_digest is not null and company_name_digest is not null
tb_company_legalperson |
备注 |
lp_company_name_digest |
|
company_name_digest |
|
is_history |
|
update_time |
|
edge_branch
逻辑
从mongo提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表
nebula结构
企业与分支机构企业间关系的集合,类型为edge,用<s_fid, edge_branch, (Rank), e_fid>唯一标识,表名为edge_branch
CREATE EDGE `edge_branch` (
`company_name_digest` fixed_string(32) NOT NULL COMMENT "母公司企业唯一id",
`branch_company_name_digest` fixed_string(32) NOT NULL COMMENT "分支企业唯一id",
`update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "公司分支机构edge";
中间表表结构
CREATE TABLE `nebula_edge_branch` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`s_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的s_fid',
`e_fid` varchar(50) DEFAULT NULL COMMENT '=f和branch_company_name_digest拼接,对应nebula中的e_fid',
`branch_company_name_digest` char(32) DEFAULT NULL COMMENT '分支机构唯一id',
`company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
`is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name_digest` (`company_name_digest`, `branch_company_name_digest`),
KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_branch的数据中间表';
涉及到的数据源
utn_ic.company_branch
company_branch |
nebula_edge_branch |
备注 |
branch_name_digest |
branch_company_name_digest |
|
company_name_digest |
company_name_digest |
|
n_company_status |
is_history |
(case n_company_status when '正常' then 0 else 1 end) as is_history |
branch_name_digest |
e_fid |
e_fid='f'+branch_name_digest |
company_name_digest |
s_fid |
s_fid='f'+company_name_digest |