Skip to content

GitLab

  • Projects
  • Groups
  • Snippets
  • Help
    • Loading...
  • Help
    • Help
    • Support
    • Community forum
    • Submit feedback
    • Contribute to GitLab
  • Sign in / Register
K
kb
  • Project overview
    • Project overview
    • Details
    • Activity
    • Releases
  • Repository
    • Repository
    • Files
    • Commits
    • Branches
    • Tags
    • Contributors
    • Graph
    • Compare
  • Issues 2
    • Issues 2
    • List
    • Boards
    • Labels
    • Service Desk
    • Milestones
  • Merge requests 0
    • Merge requests 0
  • Operations
    • Operations
    • Incidents
  • Analytics
    • Analytics
    • Repository
    • Value Stream
  • Wiki
    • Wiki
  • Members
    • Members
  • Activity
  • Graph
  • Create a new issue
  • Commits
  • Issue Boards
Collapse sidebar
  • granite
  • kb
  • Wiki
    • Data_stream
    • Equity_penetration
  • update_nebula

Last edited by lizj Jan 07, 2022
Page history

update_nebula

nebula增量例行方案

提前准备好数据,建立中间表,mysql和nebula表对表更新。

tag_firm

逻辑

从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的fid字段不为null时,例行同步的程序中才将对应记录入nebula

nebula结构

企业实体的结合,类型为tag,用fid唯一标识vid(f-company_name_digest),属性包含营业执照信息,表名tag_firm

