Introduction
Maxwell is an application that reads the MySQL binary log (binlog) in real time, turns every change into a JSON message, and produces those messages to Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, files, or other platforms. Typical use cases include ETL, cache maintenance, collecting table-level DML metrics, incremental feeds into search engines, data migration and partitioning, and binlog-based rollback schemes for database cutovers. Website: http://maxwells-daemon.io, GitHub: https://github.com/zendesk/maxwell
Maxwell provides the following main features:
Full-data initialization (bootstrapping) via SELECT * FROM table
Automatic recovery of the binlog position after a primary failover (GTID)
Partitioning of the data to avoid skew: messages sent to Kafka can be partitioned at the database, table, column, and other levels
It works by impersonating a replica: it receives binlog events, assembles them using the captured schema information, and can handle DDL, XID, row, and other event types
Prerequisites
1. MySQL
MySQL must have the binlog enabled with binlog_format=row, a unique server_id must be set, and binlog_row_image should be FULL (FULL is the default).
vi /etc/my.cnf   # only the relevant settings are shown
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
#binlog_row_image=FULL
GTID mode:
vi /etc/my.cnf   # only the relevant settings are shown
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
#binlog_row_image=FULL
gtid-mode=ON
log-slave-updates=ON
enforce-gtid-consistency=true
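To confirm the settings are in effect you can query the server; this is just a sanity check, not something Maxwell itself requires:
SHOW VARIABLES LIKE 'log_bin';           -- expect ON
SHOW VARIABLES LIKE 'binlog_format';     -- expect ROW
SHOW VARIABLES LIKE 'binlog_row_image';  -- expect FULL
SHOW VARIABLES LIKE 'server_id';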
2. The maxwell database
Maxwell needs a database to store its state; the default name is maxwell. The database is created automatically when Maxwell starts, but the account must be granted the corresponding privileges. At runtime Maxwell also reads replication status and looks up database and column metadata in information_schema, so those privileges must be granted as well. If you run Maxwell as the root user, you can simply create a database named maxwell yourself.
CREATE USER 'maxwell'@'%' identified by 'XXXXXX';
GRANT ALL on maxwell.* to 'maxwell'@'%' ;
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
FLUSH PRIVILEGES;
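To double-check the account afterwards, for example:
SHOW GRANTS FOR 'maxwell'@'%';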
3. Java
Maxwell needs a Java runtime to run.
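A quick check that a JRE is available on the PATH:
java -version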
Quick test
1. Installation
wget https://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz
tar zxvf maxwell-1.27.1.tar.gz
mv maxwell-1.27.1/ maxwell
2. Configuration
mkdir -p maxwell/conf
cd maxwell/conf
cp ../config.properties.example ./prod.maxwell.properties
vim prod.maxwell.properties
Commonly used options:
config: config.properties, the configuration file
log_level: info, log level
daemon: run Maxwell as a daemon
schema_database: maxwell, the database used to store Maxwell's state
client_id: maxwell, name of this Maxwell instance
replica_server_id: 6379, analogous to server_id, needed when running multiple instances
filter: filter rules
output_binlog_position: true, include the binlog position in the output
output_ddl: true, include DDL changes in the output
Configuration precedence:
command line -> environment variables -> config file -> defaults
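For example, a flag given on the command line overrides the same key in the config file (the values here are only illustrative):
bin/maxwell --config conf/prod.maxwell.properties --log_level=debug   # debug wins over log_level=info in the file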
prod.maxwell.properties (the configuration file passed at startup; the same options can also be given directly as command-line flags to maxwell):
# tl;dr config
log_level=info
#producer=kafka
#kafka.bootstrap.servers=192.168.109.55:9092
#kafka_topic=prod.%{database}.%{table}
#kafka_topic=maxwell_test
# mysql login info
host=192.168.109.200
port=3801
user=root
password=root123
# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=file
# set the log level. note that you can configure things further in log4j2.xml
# [DEBUG, INFO, WARN, ERROR]
log_level=INFO
# if set, maxwell will look up the scoped environment variables, strip off the prefix and inject the configs
#env_config_prefix=/home/collie/lizj/maxwell/
# *** mysql ***
# mysql host to connect to
host=192.168.109.200
# mysql port to connect to
port=3801
# mysql user to connect as. This user must have REPLICATION SLAVE permissions,
# as well as full access to the `maxwell` (or schema_database) database
user=root
# mysql password
password=root123
# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello
# name of the mysql database where maxwell keeps its own state
#schema_database=maxwell
# whether to use GTID or not for positioning
#gtid_mode=true
# SSL/TLS options
# To use VERIFY_CA or VERIFY_IDENTITY, you must set the trust store with Java opts:
# -Djavax.net.ssl.trustStore=<truststore> -Djavax.net.ssl.trustStorePassword=<password>
# or import the MySQL cert into the global Java cacerts.
# MODE must be one of DISABLED, PREFERRED, REQUIRED, VERIFY_CA, or VERIFY_IDENTITY
#
# turns on ssl for the maxwell-store connection, other connections inherit this setting unless specified
#ssl=DISABLED
# for binlog-connector
#replication_ssl=DISABLED
# for the schema-capture connection, if used
#schema_ssl=DISABLED
# maxwell can optionally replicate from a different server than where it stores
# schema and binlog position info. Specify that different server here:
# If replication_host is set, it is the MySQL server the binlog is actually read from
#replication_host=other
#replication_user=username
#replication_password=password
#replication_port=3306
# This may be useful when using MaxScale's binlog mirroring host.
# Specifies that Maxwell should capture schema from a different server than
# it replicates from:
# which host to capture the table schema from
#schema_host=other
#schema_user=username
#schema_password=password
#schema_port=3306
# *** output format ***
# records include binlog position (default false)
#output_binlog_position=true
# records include a gtid string (default false)
#output_gtid_position=true
# records include fields with null values (default true). If this is false,
# fields where the value is null will be omitted entirely from output.
#output_nulls=true
# records include server_id (default false)
#output_server_id=true
# records include thread_id (default false)
#output_thread_id=true
# records include schema_id (default false)
#output_schema_id=true
# records include the original row query (INSERT/UPDATE/DELETE statement); the MySQL option
# binlog_rows_query_log_events must be enabled (default false)
output_row_query=true
# DML records include list of values that make up a row's primary key (default false)
#output_primary_keys=true
# DML records include list of columns that make up a row's primary key (default false)
#output_primary_key_columns=true
# records include commit and xid (default true)
#output_commit_info=true
# This controls whether maxwell will output JSON information containing
# DDL (ALTER/CREATE TABLE/ETC) information. (default: false)
# See also: ddl_kafka_topic
# whether to include DDL (table-alter, table-create, etc) events
output_ddl=true
# turns underscore naming style of fields to camel case style in JSON output
# default is none, which means the field name in JSON is the exact name in MySQL table
#output_naming_strategy=underscore_to_camelcase
# *** kafka ***
# list of kafka brokers
kafka.bootstrap.servers=192.168.109.55:9092
# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
kafka_topic=maxwell_test
# alternative kafka topic to write DDL (alter/create/drop) to. Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl
# hash function to use. "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]
# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]
# extra kafka options. Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.
# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
#kafka.batch.size=16384
# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234#
# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from. The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0
# *** partitioning ***
# What part of the data do we partition by?
#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]
# specify what fields to partition by when using producer_partition_by=column
# column separated list.
#producer_partition_columns=id,foo,bar
# when using producer_partition_by=column, partition by this when
# the specified column(s) don't exist.
#producer_partition_by_fallback=database
# *** kinesis ***
#kinesis_stream=maxwell
# AWS places a 256 unicode character limit on the max key length of a record
# http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
#
# Setting this option to true enables hashing the key with the md5 algorithm
# before we send it to kinesis so all the keys work within the key size limit.
# Values: true, false
# Default: false
#kinesis_md5_keys=true
# *** sqs ***
#sqs_queue_uri=aws_sqs_queue_uri
# The sqs producer will need aws credentials configured in the default
# root folder and file format. Please check below link on how to do it.
# http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/setup-credentials.html
# *** pub/sub ***
#pubsub_project_id=maxwell
#pubsub_topic=maxwell
#ddl_pubsub_topic=maxwell_ddl
# *** rabbit-mq ***
#rabbitmq_host=rabbitmq_hostname
#rabbitmq_port=5672
#rabbitmq_user=guest
#rabbitmq_pass=guest
#rabbitmq_virtual_host=/
#rabbitmq_exchange=maxwell
#rabbitmq_exchange_type=fanout
#rabbitmq_exchange_durable=false
#rabbitmq_exchange_autodelete=false
#rabbitmq_routing_key_template=%db%.%table%
#rabbitmq_message_persistent=false
#rabbitmq_declare_exchange=true
# *** redis ***
#redis_host=redis_host
#redis_port=6379
#redis_auth=redis_auth
#redis_database=0
# name of pubsub/list/whatever key to publish to
#redis_key=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_pub_channel=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_list_key=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# Valid values for redis_type = pubsub|lpush. Defaults to pubsub
#redis_type=pubsub
# *** custom producer ***
# the fully qualified class name for custom ProducerFactory
# see the following link for more details.
# http://maxwells-daemon.io/producers/#custom-producer
#custom_producer.factory=
# custom producer properties can be configured using the custom_producer.* property namespace
#custom_producer.custom_prop=foo
# *** filtering ***
# filter rows out of Maxwell's output. Comma-separated list of filter-rules, evaluated in sequence.
# A filter rule is:
# <type> ":" <db> "." <tbl> [ "." <col> "=" <col_val> ]
# type ::= [ "include" | "exclude" | "blacklist" ]
# db ::= [ "/regexp/" | "string" | "`string`" | "*" ]
# tbl ::= [ "/regexp/" | "string" | "`string`" | "*" ]
# col ::= "column_name"
# col_val ::= "value_to_match"
#
# See http://maxwells-daemon.io/filtering for more details
#
#filter= exclude: *.*, include: foo.*, include: bar.baz, include: foo.bar.col_eg = "value_to_match"
# javascript filter
# maxwell can run a bit of javascript for each row if you need very custom filtering/data munging.
# See http://maxwells-daemon.io/filtering/#javascript_filters for more details
#
#javascript=/path/to/javascript_filter_file
# *** encryption ***
# Encryption mode. Possible values are none, data, and all. (default none)
#encrypt=none
# Specify the secret key to be used
#secret_key=RandomInitVector
# *** monitoring ***
metrics_type=http
metrics_prefix=metrics
# Maxwell collects metrics via dropwizard. These can be exposed through the
# base logging mechanism (slf4j), JMX, HTTP or pushed to Datadog.
# Options: [jmx, slf4j, http, datadog]
# Supplying multiple is allowed.
#metrics_type=jmx,slf4j,http
# The prefix maxwell will apply to all metrics
#metrics_prefix=MaxwellMetrics # default MaxwellMetrics
# Enable (dropwizard) JVM metrics, default false
metrics_jvm=true
# When metrics_type includes slf4j this is the frequency metrics are emitted to the log, in seconds
#metrics_slf4j_interval=60
# When metrics_type includes http or diagnostic is enabled, this is the port the server will bind to.
http_port=8080
# When metrics_type includes http or diagnostic is enabled, this is the http path prefix, default /.
http_path_prefix=/
# ** The following are Datadog specific. **
# When metrics_type includes datadog this is the way metrics will be reported.
# Options: [udp, http]
# Supplying multiple is not allowed.
#metrics_datadog_type=http
# datadog tags that should be supplied
#metrics_datadog_tags=tag1:value1,tag2:value2
# The frequency metrics are pushed to datadog, in seconds
#metrics_datadog_interval=60
# required if metrics_datadog_type = http
#metrics_datadog_apikey=API_KEY
# required if metrics_datadog_type = udp
#metrics_datadog_host=localhost # default localhost
#metrics_datadog_port=8125 # default 8125
# Maxwell exposes http diagnostic endpoint to check below in parallel:
# 1. binlog replication lag
# 2. producer (currently kafka) lag
# To enable Maxwell diagnostic
# default false
#http_diagnostic=true
# Diagnostic check timeout in milliseconds, required if diagnostic = true
# default 10000
#http_diagnostic_timeout=10000
# *** misc ***
# maxwell's bootstrapping functionality has a couple of modes.
#
# In "async" mode, maxwell will output the replication stream while it
# simultaneously outputs the database to the topic. Note that it won't
# output replication data for any tables it is currently bootstrapping -- this
# data will be buffered and output after the bootstrap is complete.
#
# In "sync" mode, maxwell stops the replication stream while it
# outputs bootstrap data.
#
# async mode keeps ops live while bootstrapping, but carries the possibility of
# data loss (due to buffering transactions). sync mode is safer but you
# have to stop replication.
#bootstrapper=async [sync, async, none]
# output filename when using the "file" producer
output_file=/home/collie/lizj/data/mysql_binlog_data.log
3. Startup
maxwell/bin/maxwell --config maxwell/conf/prod.maxwell.properties
Add --daemon to run Maxwell as a background process:
maxwell/bin/maxwell --config maxwell/conf/prod.maxwell.properties --daemon
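With the file producer configured above, you can watch events arrive as soon as Maxwell is running:
tail -f /home/collie/lizj/data/mysql_binlog_data.log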
producer
stdout
The JSON output includes the binlog position and DDL statements:
bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --output_binlog_position=true --output_ddl=true --producer=stdout
bin/maxwell --config conf/prod.maxwell.properties
# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=stdout
file
bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --producer=file --output_file=/home/collie/lizj/data/mysql_binlog_data.log
bin/maxwell --config conf/prod.maxwell.properties
# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=file
# output filename when using the "file" producer
output_file=/home/collie/lizj/data/mysql_binlog_data.log
kafka
bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --producer=kafka --kafka.bootstrap.servers=192.168.109.55:9092 --kafka_topic=maxwell_test
bin/maxwell --config conf/prod.maxwell.properties
# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
# *** kafka ***
# list of kafka brokers
kafka.bootstrap.servers=192.168.109.55:9092
# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
kafka_topic=maxwell_test
# alternative kafka topic to write DDL (alter/create/drop) to. Defaults to kafka_topic
ddl_kafka_topic=maxwell_test_ddl
# hash function to use. "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]
# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]
# extra kafka options. Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.
# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
#kafka.batch.size=16384
# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234#
# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from. The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0
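Before starting Maxwell with the kafka producer it can help to create the topic up front and watch it with the console consumer. The commands below use the standard Kafka CLI scripts and are only illustrative (the partition count is arbitrary, and older Kafka releases take --zookeeper instead of --bootstrap-server):
kafka-topics.sh --create --bootstrap-server 192.168.109.55:9092 --replication-factor 1 --partitions 20 --topic maxwell_test
kafka-console-consumer.sh --bootstrap-server 192.168.109.55:9092 --topic maxwell_test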
Output analysis
{"database":"ic","table":"test","type":"update","ts":1631520880,"xid":1466241500,"commit":true,"data":{"id":3,"platform_name":"test","goods_id":"12354631","create_time":"2021-09-13 05:31:40","lastupdatetime":"2021-09-13 08:14:40"},"old":{"goods_id":"123564631","lastupdatetime":"2021-09-13 08:12:56"}}
{"type":"table-alter","database":"ic","table":"test","old":{"database":"ic","charset":"utf8","table":"test","columns":[{"type":"int","name":"id","signed":true},{"type":"varchar","name":"platform_name","charset":"utf8"},{"type":"varchar","name":"goods_id","charset":"utf8"},{"type":"timestamp","name":"create_time","column-length":0},{"type":"timestamp","name":"lastupdatetime","column-length":0},{"type":"varchar","name":"test","charset":"utf8"}],"primary-key":["id"]},"def":{"database":"ic","charset":"utf8","table":"test","columns":[{"type":"int","name":"id","signed":true},{"type":"varchar","name":"platform_name","charset":"utf8"},{"type":"varchar","name":"goods_id","charset":"utf8"},{"type":"timestamp","name":"create_time","column-length":0},{"type":"timestamp","name":"lastupdatetime","column-length":0},{"type":"varchar","name":"test","charset":"utf8"},{"type":"varchar","name":"test1","charset":"utf8"}],"primary-key":["id"]},"ts":1631522666000,"sql":"/* ApplicationName=PyCharm 2021.1.3 */ alter table test add column test1 varchar(64) default 'true'"}
{"type":"table-drop","database":"ic","table":"test1","ts":1631520801000,"sql":"DROP TABLE `test1` /* generated by server */"}
{"database":"ic","table":"test","type":"insert","ts":1631520701,"xid":1466240458,"commit":true,"data":{"id":9,"platform_name":"das_sad","goods_id":"121","create_time":"2021-09-13 08:11:41","lastupdatetime":"2021-09-13 08:11:41"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ update test set goods_id = 1234151 where id = 5","type":"update","ts":1631522576,"xid":1466250035,"commit":true,"data":{"id":5,"platform_name":"test","goods_id":"1234151","create_time":"2021-09-13 05:31:40","lastupdatetime":"2021-09-13 08:42:56","test":"1"},"old":{"goods_id":"12345151","lastupdatetime":"2021-09-13 08:28:29"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":0,"data":{"id":17,"platform_name":"das6","goods_id":"16566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":1,"data":{"id":18,"platform_name":"das5","goods_id":"15566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":2,"data":{"id":19,"platform_name":"das4","goods_id":"14566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":3,"data":{"id":20,"platform_name":"das3","goods_id":"13566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":4,"data":{"id":21,"platform_name":"das2","goods_id":"12566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"commit":true,"data":{"id":22,"platform_name":"das1","goods_id":"11566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
Field reference:
database: the database
table: the table
data: the row after the change (the latest values)
old: the values before the change
ts: timestamp of the operation
sql/query: the original SQL statement
type: operation type — insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop, bootstrap-insert, int (unknown type)
xid: transaction id; for a batch insert or delete every row is output separately, the rows are tied together by the same xid with different offsets, and the last row carries commit
xoffset: offset of the row within the transaction
commit: rows with the same xid belong to the same transaction; the last row of the transaction carries commit, which can be used to reconstruct the transaction
Maxwell accepts many character sets on the input side but only emits UTF-8.
Maxwell in detail
Roles
--host: the server the table schema is captured from, i.e. the server that hosts the maxwell database
--replication_host: the server the binlog is read from; used in a master-slave setup where the maxwell database cannot be created on the slave
--schema_host: the server the table schema is captured from; only needed when replicating through a binlog proxy
Note: bootstrapping does not work when host and replication_host are different servers
Multiple instances
To run several Maxwell instances, give each one its own client_id so that it stores its own binlog position, and also set replica_server_id to a distinct server_id, for example:
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=maxwell --replica_server_id=6379 # defaults
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=my --replica_server_id=2345
Filtering:
# within the foodb database, only match the tbl table and tables named table_<digits>
--filter = 'exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# exclude every database and table, then only match the db1 database
--filter = 'exclude: *.*, include: db1.*'
# exclude all updates where the db.tbl.col column has the value reject
--filter = 'exclude: db.tbl.col = reject'
# exclude any update that contains a col_a column
--filter = 'exclude: *.*.col_a = *'
# blacklist the bad_db database entirely; to bring it back you must drop the maxwell database
--filter = 'blacklist: bad_db.*'
Bootstrapping
The options available to maxwell-bootstrap are:
--log_level, log level
--user, user name
--password, password
--host, host
--port, port
--database, the database containing the table to bootstrap
--table, the table to bootstrap
--where, an optional condition limiting the rows
--client_id, the Maxwell instance that should run the bootstrap
Alternatively, a bootstrap can be triggered by inserting a row into the maxwell.bootstrap table by hand:
insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');
Examples:
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --log_level info
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --where "my_date >= '2017-01-07 00:00:00'" --log_level info
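The same bootstrap can be requested through the bootstrap table with a condition; a sketch assuming the where_clause and client_id columns of maxwell.bootstrap (how recent Maxwell versions name them):
insert into maxwell.bootstrap (database_name, table_name, where_clause, client_id) values ('foobar', 'test', 'my_date >= ''2017-01-07 00:00:00''', 'maxwell');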
Note: with --bootstrapper=sync, normal binlog processing is blocked while the bootstrap runs; with --bootstrapper=async it is not blocked.
type values in the JSON
bootstrap-start -> bootstrap-insert -> bootstrap-complete # the start and complete records have an empty data field and carry no row data
If Maxwell crashes during a bootstrap, the bootstrap starts over from the beginning on restart, regardless of how far it had got. If that is not what you want, set the completed flag of the row to 1, or delete the row.
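A sketch of that manual fix, assuming the flag column is named is_complete as in current Maxwell versions:
-- mark the interrupted bootstrap as finished so it is not re-run
update maxwell.bootstrap set is_complete = 1 where database_name = 'foobar' and table_name = 'test';
-- or drop the request entirely
delete from maxwell.bootstrap where database_name = 'foobar' and table_name = 'test';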
Monitoring / metrics
All metrics currently relate to the kafka producer.
When the HTTP endpoint is enabled, the following paths are served:
/metrics — all metrics as JSON
/healthcheck — health check; a value greater than 0 means there have been errors in the past 15 minutes
/ping — returns pong
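With the http_port=8080 and http_path_prefix=/ values used in the config above, the endpoints can be probed with curl, for example:
curl http://localhost:8080/metrics
curl http://localhost:8080/healthcheck
curl http://localhost:8080/ping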
To expose metrics over JMX, JAVA_OPTS must be set before starting Maxwell.
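A typical JAVA_OPTS value for remote JMX access; the port, hostname and security settings are illustrative and should be adapted to your environment:
export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.109.200"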
Appendix: full option reference
Option | Value type | Description | Default |
---|---|---|---|
General options | |||
config | string | path to the config.properties configuration file | current directory $PWD |
log_level | debug, info, warn, error | log level | info |
daemon | | run maxwell as a daemon | |
env_config_prefix | string | prefix of environment variables to treat as configuration options, e.g. FOO_ | |
MySQL options | |||
host | string | mysql host | localhost |
user | string | mysql username | |
password | string | mysql password | (no password) |
port | INT | mysql port | 3306 |
jdbc_options | string | mysql jdbc connection options | DEFAULT_JDBC* |
ssl | SSL_OPT | SSL mode | DISABLED |
schema_database | string | database where the table schema and binlog positions are stored | maxwell |
client_id | string | unique text identifier of this maxwell instance | maxwell |
replica_server_id | LONG | unique numeric identifier of this maxwell instance, corresponding to the MySQL server_id | 6379 |
master_recovery | boolean | enable experimental master recovery code | FALSE |
gtid_mode | boolean | enable GTID-based replication | FALSE |
recapture_schema | boolean | recapture the latest table schema; not available in the config file | FALSE |
Replication options, for master-slave setups where it is inconvenient to create the maxwell database on the slave and the binlog is read from the slave | |||
replication_host | string | host | schema-store host |
replication_password | string | password | (none) |
replication_port | INT | port | 3306 |
replication_user | string | user | |
replication_ssl | SSL_OPT* | enable SSL | DISABLED |
Schema options, only needed when replicating through a proxy, to choose where the table schema is captured from | |||
schema_host | string | host | schema-store host |
schema_password | string | password | (none) |
schema_port | INT | port | 3306 |
schema_user | string | user | |
schema_ssl | SSL_OPT* | enable SSL | DISABLED |
Producer options | |||
producer | PRODUCER_TYPE* | producer type | stdout |
custom_producer.factory | CLASS_NAME | factory class of a custom producer | |
producer_ack_timeout | | timeout in milliseconds after which an asynchronous producer considers a message lost | |
producer_partition_by | PARTITION_BY* | partitioning function for the data sent to kafka/kinesis | database |
producer_partition_columns | string | comma-separated column names when partitioning by column | |
producer_partition_by_fallback | PARTITION_BY_F* | required when partitioning by column; used when the column does not exist | |
ignore_producer_error | boolean | when false, exit on a kafka/kinesis error; when true, only log it | TRUE |
producer=file | |||
output_file | string | path of the output file | |
javascript | string | path to a javascript filter file | |
producer=kafka | |||
kafka.bootstrap.servers | string | kafka brokers, in the form HOST:PORT[,HOST:PORT] | |
kafka_topic | string | kafka topic | maxwell |
kafka_version | KAFKA_VERSION* | kafka producer version; not available in config.properties | 0.11.0.1 |
kafka_partition_hash | default, murmur3 | hash function used for kafka partitioning | default |
kafka_key_format | array, hash | format maxwell uses for kafka keys | hash |
ddl_kafka_topic | string | kafka topic for DDL events when output_ddl is true | kafka_topic |
producer=kinesis | |||
kinesis_stream | string | kinesis stream name | |
producer=sqs | |||
sqs_queue_uri | string | SQS queue URI | |
producer=pubsub | |||
pubsub_topic | string | Google Cloud pub-sub topic | |
pubsub_platform_id | string | Google Cloud platform id associated with the topic | |
ddl_pubsub_topic | string | Google Cloud pub-sub topic to send DDL events to | |
producer=rabbitmq | |||
rabbitmq_user | string | username | guest |
rabbitmq_pass | string | password | guest |
rabbitmq_host | string | host | |
rabbitmq_port | INT | port | |
rabbitmq_virtual_host | string | virtual host | |
rabbitmq_exchange | string | exchange name | |
rabbitmq_exchange_type | string | exchange type | |
rabbitmq_exchange_durable | boolean | whether the exchange is durable | FALSE |
rabbitmq_exchange_autodelete | boolean | if true, the exchange is deleted once all queues have finished using it | FALSE |
rabbitmq_routing_key_template | string | routing key template; %db% and %table% are available as variables | %db%.%table% |
rabbitmq_message_persistent | boolean | enable message persistence | FALSE |
rabbitmq_declare_exchange | boolean | whether to declare the exchange | TRUE |
producer=redis | |||
redis_host | string | host | localhost |
redis_port | INT | port | 6379 |
redis_auth | string | password | |
redis_database | INT | database [0-15] | 0 |
redis_pub_channel | string | channel name in Pub/Sub mode | maxwell |
redis_list_key | string | list key in LPUSH mode | maxwell |
redis_type | pubsub, lpush | mode | pubsub |
Output formatting | |||
output_binlog_position | boolean | include the binlog position | FALSE |
output_gtid_position | boolean | include the GTID position | FALSE |
output_commit_info | boolean | include commit and xid | TRUE |
output_xoffset | boolean | include the virtual row offset within the transaction | FALSE |
output_nulls | boolean | include NULL values | TRUE |
output_server_id | boolean | include server_id | FALSE |
output_thread_id | boolean | include thread_id | FALSE |
output_row_query | boolean | include the original INSERT/UPDATE/DELETE statement; MySQL must have binlog_rows_query_log_events enabled | FALSE |
output_ddl | boolean | include DDL (table-alter, table-create, etc.) events | FALSE |
Filtering | |||
filter | string | filter rules, e.g. exclude: db.*, include: *.tbl, include: *./bar(bar)?/, exclude: foo.bar.col=val | |
Encryption | |||
encrypt | none, data, all | encryption mode: none (no encryption), data (encrypt only the data field), all (encrypt the whole message) | none |
secret_key | string | encryption key | null |
Monitoring / metrics | |||
metrics_prefix | string | prefix applied to all metrics | MaxwellMetrics |
metrics_type | slf4j, jmx, http, datadog | how metrics are reported | |
metrics_jvm | boolean | enable JVM metrics: memory usage, GC stats, etc. | FALSE |
metrics_slf4j_interval | SECONDS | how often metrics are written to the log via slf4j, in seconds | 60 |
http_port | INT | port the http reporter binds to | 8080 |
http_path_prefix | string | path prefix of the http reporter | / |
http_bind_address | string | address the http server binds to | all addresses |
http_diagnostic | boolean | enable the http diagnostic endpoint | FALSE |
http_diagnostic_timeout | MILLISECONDS | http diagnostic response timeout | 10000 |
metrics_datadog_type | udp, http | how metrics are sent to datadog | udp |
metrics_datadog_tags | string | datadog tags, e.g. tag1:value1,tag2:value2 | |
metrics_datadog_interval | INT | how often metrics are pushed to datadog, in seconds | 60 |
metrics_datadog_apikey | string | datadog api key, only when metrics_datadog_type = http | |
metrics_datadog_host | string | target host, only when metrics_datadog_type = udp | localhost |
metrics_datadog_port | INT | target port, only when metrics_datadog_type = udp | 8125 |
Other | |||
bootstrapper | async, sync, none | bootstrapper type | async |
init_position | FILE:POS[:HEARTBEAT] | start from the given binlog position; not available in the config file | |
replay | boolean | enable read-only replay mode, which stores no binlog position or schema changes; not available in the config file | |