背景
在日常的工作中,经常需要以符合特定条件的公司列表作为数据任务的输入。例如,某个爬虫任务需要采集所有在营企业的数据,要以企业的统一信用代码作为搜索条件。因此我们需要一种方法,它可以方便获取所需数据,并容易的将其作为爬虫任务提交到爬虫队列。本文介绍了使用data_pump结合ES来达到此目的的方法。
阅读本文前所需了解知识
- data_pump的基本使用
- elastsearch的基本使用
- taskhub (taskHUB-介绍)
方法说明
本方法使用data_pump从ES的company索引中查询相关数据。并将任务提交到taskhub接口或爬虫的redis队列。
EsDocReader的配置
es_company:
class: es.EsDocReader
init:
hosts:
- host: es-cn-4591blu580004eavf.elasticsearch.aliyuncs.com
port: 9200
http_auth: [ 'elastic', 'XXXXX' ]
index: company
dsl:
_source: [
"company_name",
"credit_no",
"company_code",
"company_name_digest"
]
"query": {
"bool": {
"must": [
{ "match_phrase": { "n_company_status": { "query": "正常" } } },
{ "match_phrase": { "company_major_type": { "query": 3 } } },
{ "range": { "updated": { "lt": "{end_date}" } } }
]
}
}
offset:
store: "{offset_store_path}"
field: company_name_digest.keyword
reset_hours: 0
limit: 5000
timeout: 600
参数 | 注释 |
---|---|
dsl.query | 数据筛选条件。如果不会写es查询语句,可以在kibana的discover中筛选,然后在inspect->request中拷贝条件 |
dsl._source | 返回数据的字段列表 |
offset.store | offset 存储路径 |
offset.field | 充当offset的字段。在遍历公司这个场景下固定使用 company_name_digest.keyword |
offset.reset_hours | 重置offset的时间周期。小时数。 |
limit | 最大记录数 |
timeout | 查询ES的超时。单位为秒 |
关于 dsl.query
该参数用于指定筛选数据所需的条件。 如果不会写es查询语句,可以在kibana的discover中筛选,然后在inspect->request中拷贝条件直接放在配置中使用。
关于 reset_hours
如果任务是周期性重复执行。则需要在遍历完所有数据后将offset重置。就需要依靠此参数来开启。无数据可以消费与上一次重置到当时时间差大于等于reset_hours值均为重置offset的必要条件。
company索引字段说明
以下为一些常用的用于数据过滤的字段,完整说明参见钉钉群文件:基础架构组/03_数据资产/Comapny字段说明.xlsx
字段名 | 注释 |
---|---|
company_name_digest | company_name_digest |
company_status | 公司状态 |
n_company_status | 简化后公司状态(正常、注销、吊销、其他) |
company_type | 公司类型 |
company_major_type | 公司类型大类:1 个体 2 合作社 3 企业 |
company_minor_type | 企业登记类型代码,具体见《企业登记注册类型对照表》 |
capital | 注册资本(元) |
credit_no | 统一信用代码 |
org_code | 组织机构代码 |
establish_date | 成立日期 |
industry_l1_code | 行业分类 |
industry_l2_code | 行业分类 |
industry_l3_code | 行业分类 |
industry_l4_code | 行业分类 |
province_short | 省缩写 |
history_name | 历史名称 |
business_scope | 经营范围 |
division_code | 登记地行政区划代码 |
employees | 主要人员名称 |
partners | 股东名称 |
risk_tags | 风险标签 具体见 sheet:risk_tags |
tags | 标签 具体见 sheet:tags |
company_scale_type | 最新一年年报--大中小微型分类 |
annual_report_years | 提交年报的年份集合 |
no_ar_submitted | 没有提交年报的年份集合 |
spider_update_time | 工商爬虫最后一次爬取的时间 |
TaskhubWriter配置
taskhub:
class: taskhub.TaskhubWriter
init:
host: "10.8.6.222"
port: 8518
feed_on_demand: true
outbound: zongju_captcha
task_params:
- company_name_digest
- company_code
- company_name
- credit_no
- search_key
task_src: 1
priority: 200
rt: true
参数 | 注释 |
---|---|
host | taskhub服务地址 |
port | taskhub服务端口 |
feed_on_demand | 是否按需提交。如果设为true, 则要根据outbound所指定的任务队列最大任务量与积压情况来决定是否需要提交新的任务 |
outbound | 用于检查积压的 outbound |
task_params | 任务参数字段列表 |
附加参数 | 自定义的任务参数。用于附加一些固定的任务参数在实际提交的任务中。如指定优先级。 |
关于按需提交
一些情况下,不希望任务队列中积压过多任务。则需要打开feed_on_demand来控制任务提交。当writer开启该选项后,data_pump在从reader消费数据前,会读取writer的需求量,然后从reader消费相应数量的数据。目前 redis和taskhub 两个writer支持该功能。