quick_start

DataPump Application Guide

Basic concepts

The RFW structure

input (reader) -> processing (filter) -> output (writer)

This is the IPO (input - process - output) model.
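As a rough sketch (the my_reader / my_filter / my_writer names are placeholders, not components shipped with DataPump), a pump configuration mirrors this structure by listing the components used at each stage:

# minimal sketch of the RFW structure in a pump config
pump:
  reader:
    - my_reader       # input: where records come from
  filter:
    - my_filter       # process: transform / enrich records
  writer:
    - my_writer       # output: where results are written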

Data structures flowing through the pipeline

Single record

Batch
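As a purely illustrative sketch (the actual record fields depend on the reader), a single record is one mapping of fields, while a batch is a list of such mappings:

# hypothetical shapes, for illustration only
single: { company_name: "ACME", contact: "13800000000" }
batch:
  - { company_name: "ACME" }
  - { company_name: "Foo Ltd" }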

Configuration

Variables

Variables are referenced with curly braces: "{i_am_variable}"

Built-in variables (a usage sketch follows this list)

  • 'user'
  • 'home'
  • 'today'
  • 'yesterday'
  • 'one_hour_ago'
  • 'last_month'
  • 'last_year'
  • 'X_days_ago'
  • 'X_hours_ago'
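Built-in variables are referenced the same way as custom ones. A rough sketch (the output_path key is hypothetical; only the "{today}" reference is the point):

# hypothetical sketch: using the built-in 'today' variable in a path
output_path: "/data/export/{today}/result.json"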

Custom variables

# define a variable
vars:
  servers: 10.8.6.231:9092,10.8.6.230:9092,10.8.6.232:9092

# reference the variable
kafka: "{servers}"

Parameterizing a script with variables

# config.yaml
# define the variable in the config file
vars:
  path: "/path/to/test"

Override the variable's value with the assign_attrs command-line argument:

data_pump.py pump_data -c config.yaml --assign_attrs path "/path/to/other"
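For context, a sketch of how such a variable might be consumed elsewhere in the same config; the reader name and its class / init keys below are hypothetical, only the "{path}" reference matters:

# hypothetical reader that reads from the parameterized path
file_reader:
  class: file.FileReader   # assumed class name, for illustration only
  init:
    path: "{path}"         # /path/to/test by default, or the value passed via --assign_attrs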

Configuration schemes

Use cases

Data import and export

Data cleansing

Extracting data for specific samples

Basic idea: the samples come in from the reader, and filters do the extraction.

Filters involved

  • sql.SqlFilter, extracts data from MySQL

  • es.EsQueryFilter, extracts data from ES

  • http.Http, extracts data by calling an HTTP API

  • sql.SqlFilter

filter_query_query_contact_record:
    class: sql.SqlFilter
    init:
      db:
        user: bdp_company
        password: "**********"
        host: bdp-rds-001.mysql.rds.aliyuncs.com
        port: 3306
        database: bdp_company
      sql: "select distinct contact, tag from company_contact_details where company_name_digest = %(company_name_digest)s and contact_type = 'T';"
      result_name: record
      params:
        company_name_digest: __IN_DATA__
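Note: __IN_DATA__ appears to act as a placeholder that is filled in from the incoming record (here, its company_name_digest field); the ES example below uses the same token with a JSONPath suffix.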
  • es.EsQueryFilter
  filter_query_es:
    class: es.EsQueryFilter
    init:
      hosts:
        - host: es-cn-4591blu580004eavf.elasticsearch.aliyuncs.com
          port: 9200
          http_auth: [ 'elastic', 'utn@0818' ]
          timeout: 30
      index: company
      dsl: {
        "query": {
          "bool": {
            "must": [
              { "match_all": { } },
              {
                "match_phrase": {
                  "company_name.keyword": {
                    "query": "__IN_DATA__$.company_name"
                  }
                }
              }
            ]
          }
        }
      }
  • http.Http
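The original page gives no example for http.Http; the following is only a rough sketch under assumed init keys (url, method, params, result_name), mirroring the class / init pattern of the two filters above:

# hypothetical sketch; the real init keys of http.Http may differ
filter_query_http:
  class: http.Http
  init:
    url: "http://example.com/api/company"        # placeholder endpoint
    method: GET
    params:
      company_name: "__IN_DATA__$.company_name"  # value taken from the incoming record
    result_name: api_result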

Driving a non-Scrapy crawler

Basic idea: use a Redis reader as the crawler task queue, implement the crawler itself as a UDM module (a filter), and save the results with a Kafka writer.

  pump:
    reader:
      - redis_reader        # task queue
    filter:
      - spider_tmall        # crawler implemented as a UDM module
      - parser_tmall
      - extract_data
    writer:
      - tb_consumer_goods
      - kafka_result        # results saved to Kafka
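The redis_reader and kafka_result components referenced above are defined elsewhere in the config. As a rough sketch only (the class names and init keys below are assumptions, following the class / init pattern used by the filter examples):

# hypothetical sketch; actual class names and init keys may differ
redis_reader:
  class: redis.RedisReader      # assumed class name
  init:
    host: 127.0.0.1
    port: 6379
    key: spider_task_queue      # Redis list used as the crawler task queue

kafka_result:
  class: kafka.KafkaWriter      # assumed class name
  init:
    servers: "{servers}"        # reuses the custom 'servers' variable defined earlier
    topic: consumer_goods_result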

What if the feature you need isn't in data_pump yet?

Try eval

  trans_fields:
    class: field.Eval
    init:
      pc_daily_pv: "baidu_planner['pc_pv']//30"
      mobile_daily_pv: "baidu_planner['mobile_pv']//30"
      collect_date: "spider_time[:10]"
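Each key under init appears to define a new field whose value is the result of evaluating the Python expression against fields of the current record: pc_daily_pv, for example, is the monthly pc_pv integer-divided by 30, and collect_date keeps only the date part of spider_time.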

Want to restructure the data?

Try jsonpath

  ad_word_performance_fields:
    class: field.JsonPathExtract
    init:
      word: "word"
      label: "baidu_planner.show_reason"
      competition: "baidu_planner.all_competition"
      pc_pv: "pc_daily_pv"
      mobile_pv: "mobile_daily_pv"
      bid: "baidu_planner.all_bid"
      week_index: "trends_attention.week_index"
      collect_date: "collect_date"

Submitting crawler tasks

How to create a data task that iterates over companies
