Skip to content

GitLab

  • Projects
  • Groups
  • Snippets
  • Help
    • Loading...
  • Help
    • Help
    • Support
    • Community forum
    • Submit feedback
    • Contribute to GitLab
  • Sign in / Register
K
kb
  • Project overview
    • Project overview
    • Details
    • Activity
    • Releases
  • Repository
    • Repository
    • Files
    • Commits
    • Branches
    • Tags
    • Contributors
    • Graph
    • Compare
  • Issues 2
    • Issues 2
    • List
    • Boards
    • Labels
    • Service Desk
    • Milestones
  • Merge requests 0
    • Merge requests 0
  • Operations
    • Operations
    • Incidents
  • Analytics
    • Analytics
    • Repository
    • Value Stream
  • Wiki
    • Wiki
  • Members
    • Members
  • Activity
  • Graph
  • Create a new issue
  • Commits
  • Issue Boards
Collapse sidebar
  • granite
  • kb
  • Wiki
    • Knowledge_share
  • taskHUB 介绍

Last edited by 吴一博 Nov 03, 2021
Page history
This is an old version of this page. You can view the most recent version or browse the history.

taskHUB 介绍

taskHUB 介绍

分享的目的

  • 目前工作中已经大量的在使用,但大家对他没有系统性的认识
  • 通过分享可以让

taskHUB 介绍

爬虫任务的状态。

taskHUB 的作用

  1. 接受任务。
  2. 分发任务。
  3. 派生任务。
  4. 失败重试。

接受任务

以HTTP POST方式提交任务

POST http://10.8.6.222:8518/task/
Content-Type: application/json

{
  "company_name": "宁乡县老粮仓镇楚江冻库",
  "credit_no": "92430124MA4MGMN16U",
  "search_key": "92430124MA4MGMN16U",
  "data_type": "change",
  "rt": true
}

分发任务

基于规则,将任务提交到对应的任务队列(redis)

派生任务

根据日志,及创建任务的规则

失败重试

taskHUB 的配置

基本概念

  • inbound 定义任务的接收行为。
  • outbound 定义任务的分发行为。
  • 路由 一个任务进入taskHUB后,按outbound的顺序,依次检查任务的状态符合哪个outbound所定义的规则。任务由第一个符合规则的outbound分发。

配置文件结构

#子配置文件引用
include:
  - path/to/config
   
#项目标识
project: "name_of_project"

#内置的http服务绑定的地址和端口
server:
  bind: '0.0.0.0'
  port: 8526

#路由基本设置
routing:
  #任务最大的路由次数
  limits: -1
  #任务终结状态码
  terminal_codes:
    - 1000
    - 1101
  #失败任务记录
  failure:
    path: '{failure_path}'
    compress: False

#任务输入设置
inbound:
#任务分发设置
outbound:
#日志设置
logging:

配置分离(include)

使用include可以将配置文件拆分成若干相对独立的部分。 在子配置文件中,可以定义inbound和outbound。最终的结果是将所有子配置文件和主配置文件中的inbound和outbound合并在一起。 合并过程是依子配置文件文件名按子母排序升序依次进行。因此,所有outbound最终的顺序由文件名顺序和在文件内的顺序共同决定。

├── main.yaml
└── config.d
    ├── sub_config_a.yaml
    └── sub_config_b.yaml
#main.yaml
include:
  - config.d
  
#sub_config_a.yaml
#定义一个outbound
outbound:
  - name: o_a_1
    selector: ...

#sub_config_b.yaml
#定义两个outbound
outbound:
  - name: o_b_2
    selector: ...

  - name: o_b_1
    selector: ...

以上配置中,最终有3个outbound 顺序如下:

  • o_a_1
  • o_b_2
  • o_b_1

inbound的配置

selector 筛选条件列表。输入任务只要满足列表中任一筛选条件就被接收进入任务路由。

class和init 指定inbound的类型及其初始化的参数

    # kafka

    class: 'kafka.KafkaInbound'
    init:
      consumer_settings:
        # kafka服务地址
        bootstrap.servers: '{kafka_servers}'
        # 消费者组ID
        group.id: 'taskhub-project-ic'
        
      #爬虫日志的主题列表
      topics:
        - 'collie-ic-spider-business-log'

注意! 对于一个新的taskhub项目,使用KafkaInbound需要明确爬虫日志归集的topic, 并为taskhub分配新的consumer group。

convert 对输入任务进行一些处理。 inbound接收到任务后,先进行转换操作再进行匹配。 如果匹配则转换后的任务进入后续路由。

convert 功能
logstash.RemoveBuildinKeys 删除logstash加入的一些内置key
common.ExtractTaskParams 将task_params中的内容提出来放在任务数据中。
common.ExtractMetadata 将metadata中的内容提出来放在任务数据中。

threads 执行inbound的线程数。 如果是KafkaInbound,该线程数不应该大于topic的分区数。

inbound:
  - name: 'spider_log'
    disable: False
    weight: 10
    selector:
      - "submitter == 'taskhub' and (company_name or credit_no or company_code) and int(task_src) in [1, 11, 5, 3, 22, 21, 7]"
      # AH 的老工商URL无法访问
      - "task_type == '找新' and detail_url and province != 'AH' and task_src == 0"
      # JS TL 找新无URL,但可以走老工商破码上线
      - "task_type == '找新' and province in ['JS', 'TJ', 'ZJ'] and task_result == 1000 and task_src == 0"
      - "task_type == '小微企业补充行业'"
    class: 'kafka.KafkaInbound'
    init:
      consumer_settings:
        bootstrap.servers: '{kafka_servers}'
        group.id: 'taskhub-project-ic'

      topics:
        - 'collie-ic-spider-business-log'
    convert:
      - class: 'logstash.RemoveBuildinKeys'
      - class: 'ic.ExtractTaskParams'
      - class: 'ic.CaptchaExtraItem'
      - class: 'ic.ChangeSearchKey'
      - class: 'ic.ParseSearchinfoItems'
      - class: 'ic.ParseNoticeItem'
    threads: 8

任务分发测试接口

测试接口

Clone repository
  • README
  • basic_guidelines
  • basic_guidelines
    • basic_guidelines
    • dev_guide
    • project_build
    • 开发流程
  • best_practice
  • best_practice
    • AlterTable
    • RDS
    • azkaban
    • create_table
    • design
    • elasticsearch
    • elasticsearch
      • ES运维
    • logstash
View All Pages