#DataPump应用指南
基本概念
RFW 结构
输入(reader) -> 处理(filter) -> 输出(writer) IPO (input - process - output) 模型
流转的数据结构
单条 批量
配置
变量
"{i_am_variable}"
内置的变量
- 'user'
- 'home'
- 'today'
- 'yesterday'
- 'one_hour_ago'
- 'last_month'
- 'last_year'
- 'X_days_ago'
- 'X_hours_ago'
自定义变量
#变量定义
vars:
servers: 10.8.6.231:9092,10.8.6.230:9092,10.8.6.232:9092
#变量引用
kafka: "{servers}"
利用变量实现脚本的参数化
#config.yaml
#在配置文件中定义变量
vars:
path: "/path/to/test"
使用 assign_attrs 命令行参数改变变量的值
data_pump.py pump_data -c config.yaml --assign_attrs path "/path/to/other"
配置方案
应用场景
数据导入导出
数据清洗
根据特定样本提取数据
基本思路: 样本从reader来,通过filter提数。
涉及到的filter
-
sql.SqlFilter, 从mysql中提数
-
es.EsQueryFilter, 从ES中提数
-
http.Http, 调用接口提数
-
sql.SqlFilter
filter_query_query_contact_record:
class: sql.SqlFilter
init:
db:
user: bdp_company
password: "**********"
host: bdp-rds-001.mysql.rds.aliyuncs.com
port: 3306
database: bdp_company
sql: "select distinct contact, tag from company_contact_details where company_name_digest = %(company_name_digest)s and contact_type = 'T';"
result_name: record
params:
company_name_digest: __IN_DATA__
- es.EsQueryFilter
filter_query_es:
class: es.EsQueryFilter
init:
hosts:
- host: es-cn-4591blu580004eavf.elasticsearch.aliyuncs.com
port: 9200
http_auth: [ 'elastic', 'utn@0818' ]
timeout: 30
index: company
dsl: {
"query": {
"bool": {
"must": [
{ "match_all": { } },
{
"match_phrase": {
"company_name.keyword": {
"query": "__IN_DATA__$.company_name"
}
}
}
]
}
}
}
- http.Http
驱动非scrapy爬虫
基本思路: 使用redis reader作为爬虫任务队列。使用爬虫实现为UDM模块。使用kafka writer保存结果
pump:
reader:
- redis_reader
filter:
- spider_tmall
- parser_tmall
- extract_data
writer:
- tb_consumer_goods
- kafka_result