Skip to content

GitLab

  • Projects
  • Groups
  • Snippets
  • Help
    • Loading...
  • Help
    • Help
    • Support
    • Community forum
    • Submit feedback
    • Contribute to GitLab
  • Sign in / Register
K
kb
  • Project overview
    • Project overview
    • Details
    • Activity
    • Releases
  • Repository
    • Repository
    • Files
    • Commits
    • Branches
    • Tags
    • Contributors
    • Graph
    • Compare
  • Issues 2
    • Issues 2
    • List
    • Boards
    • Labels
    • Service Desk
    • Milestones
  • Merge requests 0
    • Merge requests 0
  • Operations
    • Operations
    • Incidents
  • Analytics
    • Analytics
    • Repository
    • Value Stream
  • Wiki
    • Wiki
  • Members
    • Members
  • Activity
  • Graph
  • Create a new issue
  • Commits
  • Issue Boards
Collapse sidebar
  • granite
  • kb
  • Wiki
    • Knowledge_share
  • taskHUB 介绍

taskHUB 介绍 · Changes

Page history
Update taskHUB 介绍 authored Nov 02, 2021 by 吴一博's avatar 吴一博
Hide whitespace changes
Inline Side-by-side
Showing with 239 additions and 0 deletions
+239 -0
  • knowledge_share/taskHUB-介绍.md knowledge_share/taskHUB-介绍.md +239 -0
  • No files found.
