Skip to content

GitLab

  • Projects
  • Groups
  • Snippets
  • Help
    • Loading...
  • Help
    • Help
    • Support
    • Community forum
    • Submit feedback
    • Contribute to GitLab
  • Sign in / Register
K
kb
  • Project overview
    • Project overview
    • Details
    • Activity
    • Releases
  • Repository
    • Repository
    • Files
    • Commits
    • Branches
    • Tags
    • Contributors
    • Graph
    • Compare
  • Issues 2
    • Issues 2
    • List
    • Boards
    • Labels
    • Service Desk
    • Milestones
  • Merge requests 0
    • Merge requests 0
  • Operations
    • Operations
    • Incidents
  • Analytics
    • Analytics
    • Repository
    • Value Stream
  • Wiki
    • Wiki
  • Members
    • Members
  • Activity
  • Graph
  • Create a new issue
  • Commits
  • Issue Boards
Collapse sidebar
  • granite
  • kb
  • Wiki
    • Knowledge_share
  • 如何创建需要遍历公司的数据任务

Last edited by 吴一博 Jan 06, 2022
Page history

如何创建需要遍历公司的数据任务

背景

在日常的工作中,经常需要以符合特定条件的公司列表作为数据任务的输入。例如,某个爬虫任务需要采集所有在营企业的数据,要以企业的统一信用代码作为搜索条件。因此我们需要一种方法,它可以方便获取所需数据,并容易的将其作为爬虫任务提交到爬虫队列。本文介绍了使用data_pump结合ES来达到此目的的方法。

阅读本文前所需了解知识

  • data_pump的基本使用
  • elasticsearch的基本使用
  • taskhub的基本使用

方法说明

本方法使用data_pump从ES的company索引中查询相关数据。并将任务提交到taskhub接口或爬虫的redis队列。

EsDocReader的配置

  es_company:
    class: es.EsDocReader
    init:
      hosts:
        - host: es-cn-4591blu580004eavf.elasticsearch.aliyuncs.com
          port: 9200
          http_auth: [ 'elastic', 'XXXXX' ]
      index: company
      dsl:
        _source: [
            "company_name",
            "credit_no",
            "company_code",
            "company_name_digest"
        ]
        "query": {
          "bool": {
            "must": [
              { "match_phrase": { "n_company_status": { "query": "正常" } } },
              { "match_phrase": { "company_major_type": { "query": 3 } } },
              { "range": { "updated": { "lt": "{end_date}" } } }
            ]
          }
        }

      offset:
        store: "{offset_store_path}"
        field: company_name_digest.keyword
        reset_hours: 0

      limit: 5000
      timeout: 600
参数 注释
dsl.query 数据筛选条件。如果不会写es查询语句,可以在kibana的discover中筛选,然后在inspect->request中拷贝条件
dsl._source 返回数据的字段列表
offset.store offset 存储路径。形式如:file:///path/to/offset
offset.field 充当offset的字段。在遍历公司这个场景下固定使用 company_name_digest.keyword
offset.reset_hours 重置offset的时间周期。小时数。
limit 最大记录数
timeout 查询ES的超时。单位为秒

关于 dsl.query

该参数用于指定筛选数据所需的条件。 如果不会写es查询语句,可以在kibana的discover中筛选,然后在inspect->request中拷贝条件直接放在配置中使用。

关于 reset_hours

如果任务是周期性重复执行。则需要在遍历完所有数据后将offset重置。就需要依靠此参数来开启。重置offset有两个必要条件:1. 当前offset值之后无数据可以消费 2. 上一次重置到当前时间差大于等于reset_hours值 当两个条件同时满足时offset会被重置,同时记录重置时间。

company索引字段说明

以下为一些常用的用于数据过滤的字段,完整说明参见钉钉群文件:基础架构组/03_数据资产/Comapny字段说明.xlsx

字段名 注释
company_name_digest company_name_digest
company_status 公司状态
n_company_status 简化后公司状态(正常、注销、吊销、其他)
company_type 公司类型
company_major_type 公司类型大类:1 个体 2 合作社 3 企业
company_minor_type 企业登记类型代码,具体见《企业登记注册类型对照表》
capital 注册资本(元)
credit_no 统一信用代码
org_code 组织机构代码
establish_date 成立日期
industry_l1_code 行业分类
industry_l2_code 行业分类
industry_l3_code 行业分类
industry_l4_code 行业分类
province_short 省缩写
history_name 历史名称
business_scope 经营范围
division_code 登记地行政区划代码
employees 主要人员名称
partners 股东名称
risk_tags 风险标签 具体见 sheet:risk_tags
tags 标签 具体见 sheet:tags
company_scale_type 最新一年年报--大中小微型分类
annual_report_years 提交年报的年份集合
no_ar_submitted 没有提交年报的年份集合
spider_update_time 工商爬虫最后一次爬取的时间

TaskhubWriter配置

  taskhub:
    class: taskhub.TaskhubWriter
    init:
      host: "10.8.6.222"
      port: 8518
      feed_on_demand: true
      outbound: zongju_captcha
      task_params:
        - company_name_digest
        - company_code
        - company_name
        - credit_no
        - search_key
      task_src: 1
      priority: 200
      rt: true
参数 注释
host taskhub服务地址
port taskhub服务端口
feed_on_demand 是否按需提交。如果设为true, 则要根据outbound所指定的任务队列最大任务量与积压情况来决定是否需要提交新的任务
outbound 用于检查积压的 outbound
task_params 任务参数字段列表
附加参数 自定义的任务参数。用于附加一些固定的任务参数在实际提交的任务中。如指定优先级。

关于按需提交

一些情况下,不希望任务队列中积压过多任务。则需要打开feed_on_demand来控制任务提交。当writer开启该选项后,data_pump在从reader消费数据前,会读取writer的需求量,然后从reader消费相应数量的数据。目前 redis和taskhub 两个writer支持该功能。

Clone repository
  • README
  • basic_guidelines
  • basic_guidelines
    • basic_guidelines
    • dev_guide
    • project_build
    • 开发流程
  • best_practice
  • best_practice
    • AlterTable
    • RDS
    • azkaban
    • create_table
    • design
    • elasticsearch
    • elasticsearch
      • ES运维
    • logstash
View All Pages