|
|
|
# Introduction to taskHUB
|
|
|
|
|
|
|
|
## Purpose of This Talk
|
|
|
|
|
|
|
|
* taskHUB is already heavily used in our day-to-day work, but most of us do not have a systematic understanding of it.
|
|
|
|
* This talk aims to give everyone that systematic picture: what taskHUB does and how it is configured.
|
|
|
|
|
|
|
|
## taskHUB Overview
|
|
|
|
|
|
|
|
### Crawler Task States
|
|
|
|
|
|
|
|
```plantuml
@startuml

[*] --> Pending : task submitted
Pending -> Succeeded : crawl
Pending -> Failed : crawl
Failed -> Pending : resubmitted for retry
Succeeded --> Pending : follow-up tasks derived
Succeeded --> [*]
Failed --> [*] : retry limit reached

Pending: the task sits in a redis queue

@enduml
```
|
|
|
|
|
|
|
|
### What taskHUB Does
|
|
|
|
|
|
|
|
1. Receive tasks.

2. Dispatch tasks.

3. Derive follow-up tasks.

4. Retry failed tasks.
|
|
|
|
|
|
|
|
```plantuml
@startuml
!include <cloudinsight/kafka>
!include <cloudinsight/redis>

component "taskHUB"
component spider
file log

queue redis as "<$redis> task queue"
queue kafka as "<$kafka> crawler log"

"HTTP API" -- taskHUB : 1. receive tasks
taskHUB -right-> redis : 2. dispatch tasks
redis -> spider
spider --> log
log -left-> kafka
kafka -up-> taskHUB : 3. consume task logs, derive new tasks or retry failed ones

@enduml
```
|
|
|
|
|
|
|
|
#### Receiving Tasks
|
|
|
|
|
|
|
|
Tasks are submitted via HTTP POST:
|
|
|
|
|
|
|
|
```http request
POST http://10.8.6.222:8518/task/
Content-Type: application/json

{
  "company_name": "宁乡县老粮仓镇楚江冻库",
  "credit_no": "92430124MA4MGMN16U",
  "search_key": "92430124MA4MGMN16U",
  "data_type": "change",
  "rt": true
}
```
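
For reference, the same submission can be scripted. Below is a minimal sketch using Python `requests`; the URL and payload come from the example above, while the timeout and response handling are assumptions.

```python
# Sketch only: endpoint and payload are taken from the example above;
# how taskHUB responds to the POST is assumed here.
import requests

task = {
    "company_name": "宁乡县老粮仓镇楚江冻库",
    "credit_no": "92430124MA4MGMN16U",
    "search_key": "92430124MA4MGMN16U",
    "data_type": "change",
    "rt": True,
}

resp = requests.post("http://10.8.6.222:8518/task/", json=task, timeout=5)
resp.raise_for_status()          # raise if the submission was not accepted
print(resp.status_code, resp.text)
```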
|
|
|
|
|
|
|
|
#### Dispatching Tasks
|
|
|
|
|
|
|
|
Based on routing rules, each task is pushed into the corresponding task queue (redis).
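
As a minimal sketch of what "dispatching" amounts to: pushing the task onto a redis list that a spider consumes from. The redis address and the queue name `spider_task_queue` below are hypothetical; in taskHUB the target queue is chosen by the matching outbound rule.

```python
# Illustration only: queue name and redis address are assumptions.
import json
import redis

r = redis.Redis(host="127.0.0.1", port=6379)

def dispatch(task: dict, queue: str = "spider_task_queue") -> None:
    """Push a task onto the redis list that a spider consumes from."""
    r.lpush(queue, json.dumps(task, ensure_ascii=False))

dispatch({"search_key": "92430124MA4MGMN16U", "data_type": "change"})
```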
|
|
|
|
|
|
|
|
#### Deriving Tasks
|
|
|
|
|
|
|
|
Based on the crawler logs and the configured task-creation rules, taskHUB derives follow-up tasks from completed ones.
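
As a rough illustration (not taskHUB's actual rule engine): deriving a task means reading a finished task's log entry and emitting new tasks according to a rule. The rule, field names, and result code below are assumptions.

```python
# Hypothetical rule: after a successful search-type task, emit a detail-type task.
def derive(log_entry: dict) -> list:
    """Turn one crawler log entry into zero or more follow-up tasks."""
    tasks = []
    if log_entry.get("task_result") == 1000 and log_entry.get("data_type") == "search":
        tasks.append({
            "search_key": log_entry.get("search_key"),
            "data_type": "detail",
        })
    return tasks
```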
|
|
|
|
|
|
|
|
#### Retrying Failures

Failed tasks are resubmitted to the task queue and retried until the retry limit is reached (see the state diagram above).
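
A minimal sketch of the retry decision only; the field names and the way retries are counted are assumptions loosely based on the routing settings shown later (`terminal_codes`, `limits`).

```python
# Illustration only, not taskHUB's actual retry logic.
TERMINAL_CODES = {1000, 1101}   # codes that end a task for good (from the routing example below)
MAX_RETRIES = 5                 # hypothetical; a limit of -1 would mean "unlimited"

def should_retry(task: dict) -> bool:
    """Resubmit a failed task unless it ended with a terminal code or hit the limit."""
    if task.get("task_result") in TERMINAL_CODES:
        return False
    return task.get("retries", 0) < MAX_RETRIES
```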
|
|
|
|
|
|
|
|
### Configuring taskHUB
|
|
|
|
|
|
|
|
#### Basic Concepts
|
|
|
|
|
|
|
|
* inbound: defines how tasks are received.
|
|
|
|
* outbound: defines how tasks are dispatched.
|
|
|
|
* routing: after a task enters taskHUB, the outbounds are checked in their configured order to find whose rules the task's state matches; the first matching outbound dispatches the task (see the sketch below).
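
A sketch of this first-match routing; `matches()` and `dispatch()` are hypothetical names standing in for taskHUB's real objects.

```python
# Illustration of first-match routing over the configured outbounds.
def route(task: dict, outbounds: list) -> bool:
    """Check outbounds in order; the first whose rules match dispatches the task."""
    for outbound in outbounds:
        if outbound.matches(task):
            outbound.dispatch(task)
            return True
    return False  # no outbound matched, the task is not dispatched
```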
|
|
|
|
|
|
|
|
#### Configuration File Structure
|
|
|
|
|
|
|
|
```yaml
# sub-config file includes
include:
  - path/to/config

# project identifier
project: "name_of_project"

# bind address and port of the built-in http server
server:
  bind: '0.0.0.0'
  port: 8526

# basic routing settings
routing:
  # maximum number of times a task may be routed
  limits: -1
  # terminal task status codes
  terminal_codes:
    - 1000
    - 1101
  # where failed tasks are recorded
  failure:
    path: '{failure_path}'
    compress: False

# task input settings
inbound:

# task dispatch settings
outbound:

# logging settings
logging:
```
|
|
|
|
#### Splitting the Configuration (include)
|
|
|
|
With include, the configuration file can be split into several relatively independent parts.
|
|
|
|
Sub-config files may define inbounds and outbounds; the final configuration merges the inbounds and outbounds of all sub-config files with those of the main config file.
|
|
|
|
The merge walks the sub-config files in ascending alphabetical order of their file names, so the final order of all outbounds is determined jointly by the file-name order and the order within each file.
|
|
|
|
|
|
|
|
```shell
├── main.yaml
└── config.d
    ├── sub_config_a.yaml
    └── sub_config_b.yaml
```
|
|
|
|
|
|
|
|
```yaml
# main.yaml
include:
  - config.d

# sub_config_a.yaml
# defines one outbound
outbound:
  - name: o_a_1
    selector: ...

# sub_config_b.yaml
# defines two outbounds
outbound:
  - name: o_b_2
    selector: ...

  - name: o_b_1
    selector: ...
```
|
|
|
|
|
|
|
|
With the configuration above, there are ultimately three outbounds, in the following order:
|
|
|
|
* o_a_1
|
|
|
|
* o_b_2
|
|
|
|
* o_b_1
|
|
|
|
|
|
|
|
#### Configuring an inbound
|
|
|
|
**selector**
|
|
|
|
A list of filter conditions. An incoming task is accepted into task routing as long as it satisfies any one of the conditions in the list.
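
The selectors later in this section read like Python boolean expressions over the task's fields. As an illustration only (taskHUB's real evaluator may differ), the "accepted if any condition matches" behavior could be sketched as:

```python
# Illustration of any-of selector matching; not taskHUB's actual evaluator.
def accepted(task: dict, selectors: list) -> bool:
    """Return True if the task satisfies at least one selector expression."""
    for expr in selectors:
        try:
            if eval(expr, {"__builtins__": {}}, dict(task)):
                return True
        except Exception:
            continue  # a missing field or malformed expression simply fails to match
    return False

accepted(
    {"task_type": "找新", "detail_url": "http://example", "province": "JS", "task_src": 0},
    ["task_type == '找新' and detail_url and province != 'AH' and task_src == 0"],
)
```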
|
|
|
|
|
|
|
|
**class and init**
|
|
|
|
Specify the type of the inbound and its initialization parameters.
|
|
|
|
|
|
|
|
```yaml
# kafka

class: 'kafka.KafkaInbound'
init:
  consumer_settings:
    # kafka broker addresses
    bootstrap.servers: '{kafka_servers}'
    # consumer group ID
    group.id: 'taskhub-project-ic'

  # topics that the crawler logs are collected into
  topics:
    - 'collie-ic-spider-business-log'
```
|
|
|
|
Note!

For a new taskhub project, using KafkaInbound requires knowing which topic the crawler logs are collected into, and allocating a new consumer group for taskhub.
|
|
|
|
|
|
|
|
**convert**
|
|
|
|
Applies transformations to incoming tasks. After an inbound receives a task, the convert steps run first and selector matching happens afterwards; if the task matches, the converted task proceeds into the downstream routing.
|
|
|
|
|
|
|
|
|convert|Purpose|
|---|---|
|logstash.RemoveBuildinKeys|Remove the built-in keys that logstash adds.|
|common.ExtractTaskParams|Lift the contents of task_params into the task data.|
|common.ExtractMetadata|Lift the contents of metadata into the task data.|
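
As an illustration of what a convert step does, the sketch below mirrors the description of common.ExtractTaskParams in the table; it is not its actual implementation, and treating task_params as possibly JSON-encoded is an assumption.

```python
import json

def extract_task_params(task: dict) -> dict:
    """Lift the fields nested in task_params up into the task data itself."""
    params = task.get("task_params") or {}
    if isinstance(params, str):       # assumption: some logs carry task_params as a JSON string
        params = json.loads(params)
    merged = {k: v for k, v in task.items() if k != "task_params"}
    merged.update(params)
    return merged

extract_task_params({"search_key": "92430124MA4MGMN16U",
                     "task_params": {"data_type": "change", "rt": True}})
```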
|
|
|
|
|
|
|
|
**threads**
|
|
|
|
The number of threads running this inbound. For a KafkaInbound, the thread count should not exceed the number of partitions of the topic.
|
|
|
|
|
|
|
|
```yaml
inbound:
  - name: 'spider_log'
    disable: False
    weight: 10
    selector:
      - "submitter == 'taskhub' and (company_name or credit_no or company_code) and int(task_src) in [1, 11, 5, 3, 22, 21, 7]"
      # the legacy SAIC URLs for AH cannot be accessed
      - "task_type == '找新' and detail_url and province != 'AH' and task_src == 0"
      # JS / TJ 找新 tasks have no URL, but can still be brought online via the legacy SAIC site with captcha cracking
      - "task_type == '找新' and province in ['JS', 'TJ', 'ZJ'] and task_result == 1000 and task_src == 0"
      - "task_type == '小微企业补充行业'"
    class: 'kafka.KafkaInbound'
    init:
      consumer_settings:
        bootstrap.servers: '{kafka_servers}'
        group.id: 'taskhub-project-ic'

      topics:
        - 'collie-ic-spider-business-log'
    convert:
      - class: 'logstash.RemoveBuildinKeys'
      - class: 'ic.ExtractTaskParams'
      - class: 'ic.CaptchaExtraItem'
      - class: 'ic.ChangeSearchKey'
      - class: 'ic.ParseSearchinfoItems'
      - class: 'ic.ParseNoticeItem'
    threads: 8
```
|
|
|
|
|
|
|
|
#### Task Dispatch Test Endpoint
|
|
|
|
|
|
|
|
#### Test Endpoint
|
|
|