taskHUB 介绍
分享的目的
- 目前工作中已经大量的在使用,但大家对他没有系统性的认识
- 通过分享可以让
taskHUB 介绍
爬虫任务的状态。
taskHUB 的作用
- 接受任务。
- 分发任务。
- 派生任务。
- 失败重试。
接受任务
以HTTP POST方式提交任务
POST http://10.8.6.222:8518/task/
Content-Type: application/json
{
"company_name": "宁乡县老粮仓镇楚江冻库",
"credit_no": "92430124MA4MGMN16U",
"search_key": "92430124MA4MGMN16U",
"data_type": "change",
"rt": true
}
分发任务
基于规则,将任务提交到对应的任务队列(redis)
派生任务
根据日志,及创建任务的规则
失败重试
taskHUB 的配置
基本概念
- inbound 定义任务的接收行为。
- outbound 定义任务的分发行为。
- 路由 一个任务进入taskHUB后,按outbound的顺序,依次检查任务的状态符合哪个outbound所定义的规则。任务由第一个符合规则的outbound分发。
配置文件结构
#子配置文件引用
include:
- path/to/config
#项目标识
project: "name_of_project"
#内置的http服务绑定的地址和端口
server:
bind: '0.0.0.0'
port: 8526
#路由基本设置
routing:
#任务最大的路由次数
limits: -1
#任务终结状态码
terminal_codes:
- 1000
- 1101
#失败任务记录
failure:
path: '{failure_path}'
compress: False
#任务输入设置
inbound:
#任务分发设置
outbound:
#日志设置
logging:
配置分离(include)
使用include可以将配置文件拆分成若干相对独立的部分。 在子配置文件中,可以定义inbound和outbound。最终的结果是将所有子配置文件和主配置文件中的inbound和outbound合并在一起。 合并过程是依子配置文件文件名按子母排序升序依次进行。因此,所有outbound最终的顺序由文件名顺序和在文件内的顺序共同决定。
├── main.yaml
└── config.d
├── sub_config_a.yaml
└── sub_config_b.yaml
#main.yaml
include:
- config.d
#sub_config_a.yaml
#定义一个outbound
outbound:
- name: o_a_1
selector: ...
#sub_config_b.yaml
#定义两个outbound
outbound:
- name: o_b_2
selector: ...
- name: o_b_1
selector: ...
以上配置中,最终有3个outbound 顺序如下:
- o_a_1
- o_b_2
- o_b_1
inbound的配置
selector 筛选条件列表。输入任务只要满足列表中任一筛选条件就被接收进入任务路由。
class和init 指定inbound的类型及其初始化的参数
# kafka
class: 'kafka.KafkaInbound'
init:
consumer_settings:
# kafka服务地址
bootstrap.servers: '{kafka_servers}'
# 消费者组ID
group.id: 'taskhub-project-ic'
#爬虫日志的主题列表
topics:
- 'collie-ic-spider-business-log'
注意! 对于一个新的taskhub项目,使用KafkaInbound需要明确爬虫日志归集的topic, 并为taskhub分配新的consumer group。
convert 对输入任务进行一些处理。 inbound接收到任务后,先进行转换操作再进行匹配。 如果匹配则转换后的任务进入后续路由。
convert | 功能 |
---|---|
logstash.RemoveBuildinKeys | 删除logstash加入的一些内置key |
common.ExtractTaskParams | 将task_params中的内容提出来放在任务数据中。 |
common.ExtractMetadata | 将metadata中的内容提出来放在任务数据中。 |
threads 执行inbound的线程数。 如果是KafkaInbound,该线程数不应该大于topic的分区数。
inbound:
- name: 'spider_log'
disable: False
weight: 10
selector:
- "submitter == 'taskhub' and (company_name or credit_no or company_code) and int(task_src) in [1, 11, 5, 3, 22, 21, 7]"
# AH 的老工商URL无法访问
- "task_type == '找新' and detail_url and province != 'AH' and task_src == 0"
# JS TL 找新无URL,但可以走老工商破码上线
- "task_type == '找新' and province in ['JS', 'TJ', 'ZJ'] and task_result == 1000 and task_src == 0"
- "task_type == '小微企业补充行业'"
class: 'kafka.KafkaInbound'
init:
consumer_settings:
bootstrap.servers: '{kafka_servers}'
group.id: 'taskhub-project-ic'
topics:
- 'collie-ic-spider-business-log'
convert:
- class: 'logstash.RemoveBuildinKeys'
- class: 'ic.ExtractTaskParams'
- class: 'ic.CaptchaExtraItem'
- class: 'ic.ChangeSearchKey'
- class: 'ic.ParseSearchinfoItems'
- class: 'ic.ParseNoticeItem'
threads: 8