Skip to content

GitLab

  • Projects
  • Groups
  • Snippets
  • Help
    • Loading...
  • Help
    • Help
    • Support
    • Community forum
    • Submit feedback
    • Contribute to GitLab
  • Sign in / Register
K
kb
  • Project overview
    • Project overview
    • Details
    • Activity
    • Releases
  • Repository
    • Repository
    • Files
    • Commits
    • Branches
    • Tags
    • Contributors
    • Graph
    • Compare
  • Issues 2
    • Issues 2
    • List
    • Boards
    • Labels
    • Service Desk
    • Milestones
  • Merge requests 0
    • Merge requests 0
  • Operations
    • Operations
    • Incidents
  • Analytics
    • Analytics
    • Repository
    • Value Stream
  • Wiki
    • Wiki
  • Members
    • Members
  • Activity
  • Graph
  • Create a new issue
  • Commits
  • Issue Boards
Collapse sidebar
  • granite
  • kb
  • Wiki
    • Knowledge_share
  • taskHUB 介绍

Last edited by 吴一博 Nov 03, 2021
Page history

taskHUB 介绍

分享的目的

  • 目前工作中已经大量的在使用,但大家对他没有系统性的认识
  • 通过分享可以让大家全面了解taskHUB的特性

taskHUB 介绍

爬虫任务的状态

taskHUB 的作用

  1. 接受任务。
  2. 分发任务。
  3. 派生任务。
  4. 失败重试。

接受任务

以HTTP POST方式提交任务

POST http://10.8.6.222:8518/task/
Content-Type: application/json

{
  "company_name": "宁乡县老粮仓镇楚江冻库",
  "credit_no": "92430124MA4MGMN16U",
  "search_key": "92430124MA4MGMN16U",
  "data_type": "change",
  "rt": true
}

分发任务

基于规则,将任务提交到对应的任务队列(redis)

派生任务

根据日志,及创建任务的规则

失败重试

针对日志中特定任务状态(task_result)的任务,将其再次提交到任务队列中,以实现对失败任务的重试。

taskHUB 的配置

基本概念

  • 任务 一个任务,在taskHUB中,是以dict形式流转的一条数据。在taskHUB以外(redis、kafka、日志文件)任务均以json形式存储。
  • inbound 定义任务的接收行为。
  • outbound 定义任务的分发行为。
  • 路由 一个任务进入taskHUB后,按outbound的顺序,依次检查任务的状态符合哪个outbound所定义的规则。任务由第一个符合规则的outbound分发。

配置文件结构

#子配置文件引用
include:
  - path/to/config

#项目标识
project: "name_of_project"

#内置的http服务绑定的地址和端口
server:
  bind: '0.0.0.0'
  port: 8526

#路由基本设置
routing:
  #任务最大的路由次数
  limits: -1
  #任务终结状态码
  terminal_codes:
    - 1000
    - 1101
  #失败任务记录
  failure:
    path: '{failure_path}'
    compress: False

#任务输入设置
inbound:
#任务分发设置
outbound:
#日志设置
logging:

配置分离(include)

使用include可以将配置文件拆分成若干相对独立的部分。 在子配置文件中,可以定义inbound和outbound。

目录结构

├── main.yaml
└── config.d
    ├── sub_config_a.yaml
    └── sub_config_b.yaml

配置文件

#main.yaml
include:
  - config.d

#sub_config_a.yaml
#定义一个outbound
outbound:
  - name: o_a_1
    selector: ...

#sub_config_b.yaml
#定义两个outbound
outbound:
  - name: o_b_2
    selector: ...

  - name: o_b_1
    selector: ...

inbound/outbound 顺序

最终的结果是将所有子配置文件和主配置文件中的inbound和outbound合并在一起。 合并过程是依子配置文件文件名按子母排序升序依次进行。因此,所有outbound最终的顺序由文件名顺序和在文件内的顺序共同决定。

以上配置中,最终有3个outbound 顺序如下:

  • o_a_1
  • o_b_2
  • o_b_1

inbound的配置

inbound:
  - name: 'spider_log'
    disable: False
    weight: 10
    selector:
      - "submitter == 'taskhub' and (company_name or credit_no or company_code) and int(task_src) in [1, 11, 5, 3, 22, 21, 7]"
      # AH 的老工商URL无法访问
      - "task_type == '找新' and detail_url and province != 'AH' and task_src == 0"
      # JS TL 找新无URL,但可以走老工商破码上线
      - "task_type == '找新' and province in ['JS', 'TJ', 'ZJ'] and task_result == 1000 and task_src == 0"
      - "task_type == '小微企业补充行业'"
    class: 'kafka.KafkaInbound'
    init:
      consumer_settings:
        bootstrap.servers: '{kafka_servers}'
        group.id: 'taskhub-project-ic'

      topics:
        - 'collie-ic-spider-business-log'
    convert:
      - class: 'logstash.RemoveBuildinKeys'
      - class: 'ic.ExtractTaskParams'
      - class: 'ic.CaptchaExtraItem'
      - class: 'ic.ChangeSearchKey'
      - class: 'ic.ParseSearchinfoItems'
      - class: 'ic.ParseNoticeItem'
    threads: 8

selector

筛选条件列表。输入任务只要满足列表中任一筛选条件就被接收进入任务路由。

class和init

指定inbound的类型及其初始化的参数

    # kafka

    class: 'kafka.KafkaInbound'
    init:
      consumer_settings:
        # kafka服务地址
        bootstrap.servers: '{kafka_servers}'
        # 消费者组ID
        group.id: 'taskhub-project-ic'

      #爬虫日志的主题列表
      topics:
        - 'collie-ic-spider-business-log'

注意! 对于一个新的taskhub项目,使用KafkaInbound需要明确爬虫日志归集的topic, 并为taskhub分配新的consumer group。

