分享的目的
- 目前工作中已经大量的在使用,但大家对他没有系统性的认识
- 通过分享可以让大家全面了解taskHUB的特性
taskHUB 介绍
爬虫任务的状态
taskHUB 的作用
- 接受任务。
- 分发任务。
- 派生任务。
- 失败重试。
接受任务
以HTTP POST方式提交任务
POST http://10.8.6.222:8518/task/
Content-Type: application/json
{
"company_name": "宁乡县老粮仓镇楚江冻库",
"credit_no": "92430124MA4MGMN16U",
"search_key": "92430124MA4MGMN16U",
"data_type": "change",
"rt": true
}
分发任务
基于规则,将任务提交到对应的任务队列(redis)
派生任务
根据日志,及创建任务的规则
失败重试
针对日志中特定任务状态(task_result)的任务,将其再次提交到任务队列中,以实现对失败任务的重试。
taskHUB 的配置
基本概念
- 任务 一个任务,在taskHUB中,是以dict形式流转的一条数据。在taskHUB以外(redis、kafka、日志文件)任务均以json形式存储。
- inbound 定义任务的接收行为。
- outbound 定义任务的分发行为。
- 路由 一个任务进入taskHUB后,按outbound的顺序,依次检查任务的状态符合哪个outbound所定义的规则。任务由第一个符合规则的outbound分发。
配置文件结构
#子配置文件引用
include:
- path/to/config
#项目标识
project: "name_of_project"
#内置的http服务绑定的地址和端口
server:
bind: '0.0.0.0'
port: 8526
#路由基本设置
routing:
#任务最大的路由次数
limits: -1
#任务终结状态码
terminal_codes:
- 1000
- 1101
#失败任务记录
failure:
path: '{failure_path}'
compress: False
#任务输入设置
inbound:
#任务分发设置
outbound:
#日志设置
logging:
配置分离(include)
使用include可以将配置文件拆分成若干相对独立的部分。 在子配置文件中,可以定义inbound和outbound。
目录结构
├── main.yaml
└── config.d
├── sub_config_a.yaml
└── sub_config_b.yaml
配置文件
#main.yaml
include:
- config.d
#sub_config_a.yaml
#定义一个outbound
outbound:
- name: o_a_1
selector: ...
#sub_config_b.yaml
#定义两个outbound
outbound:
- name: o_b_2
selector: ...
- name: o_b_1
selector: ...
inbound/outbound 顺序
最终的结果是将所有子配置文件和主配置文件中的inbound和outbound合并在一起。 合并过程是依子配置文件文件名按子母排序升序依次进行。因此,所有outbound最终的顺序由文件名顺序和在文件内的顺序共同决定。
以上配置中,最终有3个outbound 顺序如下:
- o_a_1
- o_b_2
- o_b_1
inbound的配置
inbound:
- name: 'spider_log'
disable: False
weight: 10
selector:
- "submitter == 'taskhub' and (company_name or credit_no or company_code) and int(task_src) in [1, 11, 5, 3, 22, 21, 7]"
# AH 的老工商URL无法访问
- "task_type == '找新' and detail_url and province != 'AH' and task_src == 0"
# JS TL 找新无URL,但可以走老工商破码上线
- "task_type == '找新' and province in ['JS', 'TJ', 'ZJ'] and task_result == 1000 and task_src == 0"
- "task_type == '小微企业补充行业'"
class: 'kafka.KafkaInbound'
init:
consumer_settings:
bootstrap.servers: '{kafka_servers}'
group.id: 'taskhub-project-ic'
topics:
- 'collie-ic-spider-business-log'
convert:
- class: 'logstash.RemoveBuildinKeys'
- class: 'ic.ExtractTaskParams'
- class: 'ic.CaptchaExtraItem'
- class: 'ic.ChangeSearchKey'
- class: 'ic.ParseSearchinfoItems'
- class: 'ic.ParseNoticeItem'
threads: 8
selector
筛选条件列表。输入任务只要满足列表中任一筛选条件就被接收进入任务路由。
class和init
指定inbound的类型及其初始化的参数
# kafka
class: 'kafka.KafkaInbound'
init:
consumer_settings:
# kafka服务地址
bootstrap.servers: '{kafka_servers}'
# 消费者组ID
group.id: 'taskhub-project-ic'
#爬虫日志的主题列表
topics:
- 'collie-ic-spider-business-log'
注意! 对于一个新的taskhub项目,使用KafkaInbound需要明确爬虫日志归集的topic, 并为taskhub分配新的consumer group。
convert
对输入任务进行一些处理。 inbound接收到任务后,先进行转换操作再进行匹配。 如果匹配则转换后的任务进入后续路由。
convert | 功能 |
---|---|
logstash.RemoveBuildinKeys | 删除logstash加入的一些内置key |
common.ExtractTaskParams | 将task_params中的内容提出来放在任务数据中。 |
common.ExtractMetadata | 将metadata中的内容提出来放在任务数据中。 |
threads
执行inbound的线程数。 如果是KafkaInbound,该线程数不应该大于topic的分区数。
outbound的配置
定义了符合特定条件的任务的分发方式。
outbound:
# 新成立及变更企业上线,指定更新维度
- name: 'zongju_captcha_ts3'
# disable: True
selector:
- "task_src in [3] and not detail_url and data_type and task_parms['province'] == 'GD'"
max_lag: -1
retry_limits: 5
token_scope: "*"
class: 'redis_queue.RedisListOutbound'
init:
url: '{redis_url}'
key: "captcha_spider"
task_params:
- company_name
- company_code
- company_key
- credit_no
- search_key
- province
- outbound
- company_name_digest
- search_id
- change_id
- data_type
task_src: 3
task_type: 破码
priority: 1000
rt_priority: 2000
RedisListOutbound的参数
参数 | 涵义 |
---|---|
name | 显示名称 |
selector | 匹配规则列表 |
url | redis连接串 redis://... |
key | 任务队列的KEY |
task_params | 任务参数字段列表 |
task_params_wrapped | 是否将任务参数包装在task_params键之内 |
retry_limits | 重试次数上限 |
priority | 普通任务默认优先级 |
rt_priority | 实时任务优先级 |
dont_retry_status | |
direct_failback_status | |
failback | |
token_scope | |
token_per_second | |
reset_retry_times | |
max_lag | 任务队列的最大积压 |
task_params
从流转的任务数据中,提取指定的键作为任务参数。
除了在配置中指定的键,taskHUB也会附加一些内置键在任务参数中。 注意:爬虫项目在定义任务参数时要避免使用以下关键字作为参数名.
内置参数 | 涵义 |
---|---|
outbound | outbound的名称 |
routed_count | 任务被路由的次数 |
retry_times | 任务重试的次数 |
submitter | 固定为taskhub |
submit_time | 任务提交的时间 |
retry_limits | 任务重试上限 |
rt | 是否是实时任务 |
priority | 任务的优先级 |
task_uuid | 任务的UUID,由taskhub生成 |
group_retry_times | 任务组重试次数 |
token_scope |
task_params的限制
目前taskHUB只从任务数据的顶级键中提取任务参数。 所以需要保证任务数据流转到outbound时,task_params所需的键都存在于任务数据的顶层。 ExtractTaskParams, ExtractMetadata这些convert就是为了保证这一点而设计的。
task_params_wrapped
# task_params_wrapped=True
{
"task_params": {
"company_name": "浙江温州龙湾农村商业银行股份有限公司",
"credit_no": "913303007772078015",
"search_key": "913303007772078015",
"data_type": "employee",
"rt": true
}
}
# task_params_wrapped=False
{
"company_name": "浙江温州龙湾农村商业银行股份有限公司",
"credit_no": "913303007772078015",
"search_key": "913303007772078015",
"data_type": "employee",
"rt": true
}
任务的处理
新任务
根据路由规则进行匹配,如匹配则分发。如不匹配则失败结束。
成功的任务
由主配置文件中的参数 terminal_codes。定义哪些任务可以(成功)终结。
失败任务
一个失败任务有以下几种可能的结果
- 满足重试条件,在原outbound进行重试
- 指定了后备outbound,在后备outbound上重试
- 处于不用重试的状态,尝试使用路由规则进行匹配
- 不满足上述条件,以失败结束
dont_retry_status
有一些错误状态的任务无需进行重试。如, 任务中的url错误导致无法正常访问。重试也没有意义。 处于这些状态的任务并没有成功, 重试也没有机会成功。所以不需要再重试. 在路由规则中需要设计相应的规则让处于这类状态的任务能够正确的路由到其他可处理的outbound上
direct_failback_status
哪些任务状态直接分发到failback所指定的outbound上。
常用接口
任务提交接口
http://<host>:<port>/task/
POST http://10.8.6.222:8517/task/
Content-Type: application/json
{
"company_name": "浙江温州龙湾农村商业银行股份有限公司",
"credit_no": "913303007772078015",
"search_key": "913303007772078015",
"data_type": "employee",
"rt": true
}
任务分发测试接口
http://<host>:<port>/inbound/<inbound名称>/check_task/
POST http://10.8.6.222:8526/inbound/gravel_inbound/check_task/
Content-Type: application/json
{
"task_result": 1001,
"spider_start_time": "2021-10-22 15:12:02.145",
"type": "tax_arrears_announcement",
"spider_end_time": "2021-10-22 15:12:03",
"spider_ip": "10.8.6.51",
"metadata": {
"period": "2020年第2期"
},
"http_code": 200,
"log_date": "2021.10.22",
"spider_name": "tax_arrears_announcement",
"data_type": "task_distribution",
"error_msg": "",
"task_params": {
"city": "guangxi",
"operate": "all",
"task_type": "task"
}
}
outbound查看接口
http://<host>:<port>/outbound/<outbound_prefix>
GET http://10.8.6.222:8526/outbound/
Content-Type: application/json