CREATE TAG `tag_firm` ( 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id(工商digest)", 
  `company_name` string NULL COMMENT "企业姓名", 
  `credit_no` string NULL COMMENT "统一信用代码", 
  `company_code` string NULL COMMENT "企业注册号", 
  `org_code` string NULL COMMENT "组织机构代码", 
  `tax_code` string NULL COMMENT "纳税人识别号", 
  `capital` string NULL COMMENT "注册资本", 
  `real_capital` string NULL COMMENT "实缴资本", 
  `establish_date` datetime NULL COMMENT "成立日期", 
  `issue_date` datetime NULL COMMENT "核准日期", 
  `revoke_date` datetime NULL COMMENT "注销或吊销日期", 
  `company_status` string NULL COMMENT "登记状态", 
  `company_type` string NULL COMMENT "企业类型",
  `company_major_type` string NULL COMMENT "归类后的企业类型",
  `n_company_status` string NULL COMMENT "登记状态(格式化)", 
  `operation_startdate` string NULL COMMENT "营业起始时间", 
  `operation_enddate` string NULL COMMENT "营业终止时间", 
  `authority` string NULL COMMENT "登记机关", 
  `company_address` string NULL COMMENT "注册地址", 
  `company_industry` string NULL COMMENT "所属行业", 
  `province_short` string NULL COMMENT "省份简称", 
  `city` string NULL COMMENT "城市", 
  `district` string NULL COMMENT "地区", 
  `insurance_amount` int NULL COMMENT "参保人数",
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "企业实体tag";

中间表表结构

CREATE TABLE `nebula_tag_firm` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest的拼接,对应nebula中的vid,是否入nebula标志,为null时表示不入nebula',
  `company_name_digest` char(32) DEFAULT NULL COMMENT 'company唯一键',
  `company_name` varchar(255) DEFAULT NULL COMMENT ' 公司名称',
  `credit_no` varchar(50) DEFAULT NULL COMMENT '统一社会信用码',
  `company_code` varchar(50) DEFAULT NULL COMMENT '公司注册码',
  `org_code` varchar(50) DEFAULT NULL COMMENT '组织机构代码',
  `tax_code` varchar(50) DEFAULT NULL COMMENT '纳税人识别号',
  `n_company_status` varchar(16) DEFAULT NULL COMMENT '登记状态(格式化)',
  `capital` varchar(255) DEFAULT NULL COMMENT '注册资本',
  `real_capital` varchar(255) DEFAULT NULL COMMENT '实缴资本',
  `establish_date` datetime DEFAULT NULL COMMENT '成立日期',
  `issue_date` datetime DEFAULT NULL COMMENT '核准日期',
  `revoke_date` datetime DEFAULT NULL COMMENT '注销或吊销日期',
  `company_status` varchar(50) DEFAULT NULL COMMENT '登记状态',
  `company_type` varchar(255) DEFAULT NULL COMMENT '企业类型',
  `operation_startdate` varchar(50) DEFAULT NULL COMMENT '营业期限起始时间',
  `operation_enddate` varchar(50) DEFAULT NULL COMMENT '营业期限终止时间',
  `company_address` varchar(512) DEFAULT NULL COMMENT '公司注册地址',
  `authority` varchar(255) DEFAULT NULL COMMENT '登记机关',
  `company_industry` varchar(255) DEFAULT NULL COMMENT '所属行业',
  `province_short` varchar(32) DEFAULT NULL COMMENT '省份简称',
  `city` varchar(32) DEFAULT NULL COMMENT '地区',
  `district` varchar(32) DEFAULT NULL COMMENT '城市',
  `insurance_amount` int DEFAULT NULL COMMENT '参保人数',
  `is_deleted` smallint(1) DEFAULT 0 COMMENT '0:保留,1:删除',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_digest` (`company_name_digest`),
  KEY `idx_company_name` (`company_name`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的tag_firm的数据中间表';

涉及到的数据源

tb_company

先存量入表,后续通过binlog更新
tb_company nebula_tag_company 备注
company_name_digest company_name_digest
company_name_digest fid fid='f'+company_name_digest

utn_ic.ic

通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.ic and deleted=0
utn_ic.ic nebula_tag_company 备注
company_name_digest company_name_digest
company_name company_name
company_code company_code
credit_no credit_no
org_code org_code
tax_code tax_code
establish_date establish_date
company_status company_status
company_type company_type
authority authority
issue_date issue_date
operation_startdate operation_startdate
operation_enddate operation_enddate
capital capital
real_capital real_capital
company_address company_address
cancel_date revoke_date 两个字段值合一为revoke_date
revoke_date revoke_date 两个字段值合一为revoke_date
n_company_status n_company_status
province_short province_short
city city
district district
company_major_type company_major_type
industries company_industry 取最高一级行业划分
insurance_amount insurance_amount

tag_person

逻辑

从融合库中提前将数据准备至nebula中间表,所需字段通过binlog增量入中间表;将ppid作为唯一键,由于其他表中有的ppid表tb_person里一定有,所以用ac_partner_num、ac_employee_num、ac_legalperson_num三个字段都不为null当做是否更新nebula的标志。

nebula结构

人员的集合,类型为tag,用pid唯一标识vid(p-ppid),表名 tag_person

CREATE TAG `tag_person` ( 
  `ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id", 
  `person_name` string NULL COMMENT "人员姓名", 
  `ac_num` int64 NULL COMMENT "关联企业数", 
  `ac_legalperson_num` int64 NULL COMMENT "担任法定代表人数", 
  `ac_partner_num` int64 NULL COMMENT "对外投资数", 
  `ac_employee_num` int64 NULL COMMENT "任职数", 
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "人员实体tag";

中间表表结构

CREATE TABLE `nebula_tag_person` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `pid` char(33) NOT NULL COMMENT '=p和ppid的拼接,对应nebula中的vid',
  `ppid` char(32) NOT NULL COMMENT '人员唯一id',
  `person_name` varchar(50) DEFAULT NULL COMMENT '人员姓名',
  `ac_legalperson_num` int(9) DEFAULT NULL COMMENT '担任法定代表人数',
  `ac_num` int(9) DEFAULT NULL COMMENT '关联企业数',
  `ac_partner_num` int(9) DEFAULT NULL COMMENT '对外投资数',
  `ac_employee_num` int(9) DEFAULT NULL COMMENT '任职数',
  `is_deleted` smallint(1) DEFAULT 0 COMMENT '0:保留,1:删除',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_ppid` (`ppid`),
  KEY `idx_name` (`person_name`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的tag_person的数据中间表';

涉及到的数据源

tb_person

先存量入表,后续通过binlog更新
tb_person nebula_tag_person 备注
ppid ppid、pid pid='p'+ppid
person_name person_name
ac_num ac_num
update_time update_time

tb_company_employee

先存量入表,后续通过binlog更新is_history=0
tb_company_employee nebula_tag_person 备注
ppid ppid
pcid ac_employee_num count(pcid) as ac_employee_num,group by ppid

tb_company_partner

先存量入表,后续通过binlog更新is_history=0
tb_company_partner nebula_tag_person 备注
partner_ppid ppid
pcid ac_partner_num count(pcid) as ac_partner_num,group by partner_ppid

tb_company_legalperson

先存量入表,后续通过binlog更新is_history=0
tb_company_legalperson nebula_tag_person 备注
lp_ppid ppid
pcid ac_legalperson_num count(pcid) as ac_legalperson_num,group by lp_ppid

edge_serve

逻辑

从mongo和融合库中提前将数据准备至nebula中间表,mongo所需字段存量入表,后续通过kafka增量入中间表;将company_name_digest和employee_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula

nebula结构

企业主要人员与企业之间任职关系的集合,类型为edge,用<pid,edge_serve,(Rank),fid>唯一标识,边类型为edge_serve
CREATE EDGE `edge_serve` ( 
  `ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id", 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", 
  `position` string NULL COMMENT "职务", 
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "任职edge";

中间表表结构

CREATE TABLE `nebula_edge_serve` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `pid` char(33) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
  `fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
  `ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
  `company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
  `employee_name` varchar(255) NOT NULL COMMENT '员工名称',
  `position` varchar(64) DEFAULT NULL COMMENT '职务',
  `is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_digest` (`company_name_digest`, `employee_name`),
  KEY `idx_position` (`position`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_serve的数据中间表';

涉及到的数据源

tb_company_enployee

先存量入表,后续通过binlog更新
tb_company_enployee nebula_edge_serve 备注
ppid ppid
company_name_digest company_name_digest
employee_name employee_name
company_name_digest fid fid='f'+company_name_digest
ppid pid pid='p'+ppid

utn_ic.company_employee

通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee
utn_ic.company_employee nebula_edge_serve 备注
company_name_digest company_name_digest
employee_name employee_name
position position
is_history is_history

edge_invest_h

逻辑

从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_name作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的pid和fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula

nebula结构

自然人作为企业股东,与企业间的参股关系的集合,类型为edge,用<ppid,edge_invest_h,(Rank),fid>唯一标识,边类型为edge_invest_h
CREATE EDGE `edge_invest_h` ( 
  `ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id", 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", 
  `stock_proportion` string NULL COMMENT "持股比例", 
  `stock_capital` string NULL COMMENT "应缴金额", 
  `stock_realcapital` string NULL COMMENT "实缴金额", 
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "人员参股edge";

中间表表结构

CREATE TABLE `nebula_edge_invest_h` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `pid` char(33) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
  `fid` char(33) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
  `ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
  `partner_name` varchar(255) NOT NULL COMMENT '股东名称',
  `company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
  `stock_capital` varchar(64) DEFAULT NULL COMMENT '应缴金额',
  `stock_proportion` double DEFAULT NULL COMMENT '持股比例',
  `stock_realcapital` varchar(64) DEFAULT NULL COMMENT '实缴金额',
  `is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_id` (`company_name_digest`, `partner_name`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_invest_h的数据中间表';

涉及到的数据源

tb_company_partner

先存量入表,后续通过binlog更新
tb_company_partner nebula_edge_invest_h 备注
company_name_digest company_name_digest
partner_ppid ppid
company_name_digest fid fid='f'+company_name_digest
partner_ppid pid pid='p'+ppid
partner_name partner_name

utn_ic.company_partner_new

通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee and deleted = 0 and is_personal = 1
company_partner nebula_edge_invest_h 备注
company_name_digest company_name_digest
stock_name partner_name
stock_capital stock_capital
stock_proportion stock_proportion
stock_realcapital stock_realcapital
is_history is_history

edge_invest_c

逻辑

从mongo和融合库中提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表;将company_name_digest和partner_company_name_digest作为唯一键,融合库数据补充入表,只入关联关系字段;后续更新nebula时,采用监控binlog的方式,当所更新记录的s_fid和e_fid字段都不为null时,例行同步的程序中才将对应记录更新入nebula

nebula结构

企业作为企业股东,与企业间的参股关系的集合,类型为edge,用<s_fid,invest_c,(Rank),e_fid>唯一标识,表名为edge_invest_c
CREATE EDGE `edge_invest_c` ( 
  `partner_company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", 
  `stock_proportion` string NULL COMMENT "持股比例", 
  `stock_capital` string NULL COMMENT "应缴金额", 
  `stock_realcapital` string NULL COMMENT "实缴金额", 
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "公司参股edge";

中间表表结构

CREATE TABLE `nebula_edge_invest_c` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `s_fid` varchar(50) DEFAULT NULL COMMENT '=f和partner_company_name_digest拼接,对应nebula中的s_fid',
  `e_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的e_fid',
  `partner_company_name_digest` char(32) DEFAULT NULL COMMENT '股东企业唯一id',
  `company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
  `stock_capital` varchar(64) DEFAULT NULL COMMENT '应缴金额',
  `stock_proportion` double DEFAULT NULL COMMENT '持股比例',
  `stock_realcapital` varchar(64) DEFAULT NULL COMMENT '实缴金额',
  `is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_digest` (`company_name_digest`, `partner_company_name_digest`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_invest_c的数据中间表';

涉及到的数据源

tb_company_partner

先存量入表,后续通过binlog更新
tb_company_partner nebula_edge_invest_h 备注
company_name_digest company_name_digest
partner_company_name_digest partner_company_name_digest
company_name_digest e_fid e_fid='f'+company_name_digest
partner_company_name_digest s_fid s_fid='f'+partner_company_name_digest

utn_ic.company_partner_new

通过kafka的topic:sync-mongo-to-hudi更新,取ns=utn_ic.company_employee and deleted = 0 and is_personal = 0
company_partner nebula_edge_invest_h 备注
company_name_digest company_name_digest
stock_name_digest partner_company_name_digest
stock_capital stock_capital
stock_proportion stock_proportion
stock_realcapital stock_realcapital
is_history is_history

edge_own

逻辑

先存量入中间表,然后通过监控融合库tb_company_legalperson表的binlog更新中间表,后续通过中间表binlog更新nebula

nebula结构

自然人作为企业法定代表人,与企业间关系的集合,类型为edge,用<pid, edge_own, (Rank), fid>唯一标识,表名为edge_own
CREATE EDGE `edge_own` ( 
  `ppid` fixed_string(32) NOT NULL COMMENT "人员唯一id", 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", 
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "人员法定代表人edge";

中间表表结构

CREATE TABLE `nebula_edge_own` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `pid` varchar(50) DEFAULT NULL COMMENT '=p和ppid拼接,对应nebula中的pid',
  `fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的fid',
  `ppid` char(32) DEFAULT NULL COMMENT '人员唯一id',
  `company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
  `is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_id` (`company_name_digest`, `ppid`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_own的数据中间表';

涉及到的数据源

tb_company_legalperson

通过binlog更新,lp_ppid is not null and company_name_digest is not null
tb_company_legalperson nebula_edge_own 备注
lp_ppid ppid
company_name_digest company_name_digest
is_history is_history
lp_ppid pid pid='p'+lp_ppid
company_name_digest fid fid='f'+company_name_digest

edge_own_c

逻辑

先存量入中间表,然后通过监控融合库tb_company_legalperson表的binlog更新中间表,后续通过中间表binlog更新nebula

nebula结构

企业作为企业法定代表人,与企业间关系的集合,类型为edge,用<s_fid,edge_own_c,(Rank),e_fid>唯一标识,表名为edge_own_c
CREATE EDGE `edge_own_c` ( 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "企业唯一id", 
  `lp_company_name_digest` fixed_string(32) NOT NULL COMMENT "法人企业唯一id", 
  `update_time` datetime NULL COMMENT "最后更新时间" 
) ttl_duration = 0, ttl_col = "", comment = "公司法定代表人edge";

中间表表结构

CREATE TABLE `nebula_edge_own_c` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `s_fid` varchar(50) DEFAULT NULL COMMENT '=f和lp_company_name_digest拼接,对应nebula中的s_fid',
  `e_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的e_fid',
  `lp_company_name_digest` char(32) DEFAULT NULL COMMENT '法人企业唯一id',
  `company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
  `is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_digest` (`company_name_digest`, `branch_company_name_digest`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_ow_c的数据中间表';

涉及到的数据源

tb_company_legalperson

通过binlog更新,lp_company_name_digest is not null and company_name_digest is not null
tb_company_legalperson nebula_edge_own_c 备注
lp_company_name_digest lp_company_name_digest
company_name_digest company_name_digest
is_history is_history
lp_company_name_digest s_fid s_fid='f'+lp_company_name_digest
company_name_digest e_fid e_fid='f'+company_name_digest

edge_branch

逻辑

从mongo提前将数据准备至nebula中间表,mongo保持所需字段存量入表,后续通过kafka增量入表,监控中间表的binlog更新nebula

nebula结构

企业与分支机构企业间关系的集合,类型为edge,用<s_fid, edge_branch, (Rank), e_fid>唯一标识,表名为edge_branch
CREATE EDGE `edge_branch` ( 
  `company_name_digest` fixed_string(32) NOT NULL COMMENT "母公司企业唯一id", 
  `branch_company_name_digest` fixed_string(32) NOT NULL COMMENT "分支企业唯一id", 
  `update_time` datetime NULL COMMENT "最后更新时间"
) ttl_duration = 0, ttl_col = "", comment = "公司分支机构edge";

中间表表结构

CREATE TABLE `nebula_edge_branch` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `s_fid` varchar(50) DEFAULT NULL COMMENT '=f和company_name_digest拼接,对应nebula中的s_fid',
  `e_fid` varchar(50) DEFAULT NULL COMMENT '=f和branch_company_name_digest拼接,对应nebula中的e_fid',
  `branch_company_name_digest` char(32) DEFAULT NULL COMMENT '分支机构唯一id',
  `company_name_digest` char(32) DEFAULT NULL COMMENT '企业唯一id',
  `is_history` smallint(1) DEFAULT 0 COMMENT '0:正常,1:历史',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_name_digest` (`company_name_digest`, `branch_company_name_digest`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT 'nebula的edge_branch的数据中间表';

涉及到的数据源

utn_ic.company_branch

存量入表后,通过kafka增量入表
company_branch nebula_edge_branch 备注
branch_name_digest branch_company_name_digest
company_name_digest company_name_digest
n_company_status is_history (case n_company_status when '正常' then 0 else 1 end) as is_history
branch_name_digest e_fid e_fid='f'+branch_name_digest
company_name_digest s_fid s_fid='f'+company_name_digest
Clone repository
  • README
  • basic_guidelines
  • basic_guidelines
    • basic_guidelines
    • dev_guide
    • project_build
    • 开发流程
  • best_practice
  • best_practice
    • AlterTable
    • RDS
    • azkaban
    • create_table
    • design
    • elasticsearch
    • elasticsearch
      • ES运维
    • logstash
View All Pages