Skip to content

GitLab

  • Projects
  • Groups
  • Snippets
  • Help
    • Loading...
  • Help
    • Help
    • Support
    • Community forum
    • Submit feedback
    • Contribute to GitLab
  • Sign in / Register
K
kb
  • Project overview
    • Project overview
    • Details
    • Activity
    • Releases
  • Repository
    • Repository
    • Files
    • Commits
    • Branches
    • Tags
    • Contributors
    • Graph
    • Compare
  • Issues 2
    • Issues 2
    • List
    • Boards
    • Labels
    • Service Desk
    • Milestones
  • Merge requests 0
    • Merge requests 0
  • Operations
    • Operations
    • Incidents
  • Analytics
    • Analytics
    • Repository
    • Value Stream
  • Wiki
    • Wiki
  • Members
    • Members
  • Activity
  • Graph
  • Create a new issue
  • Commits
  • Issue Boards
Collapse sidebar
  • granite
  • kb
  • Wiki
    • Knowledge_share
  • taskHUB 介绍

taskHUB 介绍 · Changes

Page history
add:taskhub authored Nov 02, 2021 by 吴一博's avatar 吴一博
Hide whitespace changes
Inline Side-by-side
Showing with 265 additions and 50 deletions
+265 -50
  • knowledge_share/taskHUB-介绍.md knowledge_share/taskHUB-介绍.md +265 -50
  • No files found.
