MySQL通用入库模块
本模块用于一般的数据入MySql数据库的操作。目前支持以下几种操作
- insert
- update
- upsert
- delete
本模块需要根据所传入的数据类型及要求执行的操作,向预先定义的数据库中 执行相应的数据库操作。模块能够正确执行数据库操作依赖以下前提:
- 数据本身或模块配置中,指明了传入数据为何种数据类型
- 数据本身或模块配置中,指明了应该执行何种数据库操作(insert/update/upsert/delete)
- 当前模块配置指明了特定数据类型所对应的数据库及表
约定
本文档中配置或数据示例中,对于dict类型的key的名字,可能存在系统预定义的内容和用户定义的内容。 为了以示区分,所有用户自定义的内容均以<>引用。如以下内容
{
"<field1>": "value", //要入库的具休数据
"<field2>": "value",
...
"sync_condition": { //入库操作及数据类型声明
"operation": "upsert",
"data_type": "test_ic",
}
}
- field1。field2均为用户的内容。
- sync_condition, operation, data_type 为本模块定义的内容。
使用尖括号只是为了在文档中加以区分,并非实际应用中需要这样做。
配置文件的兼容模式
早期的配置文件在设计上存在一个问题: 配置中dict的key可能是用户自定义的内容。 这样做虽然可以让配置相对简洁, 但配置文件的可读性并不好。 为了能具备更好的可读性,新版本中修改了配置的形式。新的配置形式中确保配置中 dict的key都是程序所设置的保留字。
如下示例,data_type新老两个版本不同的配置方式。
#老版本的配置
# teams是用户自定义的data_type,
# 其值是teams这个data_type对应的数据库。
#
data_type:
- teams: ['tb_teams', 'tb_members']
#新版本的配置
# name, tables都是程序的配置项,含义明确
data_type:
- name: "teams"
tables: ['tb_teams', 'tb_members']
考虑到已经有大量的配置文件在生产中使用,程序中保留了对老样式的兼容。 默认情况下,程序开启兼容模式(compatible:True)。可能的话请关闭兼容模式, 因为, 所有的新功能的开发,都将只支持非兼容模式。
1. 传入的数据
传入数据可以声明当前数据的数据类型(data_type)及希望执行的操作(operation)。 前续模块可以通过在数据中加入如下内容来实现这个目的。
{
"<field1>": "value", //要入库的具休数据
"<field2>": "value",
...
"sync_condition": { //入库操作及数据类型声明
"operation":"upsert",
"data_type": "test_ic",
}
}
1.1 操作和数据类型声明 (sync_condition)
操作 (operation)
operation | 说明 |
---|---|
insert | 插入操作 |
update | 更新操作 |
upsert | 不存在则insert 否则 update |
delete | 删除操作 |
update/upsert/delete 操作约束
要支持这三种操作需具备以下条件:
- 数据库表上有主键和唯一键
- 传入的数据也指定了主键和唯一键
例,数据库表如下
字段 | 说明 |
---|---|
id | pk |
company_name_digest | uni index |
company_name | |
company_type | |
company_status |
具体数据如下:
id | company_name_digest | company_name | company_status |
---|---|---|---|
1 | 56a1254773711761 | 钦州全安劳务有限公司 | 正常 |
2 | 5e802c8d9e473c01 | 凭安网络科技有限公司 | 正常 |
如果要将 钦州全安劳务有限公司 的状态更新为 吊销 则需要转入如下数据:
{
"<id>": 1,
"<company_status>": "吊销",
"sync_condition": {
"operation":"update",
}
}
或
{
"<company_name_digest>": "5e802c8d9e473c01",
"<company_status>": "吊销",
"sync_condition": {
"operation":"update",
}
}
因为,id和company_name_digest都能唯一确定这条数据。由此可以完成更新。
1.2 数据类型 data_type
自定义的数据类型标识。该字段内容需要与模块中定义的data_type相对应。才能完成相应数据库操作。
注意:如果在数据中没有指定sync_condition,也可通过模块配置项
default_data_type和default_operation进行全局配置。但是在一个模块配置中只能
设置一个全局默认值。因此,这种方法只适用于处理单一数据类型的时候。
2. 模块配置
模块的配置用于定义前续模块所传入的数据对应更新到哪个数据库的什么表中。 这个定义通过一个两层结构来表达:
catalog --> data_type
- catalog 定义数据库连接参数
- data_type 定义某一种要处理的数据对应数据库里的一张或多张表
配置项目 | 是否必填 | 数据类型 | 说明 |
---|---|---|---|
compatible | 选填 | 布尔 | 配置文件兼容选项 |
catalogues | 必填 | 字典数组 | |
default_data_type | 选填 | 字符串 | 当数据中没有指明数据类型(data_type)时,默认使用的数据类型 |
default_operation | 选填 | 字符串 | 当数据中没有指明操作(operation)时,默认使用的操作方式 |
compatible
布尔类型。 True 使用老版本的配置模式。 提供该选项只是为了保证已经存在的配置文件仍然可以工作。 False 以本文档当前说明的方式进行配置。新写的配置文件都应该设置为False
default_data_type和default_operation
当数据中没有指明数据类型或操作时。则默认使用这两项所设置的值。如果在数据中对data_type和 operation进行了设置,则忽略该选项
catalogues (catalog数组)
数组类型。
它的每个元素均是一个字典,我们称为catalog。 catalog的选项如下表:
配置项目 | 是否必填 | 数据类型 | 说明 |
---|---|---|---|
data_type | 必填 | 数组 | |
db_connection | 必填 | 字典 | |
table_explode | 选填 | 数组 | |
db_start_transaction | 选填 | ||
table_match | 选填 |
一个catalog中只能定义一种数据库连接参数。 通常把使用的相同连接参数的data_type配置在一个catalog中。 当然也可以根据需要放在多个catalog中。
add_return_keys
add_return_keys定义入库完成后的返回的dict结果中要带有add_return_keys中是定义的键以及键对应的值(支持str和list格式)
没有add_return_keys时:
{'sync_status': 'success', 'table_name': 'company_employment', 'primary_key': {'ID': 0}, 'affected_rows': 0}
"add_return_keys": ["company_name_digest"]
{'sync_status': 'success', 'table_name': 'company_employment', 'primary_key': {'ID': 0}, 'affected_rows': 0, 'company_name_digest': '7d49c410a18fa394ea14240de45fc564'}
add_return_change_info
add_return_change_info(True/False)默认为False,当为True时,会对比入库前后那些数据产生了变化,数据库中原有的数据记录到change_before中, 新的数据入库后,对数据库产生改变的数据记录到change_after;数据对数据库的操作会记录到change_type字段中,有3种情况,新增(include), 删除(remove),更新(change);比如表中的原有记录{'CITY': '黄山0', 'DISTRICT': '祁门县0'}, 新增数据{'CITY': '黄山', 'DISTRICT': '祁门县'},add_return_full_info与add_return_change_info这2个配置不能同时为True; 入库完成后记录的结果如下:
{
'msg': [
{
'sync_status': 'success',
'table_name': 'company_employment',
'primary_key': {'ID': 0},
'affected_rows': 1,
'change_type': 'change',
'change_time': '2020-09-15 17:55:04',
'change_before': {
'CITY': '黄山0',
'DISTRICT': '祁门县0',
'UPDATE_TIME': datetime.datetime(2020,9,15,17,54,56)
},
'change_after': {
'CITY': '黄山',
'DISTRICT': '祁门县E_TIME': datetime.datetime(2020,7,16,0,0)
},
'company_name_digest': '7d49c410a18fa394ea14240de45fc564'
}
]
}
add_return_full_info
add_return_full_info(True/False)默认为False,当为True时,会记录完整的入库前和入库后的所有数据,入库前的数据在change_before, 入库后的数据在change_after;数据对数据库的操作会记录到change_type字段中,有3种情况,新增(include),删除(remove),更新(change); add_return_full_info与add_return_change_info这2个配置不能同时为True 入库完成后记录的结果如下:
{
'msg': [
{
'sync_status': 'success',
'table_name': 'company_employment',
'primary_key': {
'ID': 0
},
'affected_rows': 1,
'change_type': 'change',
'change_time': '2020-09-1519: 03: 42',
'change_before': {
'ID': 'id',
'TITLE': 'title',
'CITY': 'city',
'DISTRICT': 'district',
'COMPANY_NAME': 'company_name',
'FROM_URL': 'from_url',
'ORI_SALARY': 'ori_salary',
'URL_PATH': 'url_Path',
'STARTDATE': 'startDate',
'ENDDATE': 'endDate',
'SOURCE': 'source',
'EDUCATION': 'education',
'EMPLOYER_NUMBER': 'employer_number',
'DESCRIPTION': 'description',
'EXPERIENCE': 'experience',
'CREATE_TIME': 'create_time',
'UPDATE_TIME': 'update_time',
'CLASS': 'class',
'DELETED': 'deleted',
'JOB_FIRST_CLASS': 'job_first_class',
'JOB_SECOND_CLASS': 'job_second_class',
'JOB_THIRD_CLASS': 'job_third_class',
'LOCATION': 'location',
'ALTERNATEFIELD1': 'alternateField1',
'ALTERNATEFIELD2': 'alternateField2',
'ALTERNATEFIELD3': 'alternateField3',
'COMPANY_NAME_DIGEST': 'company_name_digest',
'COMPANY_ID': 'company_id'
},
'change_after': {
'ID': 1134383668,
'TITLE': '普工电子+情侣宿舍+仓管品保',
'CITY': '黄山',
'DISTRICT': '祁门县',
'COMPANY_NAME': '苏州帆鹏电器有限公司',
'FROM_URL': '苏州高新区泰山路687号2',
'ORI_SALARY': '5001-8000',
'URL_PATH': 'http: //kg.baidu.com/od/4002/0/faf247d4efff7ed7cf7b76af6e5af43f',
'STARTDATE': datetime.date(2020,7,15),
'ENDDATE': datetime.date(2020,8,16),
'SOURCE': '百姓网',
'EDUCATION': '不限',
'EMPLOYER_NUMBER': '200人',
'DESCRIPTION': '本次扩招属于我工厂内部直招,厂区已通过企业营业执照严格认证,敬请放心求职应聘,入',
'EXPERIENCE': '不限',
'CREATE_TIME': datetime.datetime(2020,7,15,0,0),
'UPDATE_TIME': datetime.datetime(2020,7,16,0,0),
'CLASS': '全职',
'DELETED': 0,
'JOB_FIRST_CLASS': '运输/物流/仓管',
'JOB_SECOND_CLASS': '仓库管理员',
'JOB_THIRD_CLASS': '',
'LOCATION': '苏州高新区>泰山路687号',
'ALTERNATEFIELD2': '',
'ALTERNATEFIELD3': '',
'COMPANY_NAME_DIGEST': '7d49c410a18fa394ea14240de45fc564'
}
}
]
}
应用场景说明
简单的单表入库
多表事务入库
一对多的主、子表展开入库
为后续流程提供必要信息
只对符合某些条件的数据进行操作(过滤)
配置样例
以下样例配置,定义了两个catalog:一个指向db_host1上的数据库database1,另一个指向 db_host2上的database2。
一个data_type=data_demo的doc入库时, 入table_a:doc不变,一对一入库 入table_b:doc除了b_list字段其他的不变,b_list要展开,一对多入库 入table_c:doc除了b_list字段其他的不变,c_list要展开,一对多入库
default_data_type: "company_contact_details"
default_operation: "upsert"
catalogues:
- data_type:
- 'data_demo': ['table_a', 'table_b', 'table_c']
table_explode:
table_b: b_list
table_c: c_list
table_match:
table_b: "b1=='5' and b2=='5'"
table_c: ["c1=='5'", "c3=='4'"]
db_start_transaction:
consistent_snapshot: False
isolation_level: "READ COMMITTED"
add_return_keys:
- company_name_digest
db_connection:
user: "upc"
password: "upc@123"
host: "192.168.109.220"
port: "3306"
database: "data_test"
bulk: False