|
|
# 背景
|
|
|
|
|
|
在日常的工作中,经常需要以符合特定条件的公司列表作为数据任务的输入。例如,某个爬虫任务需要采集所有在营企业的数据,要以企业的统一信用代码作为搜索条件。因此我们需要一种方法,它可以方便获取所需数据,并容易的将其作为爬虫任务提交到爬虫队列。本文介绍了使用data_pump结合ES来达到此目的的方法。
|
|
|
|
|
|
## 阅读本文前所需了解知识
|
|
|
|
|
|
* data_pump的基本使用
|
|
|
* elastsearch的基本使用
|
|
|
* taskhub (taskHUB-介绍)
|
|
|
|
|
|
|
|
|
# 方法说明
|
|
|
|
|
|
本方法使用data_pump从ES的[company索引](https://es-cn-4591blu580004eavf.kibana.elasticsearch.aliyuncs.com:5601/goto/758f8ff6160977c5ead38e427a268e17)中查询相关数据。并将任务提交到taskhub接口或爬虫的redis队列。
|
|
|
|
|
|
|
|
|
## EsDocReader的配置
|
|
|
|
|
|
```yaml
|
|
|
es_company:
|
|
|
class: es.EsDocReader
|
|
|
init:
|
|
|
hosts:
|
|
|
- host: es-cn-4591blu580004eavf.elasticsearch.aliyuncs.com
|
|
|
port: 9200
|
|
|
http_auth: [ 'elastic', 'XXXXX' ]
|
|
|
index: company
|
|
|
dsl:
|
|
|
_source: [
|
|
|
"company_name",
|
|
|
"credit_no",
|
|
|
"company_code",
|
|
|
"company_name_digest"
|
|
|
]
|
|
|
"query": {
|
|
|
"bool": {
|
|
|
"must": [
|
|
|
{ "match_phrase": { "n_company_status": { "query": "正常" } } },
|
|
|
{ "match_phrase": { "company_major_type": { "query": 3 } } },
|
|
|
{ "range": { "updated": { "lt": "{end_date}" } } }
|
|
|
]
|
|
|
}
|
|
|
}
|
|
|
|
|
|
offset:
|
|
|
store: "{offset_store_path}"
|
|
|
field: company_name_digest.keyword
|
|
|
reset_hours: 0
|
|
|
|
|
|
limit: 5000
|
|
|
timeout: 600
|
|
|
|
|
|
```
|
|
|
|
|
|
| 参数 | 注释 |
|
|
|
| :-- | -- |
|
|
|
|dsl.query |数据筛选条件。如果不会写es查询语句,可以在kibana的discover中筛选,然后在inspect->request中拷贝条件|
|
|
|
|dsl._source |返回数据的字段列表|
|
|
|
|offset.store | offset 存储路径 |
|
|
|
|offset.field | 充当offset的字段。在遍历公司这个场景下固定使用 company_name_digest.keyword|
|
|
|
|offset.reset_hours | 重置offset的时间周期。小时数。|
|
|
|
|limit | 最大记录数 |
|
|
|
|timeout| 查询ES的超时。单位为秒 |
|
|
|
|
|
|
### 关于 dsl.query
|
|
|
|
|
|
该参数用于指定筛选数据所需的条件。
|
|
|
如果不会写es查询语句,可以在kibana的discover中筛选,然后在inspect->request中拷贝条件直接放在配置中使用。
|
|
|
|
|
|
### 关于 reset_hours
|
|
|
|
|
|
如果任务是周期性重复执行。则需要在遍历完所有数据后将offset重置。就需要依靠此参数来开启。无数据可以消费与上一次重置到当时时间差大于等于reset_hours值均为重置offset的必要条件。
|
|
|
|
|
|
### company索引字段说明
|
|
|
|
|
|
以下为一些常用的用于数据过滤的字段,完整说明参见钉钉群文件:基础架构组/03_数据资产/Comapny字段说明.xlsx
|
|
|
|
|
|
| 字段名 | 注释 |
|
|
|
| -- | -- |
|
|
|
|company_name_digest|company_name_digest|
|
|
|
|company_status|公司状态|
|
|
|
|n_company_status|简化后公司状态(正常、注销、吊销、其他)|
|
|
|
|company_type|公司类型|
|
|
|
|company_major_type|公司类型大类:1 个体 2 合作社 3 企业|
|
|
|
|company_minor_type|企业登记类型代码,具体见《企业登记注册类型对照表》|
|
|
|
|capital|注册资本(元)|
|
|
|
|credit_no|统一信用代码|
|
|
|
|org_code|组织机构代码|
|
|
|
|establish_date|成立日期|
|
|
|
|industry_l1_code|行业分类|
|
|
|
|industry_l2_code|行业分类|
|
|
|
|industry_l3_code|行业分类|
|
|
|
|industry_l4_code|行业分类|
|
|
|
|province_short|省缩写|
|
|
|
|history_name|历史名称|
|
|
|
|business_scope|经营范围|
|
|
|
|division_code|登记地行政区划代码|
|
|
|
|employees|主要人员名称|
|
|
|
|partners|股东名称|
|
|
|
|risk_tags|风险标签 具体见 sheet:risk_tags|
|
|
|
|tags|标签 具体见 sheet:tags|
|
|
|
|company_scale_type|最新一年年报--大中小微型分类|
|
|
|
|annual_report_years|提交年报的年份集合|
|
|
|
|no_ar_submitted|没有提交年报的年份集合|
|
|
|
|spider_update_time|工商爬虫最后一次爬取的时间|
|
|
|
|
|
|
## TaskhubWriter配置
|
|
|
|
|
|
|
|
|
```yaml
|
|
|
taskhub:
|
|
|
class: taskhub.TaskhubWriter
|
|
|
init:
|
|
|
host: "10.8.6.222"
|
|
|
port: 8518
|
|
|
feed_on_demand: true
|
|
|
outbound: zongju_captcha
|
|
|
task_params:
|
|
|
- company_name_digest
|
|
|
- company_code
|
|
|
- company_name
|
|
|
- credit_no
|
|
|
- search_key
|
|
|
task_src: 1
|
|
|
priority: 200
|
|
|
rt: true
|
|
|
```
|
|
|
|
|
|
| 参数 | 注释 |
|
|
|
| :-- | -- |
|
|
|
|host | taskhub服务地址|
|
|
|
|port | taskhub服务端口|
|
|
|
|feed_on_demand| 是否按需提交。如果设为true, 则要根据outbound所指定的任务队列最大任务量与积压情况来决定是否需要提交新的任务 |
|
|
|
|outbound| 用于检查积压的 outbound|
|
|
|
|task_params | 任务参数字段列表 |
|
|
|
|附加参数 | |
|
|
|
|