convert

对输入任务进行一些处理。 inbound接收到任务后,先进行转换操作再进行匹配。 如果匹配则转换后的任务进入后续路由。

convert 功能
logstash.RemoveBuildinKeys 删除logstash加入的一些内置key
common.ExtractTaskParams 将task_params中的内容提出来放在任务数据中。
common.ExtractMetadata 将metadata中的内容提出来放在任务数据中。

threads

执行inbound的线程数。 如果是KafkaInbound,该线程数不应该大于topic的分区数。

outbound的配置

定义了符合特定条件的任务的分发方式。

outbound:
  # 新成立及变更企业上线,指定更新维度
  - name: 'zongju_captcha_ts3'
    #    disable: True
    selector:
      - "task_src in [3] and not detail_url and data_type and task_parms['province'] == 'GD'"
    max_lag: -1
    retry_limits: 5
    token_scope: "*"
    class: 'redis_queue.RedisListOutbound'
    init:
      url: '{redis_url}'
      key: "captcha_spider"
      task_params:
        - company_name
        - company_code
        - company_key
        - credit_no
        - search_key
        - province
        - outbound
        - company_name_digest
        - search_id
        - change_id
        - data_type
      task_src: 3
      task_type: 破码
      priority: 1000
      rt_priority: 2000

RedisListOutbound的参数

参数 涵义
name 显示名称
selector 匹配规则列表
url redis连接串 redis://...
key 任务队列的KEY
task_params 任务参数字段列表
task_params_wrapped 是否将任务参数包装在task_params键之内
retry_limits 重试次数上限
priority 普通任务默认优先级
rt_priority 实时任务优先级
dont_retry_status
direct_failback_status
failback
token_scope
token_per_second
reset_retry_times
max_lag 任务队列的最大积压

task_params

从流转的任务数据中,提取指定的键作为任务参数。

除了在配置中指定的键,taskHUB也会附加一些内置键在任务参数中。 注意:爬虫项目在定义任务参数时要避免使用以下关键字作为参数名.

内置参数 涵义
outbound outbound的名称
routed_count 任务被路由的次数
retry_times 任务重试的次数
submitter 固定为taskhub
submit_time 任务提交的时间
retry_limits 任务重试上限
rt 是否是实时任务
priority 任务的优先级
task_uuid 任务的UUID,由taskhub生成
group_retry_times 任务组重试次数
token_scope

task_params的限制

目前taskHUB只从任务数据的顶级键中提取任务参数。 所以需要保证任务数据流转到outbound时,task_params所需的键都存在于任务数据的顶层。 ExtractTaskParams, ExtractMetadata这些convert就是为了保证这一点而设计的。

task_params_wrapped

# task_params_wrapped=True

{
  "task_params": {
    "company_name": "浙江温州龙湾农村商业银行股份有限公司",
    "credit_no": "913303007772078015",
    "search_key": "913303007772078015",
    "data_type": "employee",
    "rt": true
  }
}

# task_params_wrapped=False
        
{
  "company_name": "浙江温州龙湾农村商业银行股份有限公司",
  "credit_no": "913303007772078015",
  "search_key": "913303007772078015",
  "data_type": "employee",
  "rt": true
}

任务的处理

新任务

根据路由规则进行匹配,如匹配则分发。如不匹配则失败结束。

成功的任务

由主配置文件中的参数 terminal_codes。定义哪些任务可以(成功)终结。

失败任务

一个失败任务有以下几种可能的结果

  1. 满足重试条件,在原outbound进行重试
  2. 指定了后备outbound,在后备outbound上重试
  3. 处于不用重试的状态,尝试使用路由规则进行匹配
  4. 不满足上述条件,以失败结束

dont_retry_status

有一些错误状态的任务无需进行重试。如, 任务中的url错误导致无法正常访问。重试也没有意义。 处于这些状态的任务并没有成功, 重试也没有机会成功。所以不需要再重试. 在路由规则中需要设计相应的规则让处于这类状态的任务能够正确的路由到其他可处理的outbound上

direct_failback_status

哪些任务状态直接分发到failback所指定的outbound上。

常用接口

任务提交接口

http://<host>:<port>/task/
POST http://10.8.6.222:8517/task/
Content-Type: application/json

{
"company_name": "浙江温州龙湾农村商业银行股份有限公司",
"credit_no": "913303007772078015",
"search_key": "913303007772078015",
"data_type": "employee",
"rt": true
}

任务分发测试接口

 http://<host>:<port>/inbound/<inbound名称>/check_task/
POST http://10.8.6.222:8526/inbound/gravel_inbound/check_task/
Content-Type: application/json

{
    "task_result": 1001,
    "spider_start_time": "2021-10-22 15:12:02.145",
    "type": "tax_arrears_announcement",
    "spider_end_time": "2021-10-22 15:12:03",
    "spider_ip": "10.8.6.51",
    "metadata": {
      "period": "2020年第2期"
    },
    "http_code": 200,
    "log_date": "2021.10.22",
    "spider_name": "tax_arrears_announcement",
    "data_type": "task_distribution",
    "error_msg": "",
    "task_params": {
      "city": "guangxi",
      "operate": "all",
      "task_type": "task"
    }
}

outbound查看接口

 http://<host>:<port>/outbound/<outbound_prefix>
GET http://10.8.6.222:8526/outbound/
Content-Type: application/json

测试接口

Clone repository
  • README
  • basic_guidelines
  • basic_guidelines
    • basic_guidelines
    • dev_guide
    • project_build
    • 开发流程
  • best_practice
  • best_practice
    • AlterTable
    • RDS
    • azkaban
    • create_table
    • design
    • elasticsearch
    • elasticsearch
      • ES运维
    • logstash
View All Pages