# Maxwell Best Practices
## Prerequisites
### 1. MySQL
```shell
MySQL must have binlog enabled, binlog_format must be row, a unique server_id must be set, and binlog_row_image should be FULL (FULL is the default)
vi /etc/my.cnf  # only the relevant options are listed
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
#binlog_row_image=FULL
GTID mode:
vi /etc/my.cnf  # only the relevant options are listed
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
#binlog_row_image=FULL
gtid-mode=ON
log-slave-updates=ON
enforce-gtid-consistency=true
```
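After editing my.cnf and restarting MySQL, it is worth confirming the settings took effect. A minimal check, assuming a local `mysql` client with sufficient privileges:
```shell
# confirm the binlog settings Maxwell depends on
mysql -uroot -p -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image','server_id','gtid_mode');"
# expect log_bin=ON, binlog_format=ROW, binlog_row_image=FULL and a unique server_id
```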
### 2. The Maxwell database
```sql
Maxwell needs a database to store its state. The default database name is maxwell; it is created automatically when Maxwell starts, but the Maxwell user must be granted the corresponding privileges. At runtime Maxwell also needs to read replication state and to read database and column information from information_schema, so those privileges must be granted as well. If you run Maxwell as root, you can create the maxwell database yourself.
CREATE USER 'maxwell'@'%' identified by 'XXXXXX';
GRANT ALL on maxwell.* to 'maxwell'@'%' ;
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
FLUSH PRIVILEGES;
```
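Before starting Maxwell, it helps to confirm the new user can actually connect and read replication status. A quick check, using the placeholder password from the GRANT statements above and the host/port from the example config:
```shell
# the maxwell user needs REPLICATION CLIENT and read access to information_schema
mysql -umaxwell -p'XXXXXX' -h 192.168.109.200 -P 3801 \
  -e "SHOW MASTER STATUS; SELECT COUNT(*) FROM information_schema.columns;"
```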
### 3. Java
```
Maxwell requires a Java runtime.
```
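A quick way to confirm a Java runtime is present on the host (the minimum version depends on the Maxwell release you install):
```shell
java -version   # should print a Java runtime version; install a JDK/JRE via your package manager if this fails
```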
## Quick test
### 1. Installation
```shell
wget https://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz
tar zxvf maxwell-1.27.1.tar.gz
mv maxwell-1.27.1/ maxwell
```
### 2. Configuration
```shell
mkdir -p maxwell/conf
cd maxwell/conf
cp ../config.properties.example ./prod.maxwell.properties
vim prod.maxwell.properties
Commonly used options:
config                  config.properties, path to the configuration file
log_level               info, log level
daemon                  run as a daemon
schema_database         maxwell, database that stores Maxwell's state
client_id               maxwell, name of this Maxwell instance
replica_server_id       6379, similar to server_id; must differ between instances
filter                  filter rules
output_binlog_position  true, include the binlog position in the output
output_ddl              true, include DDL changes in the output
Configuration priority:
command line -> environment variables -> config file -> defaults
```
prod.maxwell.properties (the configuration file passed at startup; all options can also be given directly as arguments to the maxwell command)
```properties
# tl;dr config
log_level=info
#producer=kafka
#kafka.bootstrap.servers=192.168.109.55:9092
#kafka_topic=prod.%{database}.%{table}
#kafka_topic=maxwell_test
# mysql login info
host=192.168.109.200
port=3801
user=root
password=root123
# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=file
# set the log level. note that you can configure things further in log4j2.xml
# [DEBUG, INFO, WARN, ERROR]
log_level=INFO
# if set, maxwell will look up the scoped environment variables, strip off the prefix and inject the configs
#env_config_prefix=/home/collie/lizj/maxwell/
# *** mysql ***
# mysql host to connect to
host=192.168.109.200
# mysql port to connect to
port=3801
# mysql user to connect as. This user must have REPLICATION SLAVE permissions,
# as well as full access to the `maxwell` (or schema_database) database
user=root
# mysql password
password=root123
# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello
# name of the mysql database where maxwell keeps its own state
#schema_database=maxwell
# whether to use GTID or not for positioning
#gtid_mode=true
# SSL/TLS options
# To use VERIFY_CA or VERIFY_IDENTITY, you must set the trust store with Java opts:
# -Djavax.net.ssl.trustStore=<truststore> -Djavax.net.ssl.trustStorePassword=<password>
# or import the MySQL cert into the global Java cacerts.
# MODE must be one of DISABLED, PREFERRED, REQUIRED, VERIFY_CA, or VERIFY_IDENTITY
#
# turns on ssl for the maxwell-store connection, other connections inherit this setting unless specified
#ssl=DISABLED
# for binlog-connector
#replication_ssl=DISABLED
# for the schema-capture connection, if used
#schema_ssl=DISABLED
# maxwell can optionally replicate from a different server than where it stores
# schema and binlog position info. Specify that different server here:
# if replication_host is set, it is the MySQL server the binlog is actually read from
#replication_host=other
#replication_user=username
#replication_password=password
#replication_port=3306
# This may be useful when using MaxScale's binlog mirroring host.
# Specifies that Maxwell should capture schema from a different server than
# it replicates from:
# which host to capture the table schema from
#schema_host=other
#schema_user=username
#schema_password=password
#schema_port=3306
# *** output format ***
# records include binlog position (default false)
#output_binlog_position=true
# records include a gtid string (default false)
#output_gtid_position=true
# records include fields with null values (default true). If this is false,
# fields where the value is null will be omitted entirely from output.
#output_nulls=true
# records include server_id (default false)
#output_server_id=true
# records include thread_id (default false)
#output_thread_id=true
# records include schema_id (default false)
#output_schema_id=true
# records include row query, binlog option "binlog_rows_query_log_events" must be enabled (default false)
# whether to include the original INSERT/UPDATE/DELETE statement; MySQL must have binlog_rows_query_log_events enabled
output_row_query=true
# DML records include list of values that make up a row's primary key (default false)
#output_primary_keys=true
# DML records include list of columns that make up a row's primary key (default false)
#output_primary_key_columns=true
# records include commit and xid (default true)
#output_commit_info=true
# This controls whether maxwell will output JSON information containing
# DDL (ALTER/CREATE TABLE/ETC) information. (default: false)
# See also: ddl_kafka_topic
# whether to include DDL (table-alter, table-create, etc.) events
output_ddl=true
# turns underscore naming style of fields to camel case style in JSON output
# default is none, which means the field name in JSON is the exact name in MySQL table
#output_naming_strategy=underscore_to_camelcase
# *** kafka ***
# list of kafka brokers
kafka.bootstrap.servers=192.168.109.55:9092
# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
kafka_topic=maxwell_test
# alternative kafka topic to write DDL (alter/create/drop) to. Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl
# hash function to use. "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]
# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]
# extra kafka options. Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.
# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
#kafka.batch.size=16384
# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234#
# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from. The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0
# *** partitioning ***
# What part of the data do we partition by?
#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]
# specify what fields to partition by when using producer_partition_by=column
# column separated list.
#producer_partition_columns=id,foo,bar
# when using producer_partition_by=column, partition by this when
# the specified column(s) don't exist.
#producer_partition_by_fallback=database
# *** kinesis ***
#kinesis_stream=maxwell
# AWS places a 256 unicode character limit on the max key length of a record
# http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
#
# Setting this option to true enables hashing the key with the md5 algorithm
# before we send it to kinesis so all the keys work within the key size limit.
# Values: true, false
# Default: false
#kinesis_md5_keys=true
# *** sqs ***
#sqs_queue_uri=aws_sqs_queue_uri
# The sqs producer will need aws credentials configured in the default
# root folder and file format. Please check below link on how to do it.
# http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/setup-credentials.html
# *** pub/sub ***
#pubsub_project_id=maxwell
#pubsub_topic=maxwell
#ddl_pubsub_topic=maxwell_ddl
# *** rabbit-mq ***
#rabbitmq_host=rabbitmq_hostname
#rabbitmq_port=5672
#rabbitmq_user=guest
#rabbitmq_pass=guest
#rabbitmq_virtual_host=/
#rabbitmq_exchange=maxwell
#rabbitmq_exchange_type=fanout
#rabbitmq_exchange_durable=false
#rabbitmq_exchange_autodelete=false
#rabbitmq_routing_key_template=%db%.%table%
#rabbitmq_message_persistent=false
#rabbitmq_declare_exchange=true
# *** redis ***
#redis_host=redis_host
#redis_port=6379
#redis_auth=redis_auth
#redis_database=0
# name of pubsub/list/whatever key to publish to
#redis_key=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_pub_channel=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_list_key=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# Valid values for redis_type = pubsub|lpush. Defaults to pubsub
#redis_type=pubsub
# *** custom producer ***
# the fully qualified class name for custom ProducerFactory
# see the following link for more details.
# http://maxwells-daemon.io/producers/#custom-producer
#custom_producer.factory=
# custom producer properties can be configured using the custom_producer.* property namespace
#custom_producer.custom_prop=foo
# *** filtering ***
# filter rows out of Maxwell's output. Comma-separated list of filter-rules, evaluated in sequence.
# A filter rule is:
# <type> ":" <db> "." <tbl> [ "." <col> "=" <col_val> ]
# type ::= [ "include" | "exclude" | "blacklist" ]
# db ::= [ "/regexp/" | "string" | "`string`" | "*" ]
# tbl ::= [ "/regexp/" | "string" | "`string`" | "*" ]
# col_val ::= "column_name"
#
# See http://maxwells-daemon.io/filtering for more details
#
#filter= exclude: *.*, include: foo.*, include: bar.baz, include: foo.bar.col_eg = "value_to_match"
# javascript filter
# maxwell can run a bit of javascript for each row if you need very custom filtering/data munging.
# See http://maxwells-daemon.io/filtering/#javascript_filters for more details
#
#javascript=/path/to/javascript_filter_file
# *** encryption ***
# Encryption mode. Possible values are none, data, and all. (default none)
#encrypt=none
# Specify the secret key to be used
#secret_key=RandomInitVector
# *** monitoring ***
metrics_type=http
metrics_prefix=metrics
# Maxwell collects metrics via dropwizard. These can be exposed through the
# base logging mechanism (slf4j), JMX, HTTP or pushed to Datadog.
# Options: [jmx, slf4j, http, datadog]
# Supplying multiple is allowed.
#metrics_type=jmx,slf4j,http
# The prefix maxwell will apply to all metrics
#metrics_prefix=MaxwellMetrics # default MaxwellMetrics
# Enable (dropwizard) JVM metrics, default false
metrics_jvm=true
# When metrics_type includes slf4j this is the frequency metrics are emitted to the log, in seconds
#metrics_slf4j_interval=60
# When metrics_type includes http or diagnostic is enabled, this is the port the server will bind to.
http_port=8080
# When metrics_type includes http or diagnostic is enabled, this is the http path prefix, default /.
http_path_prefix=/
# ** The following are Datadog specific. **
# When metrics_type includes datadog this is the way metrics will be reported.
# Options: [udp, http]
# Supplying multiple is not allowed.
#metrics_datadog_type=http
# datadog tags that should be supplied
#metrics_datadog_tags=tag1:value1,tag2:value2
# The frequency metrics are pushed to datadog, in seconds
#metrics_datadog_interval=60
# required if metrics_datadog_type = http
#metrics_datadog_apikey=API_KEY
# required if metrics_datadog_type = udp
#metrics_datadog_host=localhost # default localhost
#metrics_datadog_port=8125 # default 8125
# Maxwell exposes http diagnostic endpoint to check below in parallel:
# 1. binlog replication lag
# 2. producer (currently kafka) lag
# To enable Maxwell diagnostic
# default false
#http_diagnostic=true
# Diagnostic check timeout in milliseconds, required if diagnostic = true
# default 10000
#http_diagnostic_timeout=10000
# *** misc ***
# maxwell's bootstrapping functionality has a couple of modes.
#
# In "async" mode, maxwell will output the replication stream while it
# simultaneously outputs the database to the topic. Note that it won't
# output replication data for any tables it is currently bootstrapping -- this
# data will be buffered and output after the bootstrap is complete.
#
# In "sync" mode, maxwell stops the replication stream while it
# outputs bootstrap data.
#
# async mode keeps ops live while bootstrapping, but carries the possibility of
# data loss (due to buffering transactions). sync mode is safer but you
# have to stop replication.
#bootstrapper=async [sync, async, none]
# output filename when using the "file" producer
output_file=/home/collie/lizj/data/mysql_binlog_data.log
```
### 3. Startup
```shell
maxwell/bin/maxwell --config maxwell/conf/prod.maxwell.properties
Add --daemon to run it as a background process:
maxwell/bin/maxwell --config maxwell/conf/prod.maxwell.properties --daemon
```
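Once Maxwell is started, a simple sanity check is to confirm the process is running and that its state tables were created and are being updated (credentials as in the config above; table layout as of recent Maxwell versions):
```shell
# is the maxwell JVM running?
ps -ef | grep '[m]axwell'
# has it recorded a binlog position in its state database?
mysql -uroot -p'root123' -h 192.168.109.200 -P 3801 \
  -e "SELECT client_id, binlog_file, binlog_position FROM maxwell.positions;"
```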
## producer
### stdout
```shell
The JSON output includes the binlog position and DDL statements:
bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --output_binlog_position=true --output_ddl=true --producer=stdout
bin/maxwell --config conf/prod.maxwell.properties
# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=stdout
```
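With the stdout producer running, any change on the source MySQL should appear immediately as a JSON line. For example, using the ic.test table that appears in the output examples further below:
```shell
# make a change on the source database
mysql -uroot -p'root123' -h 192.168.109.200 -P 3801 \
  -e "INSERT INTO ic.test (platform_name, goods_id) VALUES ('demo', '42');"
# maxwell's stdout then prints a line like:
# {"database":"ic","table":"test","type":"insert","ts":...,"data":{...}}
```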
### file
```shell
bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --producer=file --output_file=/home/collie/lizj/data/mysql_binlog_data.log
bin/maxwell --config conf/prod.maxwell.properties
# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=file
# output filename when using the "file" producer
output_file=/home/collie/lizj/data/mysql_binlog_data.log
```
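With the file producer, events are appended to output_file as one JSON object per line, so the simplest way to watch them is to tail the file configured above:
```shell
tail -f /home/collie/lizj/data/mysql_binlog_data.log
```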
### kafka
```shell
bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --producer=kafka --kafka.bootstrap.servers=192.168.109.55:9092 --kafka_topic=maxwell_test
bin/maxwell --config conf/prod.maxwell.properties
# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
# *** kafka ***
# list of kafka brokers
kafka.bootstrap.servers=192.168.109.55:9092
# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
kafka_topic=maxwell_test
# alternative kafka topic to write DDL (alter/create/drop) to. Defaults to kafka_topic
ddl_kafka_topic=maxwell_test_ddl
# hash function to use. "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]
# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]
# extra kafka options. Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.
# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
#kafka.batch.size=16384
# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234#
# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from. The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0
```
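To verify that events are actually reaching Kafka, consume the topic with the console consumer that ships with Kafka (the Kafka installation path below is a placeholder):
```shell
# read the maxwell_test topic from the beginning
/path/to/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 192.168.109.55:9092 \
  --topic maxwell_test --from-beginning
```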
## Output analysis
```json
{"database":"ic","table":"test","type":"update","ts":1631520880,"xid":1466241500,"commit":true,"data":{"id":3,"platform_name":"test","goods_id":"12354631","create_time":"2021-09-13 05:31:40","lastupdatetime":"2021-09-13 08:14:40"},"old":{"goods_id":"123564631","lastupdatetime":"2021-09-13 08:12:56"}}
{"type":"table-alter","database":"ic","table":"test","old":{"database":"ic","charset":"utf8","table":"test","columns":[{"type":"int","name":"id","signed":true},{"type":"varchar","name":"platform_name","charset":"utf8"},{"type":"varchar","name":"goods_id","charset":"utf8"},{"type":"timestamp","name":"create_time","column-length":0},{"type":"timestamp","name":"lastupdatetime","column-length":0},{"type":"varchar","name":"test","charset":"utf8"}],"primary-key":["id"]},"def":{"database":"ic","charset":"utf8","table":"test","columns":[{"type":"int","name":"id","signed":true},{"type":"varchar","name":"platform_name","charset":"utf8"},{"type":"varchar","name":"goods_id","charset":"utf8"},{"type":"timestamp","name":"create_time","column-length":0},{"type":"timestamp","name":"lastupdatetime","column-length":0},{"type":"varchar","name":"test","charset":"utf8"},{"type":"varchar","name":"test1","charset":"utf8"}],"primary-key":["id"]},"ts":1631522666000,"sql":"/* ApplicationName=PyCharm 2021.1.3 */ alter table test add column test1 varchar(64) default 'true'"}
{"type":"table-drop","database":"ic","table":"test1","ts":1631520801000,"sql":"DROP TABLE `test1` /* generated by server */"}
{"database":"ic","table":"test","type":"insert","ts":1631520701,"xid":1466240458,"commit":true,"data":{"id":9,"platform_name":"das_sad","goods_id":"121","create_time":"2021-09-13 08:11:41","lastupdatetime":"2021-09-13 08:11:41"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ update test set goods_id = 1234151 where id = 5","type":"update","ts":1631522576,"xid":1466250035,"commit":true,"data":{"id":5,"platform_name":"test","goods_id":"1234151","create_time":"2021-09-13 05:31:40","lastupdatetime":"2021-09-13 08:42:56","test":"1"},"old":{"goods_id":"12345151","lastupdatetime":"2021-09-13 08:28:29"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":0,"data":{"id":17,"platform_name":"das6","goods_id":"16566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":1,"data":{"id":18,"platform_name":"das5","goods_id":"15566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":2,"data":{"id":19,"platform_name":"das4","goods_id":"14566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":3,"data":{"id":20,"platform_name":"das3","goods_id":"13566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":4,"data":{"id":21,"platform_name":"das2","goods_id":"12566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"commit":true,"data":{"id":22,"platform_name":"das1","goods_id":"11566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
Field descriptions:
database: the database
table: the table
data: the latest data, i.e. the values after the change
old: the values before the change
ts: timestamp of the operation
sql/query: the original SQL statement
type: operation type, one of insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop, bootstrap-insert, int (unknown type)
xid: transaction id; for bulk inserts or deletes each row is output separately, linked by the same xid and different xoffset values, with commit on the last one
xoffset: row offset within the transaction
commit: rows sharing an xid belong to the same transaction; the transaction's last row carries commit, which can be used to reassemble the transaction
Maxwell can read many character sets, but always outputs UTF-8
```
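Because every event is a single JSON object per line, the output is easy to post-process. As a sketch, with `jq` installed, the following pulls the update events for ic.test out of the file producer's output (file path as configured above):
```shell
# keep only updates to ic.test and show the new and old values
jq -c 'select(.type == "update" and .database == "ic" and .table == "test")
       | {id: .data.id, data: .data, old: .old}' \
   /home/collie/lizj/data/mysql_binlog_data.log
```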
## Maxwell in detail
### Host roles
```
--host: the host the table schema is sourced from, i.e. the host where the maxwell database is created
--replication_host: the host the binlog is read from; for master-slave setups where the maxwell database cannot be created on the slave
--schema_host: the host the table schema is captured from; only needed when replicating through a proxy
Note: bootstrapping is not available when host and replication_host are separate servers
```
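A sketch of the split-role setup, with hypothetical hostnames: state and schema live on the master, while the binlog is read from a replica:
```shell
# store state/schema on the master, read the binlog from a replica
bin/maxwell --user='maxwell' --password='XXXXXX' \
  --host=master.example.com \
  --replication_host=replica.example.com --replication_user='maxwell' --replication_password='XXXXXX' \
  --producer=stdout
```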
### Multiple instances
```shell
To run multiple Maxwell instances, give each one a distinct client_id so that each stores its own binlog position, and also set replica_server_id to a distinct server_id, e.g.
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=maxwell --replica_server_id=6379 # defaults
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=my --replica_server_id=2345
```
### Filtering
```shell
# only match the tbl table and all table_<digits> tables in the foodb database
--filter = 'exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# exclude everything and only match the db1 database
--filter = 'exclude: *.*, include: db1.*'
# exclude all updates where the db.tbl.col column has the value reject
--filter = 'exclude: db.tbl.col = reject'
# exclude any update that contains a col_a column
--filter = 'exclude: *.*.col_a = *'
# blacklist the bad_db database entirely; to undo this, the maxwell database must be dropped
--filter = 'blacklist: bad_db.*'
```
### Bootstrapping
```shell
When using maxwell-bootstrap, the available options are:
--log_level, log level
--user, username
--password, password
--host, host
--port, port
--database, database containing the table to bootstrap
--table, table to bootstrap
--where, WHERE clause limiting the rows
--client_id, name of the Maxwell instance
Alternatively, a bootstrap can be triggered by inserting a row into the maxwell.bootstrap table by hand:
insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');
Examples:
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --log_level info
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --where "my_date >= '2017-01-07 00:00:00'" --log_level info
Note: with --bootstrapper=sync, normal binlog processing is blocked while a bootstrap is running; with --bootstrapper=async it is not blocked
The type field in the JSON goes through:
bootstrap-start -> bootstrap-insert -> bootstrap-complete # the start and complete events have an empty data field and carry no row data
If Maxwell crashes during a bootstrap, the bootstrap starts over from the beginning on restart, no matter how far it had progressed. If that is not desired, set the row's complete column to 1 (finished) or delete the row.
```
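The progress of a running bootstrap can be watched directly in the maxwell.bootstrap table (exact column names may vary slightly between Maxwell versions):
```shell
# show which bootstraps exist and how far each has progressed
mysql -uroot -p'root123' -h 192.168.109.200 -P 3801 \
  -e "SELECT database_name, table_name, is_complete, inserted_rows, total_rows FROM maxwell.bootstrap;"
```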
### Monitoring / metrics
```shell
All metrics are currently kafka-oriented
Once the HTTP endpoint is enabled, the following paths are available:
/metrics, returns metrics as JSON
/healthcheck, health check; a value greater than 0 means there were errors in the past 15 minutes
/ping, returns pong
To expose JMX information, JAVA_OPTS must be set beforehand
```
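With metrics_type=http and http_port=8080 as configured above, the endpoints can be checked from the Maxwell host:
```shell
curl -s http://localhost:8080/metrics | head   # metrics as JSON
curl -s http://localhost:8080/healthcheck      # health status
curl -s http://localhost:8080/ping             # returns pong
```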
## Appendix: complete option reference
| Option | Value type | Description | Default |
| --- | --- | --- | --- |
| **General options** | | | |
| config | string | path to the config.properties configuration file | current directory $PWD |
| log_level | debug, info, warn, error | log level | info |
| daemon | | run maxwell as a daemon | |
| env_config_prefix | string | prefix of environment variables to treat as configuration, e.g. FOO_ | |
| **MySQL options** | | | |
| host | string | mysql host | localhost |
| user | string | mysql username | |
| password | string | mysql password | (no password) |
| port | INT | mysql port | 3306 |
| jdbc_options | string | mysql jdbc connection options | DEFAULT_JDBC* |
| ssl | SSL_OPT | SSL mode | DISABLED |
| schema_database | string | database that stores the captured schema and binlog positions | maxwell |
| client_id | string | unique text identifier of this maxwell instance | maxwell |
| replica_server_id | LONG | unique numeric identifier of this maxwell instance, analogous to MySQL server_id | 6379 |
| master_recovery | boolean | enable experimental master recovery code | FALSE |
| gtid_mode | boolean | enable GTID-based replication | FALSE |
| recapture_schema | boolean | recapture the latest table schema; not available in the config file | FALSE |
| **Replication options**, for master-slave setups where the maxwell database cannot be created on the slave and the binlog is read from the slave | | | |
| replication_host | string | host | schema-store host |
| replication_password | string | password | (none) |
| replication_port | INT | port | 3306 |
| replication_user | string | user | |
| replication_ssl | SSL_OPT* | enable SSL | DISABLED |
| **Schema options**, only used when replicating through a proxy; where the table schema is captured from | | | |
| schema_host | string | host | schema-store host |
| schema_password | string | password | (none) |
| schema_port | INT | port | 3306 |
| schema_user | string | user | |
| schema_ssl | SSL_OPT* | enable SSL | DISABLED |
| **Producer options** | | | |
| producer | PRODUCER_TYPE* | producer type | stdout |
| custom_producer.factory | CLASS_NAME | factory class for a custom producer | |
| producer_ack_timeout | | time in milliseconds after which an asynchronously produced message is considered lost | |
| producer_partition_by | PARTITION_BY* | partition function for kafka/kinesis output | database |
| producer_partition_columns | string | comma-separated column names when partitioning by column | |
| producer_partition_by_fallback | PARTITION_BY_F* | required when partitioning by column; used when the column(s) do not exist | |
| ignore_producer_error | boolean | if false, exit on kafka/kinesis errors; if true, only log them | TRUE |
| **producer=file** | | | |
| output_file | string | path of the output file | |
| javascript | string | path to a javascript filter file | |
| **producer=kafka** | | | |
| kafka.bootstrap.servers | string | kafka brokers, as HOST:PORT[,HOST:PORT] | |
| kafka_topic | string | kafka topic | maxwell |
| kafka_version | KAFKA_VERSION* | kafka client version; not available in config.properties | 0.11.0.1 |
| kafka_partition_hash | default, murmur3 | hash function used for kafka partitioning | default |
| kafka_key_format | array, hash | format of the kafka keys maxwell writes | hash |
| ddl_kafka_topic | string | kafka topic for DDL events when output_ddl is true | kafka_topic |
| **producer=kinesis** | | | |
| kinesis_stream | string | kinesis stream name | |
| **producer=sqs** | | | |
| sqs_queue_uri | string | SQS Queue URI | |
| **producer=pubsub** | | | |
| pubsub_topic | string | Google Cloud pub-sub topic | |
| pubsub_platform_id | string | Google Cloud platform id associated with topic | |
| ddl_pubsub_topic | string | Google Cloud pub-sub topic to send DDL events to | |
| **producer=rabbitmq** | | | |
| rabbitmq_user | string | username | guest |
| rabbitmq_pass | string | password | guest |
| rabbitmq_host | string | host | |
| rabbitmq_port | INT | port | |
| rabbitmq_virtual_host | string | virtual host | |
| rabbitmq_exchange | string | exchange name | |
| rabbitmq_exchange_type | string | exchange type | |
| rabbitmq_exchange_durable | boolean | whether the exchange is durable | FALSE |
| rabbitmq_exchange_autodelete | boolean | if true, the exchange is deleted once all queues have finished using it | FALSE |
| rabbitmq_routing_key_template | string | routing key template; %db% and %table% are available as variables | %db%.%table% |
| rabbitmq_message_persistent | boolean | enable message persistence | FALSE |
| rabbitmq_declare_exchange | boolean | declare the exchange | TRUE |
| **producer=redis** | | | |
| redis_host | string | host | localhost |
| redis_port | INT | port | 6379 |
| redis_auth | string | password | |
| redis_database | INT | database number [0-15] | 0 |
| redis_pub_channel | string | channel name in pub/sub mode | maxwell |
| redis_list_key | string | list key in LPUSH mode | maxwell |
| redis_type | pubsub, lpush | mode selection | pubsub |
| **Output formatting** | | | |
| output_binlog_position | boolean | include the binlog position | FALSE |
| output_gtid_position | boolean | include the gtid position | FALSE |
| output_commit_info | boolean | include commit and xid | TRUE |
| output_xoffset | boolean | include the virtual row offset | FALSE |
| output_nulls | boolean | include NULL values | TRUE |
| output_server_id | boolean | include server_id | FALSE |
| output_thread_id | boolean | include thread_id | FALSE |
| output_row_query | boolean | include the original INSERT/UPDATE/DELETE statement; MySQL must have binlog_rows_query_log_events enabled | FALSE |
| output_ddl | boolean | include DDL (table-alter, table-create, etc.) events | FALSE |
| **Filtering** | | | |
| filter | string | filter rules, e.g. exclude: db.*, include: *.tbl, include: *./bar(bar)?/, exclude: foo.bar.col=val | |
| **Encryption** | | | |
| encrypt | none, data, all | encryption mode: none = no encryption, data = encrypt only the data field, all = encrypt the entire message | none |
| secret_key | string | encryption key | null |
| **Monitoring / metrics** | | | |
| metrics_prefix | string | prefix applied to all metrics | MaxwellMetrics |
| metrics_type | slf4j, jmx, http, datadog | how metrics are reported | |
| metrics_jvm | boolean | enable JVM metrics: memory usage, GC stats, etc. | FALSE |
| metrics_slf4j_interval | SECONDS | interval at which metrics are emitted via slf4j, in seconds | 60 |
| http_port | INT | port for the http reporter | 8080 |
| http_path_prefix | string | path prefix for the http reporter | / |
| http_bind_address | string | address the http server binds to | all addresses |
| http_diagnostic | boolean | enable the http diagnostic endpoint | FALSE |
| http_diagnostic_timeout | MILLISECONDS | http diagnostic response timeout | 10000 |
| metrics_datadog_type | udp, http | how metrics are sent to datadog | udp |
| metrics_datadog_tags | string | datadog tags, e.g. tag1:value1,tag2:value2 | |
| metrics_datadog_interval | INT | interval at which metrics are pushed to datadog, in seconds | 60 |
| metrics_datadog_apikey | string | datadog api key; only when metrics_datadog_type = http | |
| metrics_datadog_host | string | target host; only when metrics_datadog_type = udp | localhost |
| metrics_datadog_port | INT | port; only when metrics_datadog_type = udp | 8125 |
| **Miscellaneous** | | | |
| bootstrapper | async, sync, none | bootstrapper type | async |
| init_position | FILE:POS[:HEARTBEAT] | start from the given position; not available in the config file | |
| replay | boolean | enable read-only replay mode; does not store binlog positions or schema changes; not available in the config file | |