knowledge_share/taskHUB-介绍.md 0 → 100644
View page @ 6cadfb5b
# taskHUB 介绍
## 分享的目的
* 目前工作中已经大量的在使用,但大家对他没有系统性的认识
* 通过分享可以让
## taskHUB 介绍
### 爬虫任务的状态。
```plantuml
@startuml
[*] --> 待执行 : 提交任务
待执行 -> 成功 : 爬取
待执行 -> 失败 : 爬取
失败 -> 待执行 : 提交重试
成功 --> 待执行 : 生成后续任务
成功 --> [*]
失败 --> [*] : 达到重试的上限
待执行: 任务存在于redis队列中
@enduml
```
### taskHUB 的作用
1. 接受任务。
2. 分发任务。
3. 派生任务。
4. 失败重试。
```plantuml
@startuml
!include <cloudinsight/kafka>
!include <cloudinsight/redis>
component "taskHUB"
component spider
file log
queue redis as "<$redis>任务队列"
queue kafka as "<$kafka>爬虫日志"
http接口 -- taskHUB: 1. 接受任务
taskHUB -right-> redis : 2. 分发任务
redis -> spider
spider --> log
log -left-> kafka
kafka -up-> taskHUB : 3. 接受任务日志,派生新任务或重试失败任务
@enduml
```
#### 接受任务
以HTTP POST方式提交任务
```http request
POST http://10.8.6.222:8518/task/
Content-Type: application/json
{
"company_name": "宁乡县老粮仓镇楚江冻库",
"credit_no": "92430124MA4MGMN16U",
"search_key": "92430124MA4MGMN16U",
"data_type": "change",
"rt": true
}
```
#### 分发任务
基于规则,将任务提交到对应的任务队列(redis)
#### 派生任务
根据日志,及创建任务的规则
#### 失败重试
### taskHUB 的配置
#### 基本概念
* inbound 定义任务的接收行为。
* outbound 定义任务的分发行为。
* 路由 一个任务进入taskHUB后,按outbound的顺序,依次检查任务的状态符合哪个outbound所定义的规则。任务由第一个符合规则的outbound分发。
#### 配置文件结构
```yaml
#子配置文件引用
include:
- path/to/config
#项目标识
project: "name_of_project"
#内置的http服务绑定的地址和端口
server:
bind: '0.0.0.0'
port: 8526
#路由基本设置
routing:
#任务最大的路由次数
limits: -1
#任务终结状态码
terminal_codes:
- 1000
- 1101
#失败任务记录
failure:
path: '{failure_path}'
compress: False
#任务输入设置
inbound:
#任务分发设置
outbound:
#日志设置
logging:
```
#### 配置分离(include)
使用include可以将配置文件拆分成若干相对独立的部分。
在子配置文件中,可以定义inbound和outbound。最终的结果是将所有子配置文件和主配置文件中的inbound和outbound合并在一起。
合并过程是依子配置文件文件名按子母排序升序依次进行。因此,所有outbound最终的顺序由文件名顺序和在文件内的顺序共同决定。
```shell
├── main.yaml
└── config.d
├── sub_config_a.yaml
└── sub_config_b.yaml
```
```yaml
#main.yaml
include:
- config.d
#sub_config_a.yaml
#定义一个outbound
outbound:
- name: o_a_1
selector: ...
#sub_config_b.yaml
#定义两个outbound
outbound:
- name: o_b_2
selector: ...
- name: o_b_1
selector: ...
```
以上配置中,最终有3个outbound 顺序如下:
* o_a_1
* o_b_2
* o_b_1
#### inbound的配置
**selector**
筛选条件列表。输入任务只要满足列表中任一筛选条件就被接收进入任务路由。
**class和init**
指定inbound的类型及其初始化的参数
```yaml
# kafka
class: 'kafka.KafkaInbound'
init:
consumer_settings:
# kafka服务地址
bootstrap.servers: '{kafka_servers}'
# 消费者组ID
group.id: 'taskhub-project-ic'
#爬虫日志的主题列表
topics:
- 'collie-ic-spider-business-log'
```
注意!
对于一个新的taskhub项目,使用KafkaInbound需要明确爬虫日志归集的topic,
并为taskhub分配新的consumer group。
**convert**
对输入任务进行一些处理。
inbound接收到任务后,先进行转换操作再进行匹配。
如果匹配则转换后的任务进入后续路由。
|convert|功能|
|---|---|
|logstash.RemoveBuildinKeys|删除logstash加入的一些内置key|
|common.ExtractTaskParams|将task_params中的内容提出来放在任务数据中。|
|common.ExtractMetadata|将metadata中的内容提出来放在任务数据中。|
**threads**
执行inbound的线程数。
如果是KafkaInbound,该线程数不应该大于topic的分区数。
```yaml
inbound:
- name: 'spider_log'
disable: False
weight: 10
selector:
- "submitter == 'taskhub' and (company_name or credit_no or company_code) and int(task_src) in [1, 11, 5, 3, 22, 21, 7]"
# AH 的老工商URL无法访问
- "task_type == '找新' and detail_url and province != 'AH' and task_src == 0"
# JS TL 找新无URL,但可以走老工商破码上线
- "task_type == '找新' and province in ['JS', 'TJ', 'ZJ'] and task_result == 1000 and task_src == 0"
- "task_type == '小微企业补充行业'"
class: 'kafka.KafkaInbound'
init:
consumer_settings:
bootstrap.servers: '{kafka_servers}'
group.id: 'taskhub-project-ic'
topics:
- 'collie-ic-spider-business-log'
convert:
- class: 'logstash.RemoveBuildinKeys'
- class: 'ic.ExtractTaskParams'
- class: 'ic.CaptchaExtraItem'
- class: 'ic.ChangeSearchKey'
- class: 'ic.ParseSearchinfoItems'
- class: 'ic.ParseNoticeItem'
threads: 8
```
#### 任务分发测试接口
#### 测试接口
\ No newline at end of file
Clone repository
  • README
  • basic_guidelines
  • basic_guidelines
    • basic_guidelines
    • dev_guide
    • project_build
    • 开发流程
  • best_practice
  • best_practice
    • AlterTable
    • RDS
    • azkaban
    • create_table
    • design
    • elasticsearch
    • elasticsearch
      • ES运维
    • logstash
View All Pages