knowledge_share/taskHUB-介绍.md
View page @ 7de1d747
---
marp: false paginate: true headingDivider: 3
# theme: uncover
# class: giga
# size: 4:3
# backgroundImage: url('images/logo.png')
backgroundPosition: 95% 5% backgroundSize: h:80 w:80
---
# 分享的目的
* 目前工作中已经大量的在使用,但大家对他没有**系统性**的认识
......@@ -5,7 +19,7 @@
# taskHUB 介绍
## 爬虫任务的状态。
## 爬虫任务的状态
```plantuml
@startuml
......@@ -78,21 +92,24 @@ Content-Type: application/json
### 失败重试
## taskHUB 的配置
针对日志中特定任务状态(task_result)的任务,将其再次提交到任务队列中,以实现对失败任务的重试。
### 基本概念
# taskHUB 的配置
## 基本概念
* 任务 一个任务,在taskHUB中,是以dict形式流转的一条数据。在taskHUB以外(redis、kafka、日志文件)任务均以json形式存储。
* inbound 定义任务的接收行为。
* outbound 定义任务的分发行为。
* 路由 一个任务进入taskHUB后,按outbound的顺序,依次检查任务的状态符合哪个outbound所定义的规则。任务由第一个符合规则的outbound分发。
### 配置文件结构
## 配置文件结构
```yaml
#子配置文件引用
include:
- path/to/config
#项目标识
project: "name_of_project"
......@@ -122,10 +139,12 @@ outbound:
logging:
```
### 配置分离(include)
使用include可以将配置文件拆分成若干相对独立的部分。
在子配置文件中,可以定义inbound和outbound。最终的结果是将所有子配置文件和主配置文件中的inbound和outbound合并在一起。
合并过程是依子配置文件文件名按子母排序升序依次进行。因此,所有outbound最终的顺序由文件名顺序和在文件内的顺序共同决定。
## 配置分离(include)
使用include可以将配置文件拆分成若干相对独立的部分。 在子配置文件中,可以定义inbound和outbound。
### 目录结构
```shell
├── main.yaml
......@@ -134,11 +153,13 @@ logging:
└── sub_config_b.yaml
```
### 配置文件
```yaml
#main.yaml
include:
- config.d
#sub_config_a.yaml
#定义一个outbound
outbound:
......@@ -155,16 +176,54 @@ outbound:
selector: ...
```
### inbound/outbound 顺序
最终的结果是将所有子配置文件和主配置文件中的inbound和outbound合并在一起。 合并过程是依子配置文件文件名按子母排序升序依次进行。因此,所有outbound最终的顺序由文件名顺序和在文件内的顺序共同决定。
以上配置中,最终有3个outbound 顺序如下:
* o_a_1
* o_a_1
* o_b_2
* o_b_1
### inbound的配置
**selector**
## inbound的配置
```yaml
inbound:
- name: 'spider_log'
disable: False
weight: 10
selector:
- "submitter == 'taskhub' and (company_name or credit_no or company_code) and int(task_src) in [1, 11, 5, 3, 22, 21, 7]"
# AH 的老工商URL无法访问
- "task_type == '找新' and detail_url and province != 'AH' and task_src == 0"
# JS TL 找新无URL,但可以走老工商破码上线
- "task_type == '找新' and province in ['JS', 'TJ', 'ZJ'] and task_result == 1000 and task_src == 0"
- "task_type == '小微企业补充行业'"
class: 'kafka.KafkaInbound'
init:
consumer_settings:
bootstrap.servers: '{kafka_servers}'
group.id: 'taskhub-project-ic'
topics:
- 'collie-ic-spider-business-log'
convert:
- class: 'logstash.RemoveBuildinKeys'
- class: 'ic.ExtractTaskParams'
- class: 'ic.CaptchaExtraItem'
- class: 'ic.ChangeSearchKey'
- class: 'ic.ParseSearchinfoItems'
- class: 'ic.ParseNoticeItem'
threads: 8
```
### selector
筛选条件列表。输入任务只要满足列表中任一筛选条件就被接收进入任务路由。
**class和init**
### class和init
指定inbound的类型及其初始化的参数
```yaml
......@@ -177,19 +236,18 @@ outbound:
bootstrap.servers: '{kafka_servers}'
# 消费者组ID
group.id: 'taskhub-project-ic'
#爬虫日志的主题列表
topics:
- 'collie-ic-spider-business-log'
```
注意!
对于一个新的taskhub项目,使用KafkaInbound需要明确爬虫日志归集的topic,
并为taskhub分配新的consumer group。
对于一个新的taskhub项目,使用KafkaInbound需要明确爬虫日志归集的topic, 并为taskhub分配新的consumer group。
### convert
**convert**
对输入任务进行一些处理。
inbound接收到任务后,先进行转换操作再进行匹配。
如果匹配则转换后的任务进入后续路由。
对输入任务进行一些处理。 inbound接收到任务后,先进行转换操作再进行匹配。 如果匹配则转换后的任务进入后续路由。
|convert|功能|
|---|---|
......@@ -197,40 +255,197 @@ inbound接收到任务后,先进行转换操作再进行匹配。
|common.ExtractTaskParams|将task_params中的内容提出来放在任务数据中。|
|common.ExtractMetadata|将metadata中的内容提出来放在任务数据中。|
**threads**
执行inbound的线程数。
如果是KafkaInbound,该线程数不应该大于topic的分区数。
### threads
执行inbound的线程数。 如果是KafkaInbound,该线程数不应该大于topic的分区数。
## outbound的配置
定义了符合特定条件的任务的分发方式。
```yaml
inbound:
- name: 'spider_log'
disable: False
weight: 10
outbound:
# 新成立及变更企业上线,指定更新维度
- name: 'zongju_captcha_ts3'
# disable: True
selector:
- "submitter == 'taskhub' and (company_name or credit_no or company_code) and int(task_src) in [1, 11, 5, 3, 22, 21, 7]"
# AH 的老工商URL无法访问
- "task_type == '找新' and detail_url and province != 'AH' and task_src == 0"
# JS TL 找新无URL,但可以走老工商破码上线
- "task_type == '找新' and province in ['JS', 'TJ', 'ZJ'] and task_result == 1000 and task_src == 0"
- "task_type == '小微企业补充行业'"
class: 'kafka.KafkaInbound'
- "task_src in [3] and not detail_url and data_type"
max_lag: -1
retry_limits: 5
token_scope: "*"
class: 'redis_queue.RedisListOutbound'
init:
consumer_settings:
bootstrap.servers: '{kafka_servers}'
group.id: 'taskhub-project-ic'
url: '{redis_url}'
key: "captcha_spider"
task_params:
- company_name
- company_code
- company_key
- credit_no
- search_key
- province
- outbound
- company_name_digest
- search_id
- change_id
- data_type
task_src: 3
task_type: 破码
priority: 1000
rt_priority: 2000
```
### RedisListOutbound的参数
|参数 | 涵义 |
| --- | ---|
|name| 显示名称|
|selector| 匹配规则列表 |
|url| redis连接串 redis://... |
|key| 任务队列的KEY |
|task_params| 任务参数字段列表 |
|task_params_wrapped| 是否将任务参数包装在task_params键之内|
|retry_limits|重试次数上限|
|priority| 普通任务默认优先级|
|rt_priority| 实时任务优先级|
|dont_retry_status| |
|direct_failback_status| |
|failback| |
|token_scope| |
|token_per_second| |
|reset_retry_times||
|max_lag| 任务队列的最大积压 |
### task_params
从流转的任务数据中,提取指定的键作为任务参数。
除了在配置中指定的键,taskHUB也会附加一些内置键在任务参数中。
**注意**:爬虫项目在定义任务参数时要避免使用以下关键字作为参数名.
|内置参数 | 涵义 |
| --- | ---|
|outbound| outbound的名称|
|routed_count| 任务被路由的次数 |
|retry_times|任务重试的次数|
|submitter| 固定为taskhub|
|submit_time| 任务提交的时间|
|retry_limits| 任务重试上限|
|rt| 是否是实时任务|
|priority| 任务的优先级 |
|task_uuid| 任务的UUID,由taskhub生成|
|group_retry_times| 任务组重试次数|
|token_scope| |
#### task_params的限制
目前taskHUB只从任务数据的顶级键中提取任务参数。
所以需要保证任务数据流转到outbound时,task_params所需的键都存在于任务数据的顶层。
ExtractTaskParams, ExtractMetadata这些convert就是为了保证这一点而设计的。
### task_params_wrapped
```json
# task_params_wrapped=True
{
"task_params": {
"company_name": "浙江温州龙湾农村商业银行股份有限公司",
"credit_no": "913303007772078015",
"search_key": "913303007772078015",
"data_type": "employee",
"rt": true
}
}
#task_params_wrapped=False
{
"company_name": "浙江温州龙湾农村商业银行股份有限公司",
"credit_no": "913303007772078015",
"search_key": "913303007772078015",
"data_type": "employee",
"rt": true
}
```
## 任务的处理
### 新任务
根据路由规则进行匹配,如匹配则分发。如不匹配则失败结束。
### 成功的任务
由主配置文件中的参数 terminal_codes。定义哪些任务可以(成功)终结。
### 失败任务
一个失败任务有以下几种可能的结果
1. 满足重试条件,在原outbound进行重试
2. 指定了后备outbound,在后备outbound上重试
3. 处于不用重试的状态,尝试使用路由规则进行匹配
4. 不满足上述条件,以失败结束
### dont_retry_status
有一些错误状态的任务无需进行重试。如, 任务中的url错误导致无法正常访问。重试也没有意义。
处于这些状态的任务并没有成功, 重试也没有机会成功。所以不需要再重试.
在路由规则中需要设计相应的规则让处于这类状态的任务能够正确的路由到其他可处理的outbound上
### direct_failback_status
哪些任务状态直接分发到failback所指定的outbound上。
## 常用接口
### 任务提交接口
> http://<host>:<port>/task/
```http request
POST http://10.8.6.222:8517/task/
Content-Type: application/json
{
"company_name": "浙江温州龙湾农村商业银行股份有限公司",
"credit_no": "913303007772078015",
"search_key": "913303007772078015",
"data_type": "employee",
"rt": true
}
topics:
- 'collie-ic-spider-business-log'
convert:
- class: 'logstash.RemoveBuildinKeys'
- class: 'ic.ExtractTaskParams'
- class: 'ic.CaptchaExtraItem'
- class: 'ic.ChangeSearchKey'
- class: 'ic.ParseSearchinfoItems'
- class: 'ic.ParseNoticeItem'
threads: 8
```
### 任务分发测试接口
> http://<host>:<port>/inbound/<inbound名称>/check_task/
```http request
POST http://10.8.6.222:8526/inbound/gravel_inbound/check_task/
Content-Type: application/json
{
"task_result": 1001,
"spider_start_time": "2021-10-22 15:12:02.145",
"type": "tax_arrears_announcement",
"spider_end_time": "2021-10-22 15:12:03",
"spider_ip": "10.8.6.51",
"metadata": {
"period": "2020年第2期"
},
"http_code": 200,
"log_date": "2021.10.22",
"spider_name": "tax_arrears_announcement",
"data_type": "task_distribution",
"error_msg": "",
"task_params": {
"city": "guangxi",
"operate": "all",
"task_type": "task"
}
}
```
### outbound查看接口
> http://<host>:<port>/outbound/<outbound_prefix>
```http request
GET http://10.8.6.222:8526/outbound/
Content-Type: application/json
```
### 测试接口
\ No newline at end of file
## 测试接口
\ No newline at end of file
Clone repository
  • README
  • basic_guidelines
  • basic_guidelines
    • basic_guidelines
    • dev_guide
    • project_build
    • 开发流程
  • best_practice
  • best_practice
    • AlterTable
    • RDS
    • azkaban
    • create_table
    • design
    • elasticsearch
    • elasticsearch
      • ES运维
    • logstash
View All Pages