# DataPump Application Guide

## Basic Concepts

### RFW Structure

Input (reader) -> Process (filter) -> Output (writer)

This follows the IPO (input - process - output) model.

```plantuml
@startuml
rectangle reader1
rectangle reader2
rectangle reader3
rectangle filter1
rectangle filter2
rectangle filter3
rectangle writer1
rectangle writer2
rectangle writer3

reader1 --> filter1
reader2 --> filter1
reader3 --> filter1
filter1 --> filter2
filter2 --> filter3
filter3 --> writer1
filter3 --> writer2
filter3 --> writer3
@enduml
```
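
In a configuration file this graph is wired up by listing component names under a pump's reader, filter, and writer sections, the same layout used in the crawler example later in this guide. A minimal sketch with illustrative component names:

```yaml
# Minimal RFW wiring; the component names here are illustrative.
pump:
  reader:
    - csv_reader      # input: records enter the pipeline here
  filter:
    - clean_filter    # process: each filter transforms the flowing data in turn
  writer:
    - kafka_writer    # output: processed records are written out here
```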

### Data Structures in the Pipeline

Data flows through the pipeline in one of two shapes:

* single record
* batch (a list of records)
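
As a sketch (the field names are illustrative), a single record is one mapping, and a batch is a list of such mappings processed as one unit:

```yaml
# Illustrative shapes of the data flowing between components.
single_record:
  company_name: "ACME Ltd."

batch:
  - company_name: "ACME Ltd."
  - company_name: "Globex Inc."
```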

## Configuration

### Variables

Variables are referenced in configuration values with curly braces:

```
"{i_am_variable}"
```

#### Built-in Variables

* 'user'
* 'home'
* 'today'
* 'yesterday'
* 'one_hour_ago'
* 'last_month'
* 'last_year'
* 'X_days_ago'
* 'X_hours_ago'
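
A sketch of how these might appear in a configuration (the keys are illustrative, and for the X_* variables X is presumably replaced with a concrete number):

```yaml
# Illustrative keys; only the {variable} syntax is from this guide.
output_path: "{home}/exports/{today}.csv"  # current user's home directory and today's date
since: "{3_days_ago}"                      # X_days_ago with X = 3
```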

#### Custom Variables

```yaml
# define the variable
vars:
  servers: 10.8.6.231:9092,10.8.6.230:9092,10.8.6.232:9092

# reference the variable
kafka: "{servers}"
```

#### Parameterizing Scripts with Variables

```yaml
# config.yaml
# define the variable in the configuration file
vars:
  path: "/path/to/test"
```

Then use the assign_attrs command-line argument to override the variable's value:

```shell
data_pump.py pump_data -c config.yaml --assign_attrs path "/path/to/other"
```

### Configuration Schemes

## Use Cases

### Data Import and Export

### Data Cleaning

### Extracting Data for Specific Samples

Basic idea: samples come in through the reader, and filters look up the corresponding data.

The filters involved (configuration examples below):

* sql.SqlFilter: queries data from MySQL
* es.EsQueryFilter: queries data from Elasticsearch (ES)
* http.Http: fetches data by calling an HTTP API

* sql.SqlFilter

```yaml
filter_query_query_contact_record:
  class: sql.SqlFilter
  init:
    db:
      user: bdp_company
      password: "**********"
      host: bdp-rds-001.mysql.rds.aliyuncs.com
      port: 3306
      database: bdp_company
    sql: "select distinct contact, tag from company_contact_details where company_name_digest = %(company_name_digest)s and contact_type = 'T';"
    result_name: record
    params:
      company_name_digest: __IN_DATA__
```
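
Note: `__IN_DATA__` appears to act as a placeholder filled from the record arriving at the filter, binding the SQL parameter to the incoming field of the same name (here company_name_digest); the `__IN_DATA__$.company_name` form in the ES example below suggests that a JSONPath can select a specific field explicitly.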

* es.EsQueryFilter

```yaml
filter_query_es:
  class: es.EsQueryFilter
  init:
    hosts:
      - host: es-cn-4591blu580004eavf.elasticsearch.aliyuncs.com
        port: 9200
    http_auth: [ 'elastic', 'utn@0818' ]
    timeout: 30
    index: company
    dsl: {
      "query": {
        "bool": {
          "must": [
            { "match_all": { } },
            {
              "match_phrase": {
                "company_name.keyword": {
                  "query": "__IN_DATA__$.company_name"
                }
              }
            }
          ]
        }
      }
    }
```

* http.Http
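
By analogy with the two filters above, a hypothetical http.Http configuration might look as follows; every key under init is an assumption rather than the module's confirmed schema:

```yaml
# Hypothetical sketch only: url, method, params, and result_name are
# assumed by analogy with sql.SqlFilter, not confirmed by this guide.
filter_query_http:
  class: http.Http
  init:
    url: "https://api.example.com/company/detail"
    method: GET
    params:
      company_name: __IN_DATA__
    result_name: record
```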

### Driving Non-Scrapy Crawlers

Basic idea: use a redis reader as the crawler task queue, implement the crawler itself as UDM modules, and use a kafka writer to save the results.

```yaml
pump:
  reader:
    - redis_reader
  filter:
    - spider_tmall
    - parser_tmall
    - extract_data
  writer:
    - tb_consumer_goods
    - kafka_result
```
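
Assuming the pipeline above is saved as config.yaml, it is started with the same pump_data command shown in the configuration section:

```shell
data_pump.py pump_data -c config.yaml
```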

### Submitting Crawler Tasks
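
Since the previous scenario uses a redis reader as the crawler task queue, submitting a task presumably amounts to pushing it onto that queue. A hypothetical sketch using redis-cli, where both the key name and the JSON task format are assumptions:

```shell
# Hypothetical: the list key and task payload format are assumptions.
redis-cli LPUSH spider:tmall:tasks '{"url": "https://example.tmall.com/item/123"}'
```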