
maxwell

Prerequisites

1. MySQL

MySQL must have binlog enabled, the binlog format must be row, a unique server_id must be set, and binlog_row_image should be FULL (FULL is the default).
vi /etc/my.cnf  # only the relevant options are shown
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
#binlog_row_image=FULL

GTID mode:
vi /etc/my.cnf  # only the relevant options are shown
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
#binlog_row_image=FULL
gtid-mode=ON
log-slave-updates=ON
enforce-gtid-consistency=true
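
A quick sanity check that these settings are in effect (a sketch; the connection details match the test instance used later on this page):
mysql -h192.168.109.200 -P3801 -uroot -p -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image','server_id','gtid_mode');"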

2. The maxwell database

Maxwell needs a database in which to store its state; the default name is maxwell. The database is created automatically when Maxwell starts, but the account it connects with must be granted the corresponding privileges. At runtime Maxwell also reads replication state and looks up database and column metadata in information_schema, so those privileges are needed as well. If the root account is used, the maxwell database can simply be created by hand.
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
FLUSH PRIVILEGES;
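
The resulting privileges can be verified with (same assumed connection details as above):
mysql -h192.168.109.200 -P3801 -uroot -p -e "SHOW GRANTS FOR 'maxwell'@'%';"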

3. Java

Maxwell needs Java to run.
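
For example, a quick check that a JRE is available on the PATH (the exact Java version required is not covered here):
java -version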

Quick test

1. Installation

wget https://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz
tar zxvf maxwell-1.27.1.tar.gz
mv maxwell-1.27.1/ maxwell

2. Configuration

mkdir -p maxwell/conf
cd maxwell/conf
cp ../config.properties.example ./prod.maxwell.properties
vim prod.maxwell.properties

Common options:
config: config.properties, the configuration file
log_level: info, the log level
daemon: run as a daemon
schema_database: maxwell, the database for Maxwell's own state
client_id: maxwell, the name of this Maxwell instance
replica_server_id: 6379, analogous to server_id; needed when running multiple instances
filter: filter rules
output_binlog_position: true, include the binlog position in the output
output_ddl: true, include DDL changes in the output
Configuration precedence:
command line -> environment variables -> configuration file -> defaults
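
For instance, a value given on the command line wins over the same key in the configuration file (a sketch reusing the file created above):
bin/maxwell --config conf/prod.maxwell.properties --log_level=debug   # debug overrides log_level=info from the file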

prod.maxwell.properties (the configuration file passed at startup; the same options can also be given directly as arguments to the maxwell command):

# tl;dr config
log_level=info

#producer=kafka
#kafka.bootstrap.servers=192.168.109.55:9092
#kafka_topic=prod.%{database}.%{table}
#kafka_topic=maxwell_test

# mysql login info
host=192.168.109.200
port=3801
user=root
password=root123


#     *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=file

# set the log level.  note that you can configure things further in log4j2.xml
# [DEBUG, INFO, WARN, ERROR]
log_level=INFO 

# if set, maxwell will look up the scoped environment variables, strip off the prefix and inject the configs
#env_config_prefix=/home/collie/lizj/maxwell/

#     *** mysql ***

# mysql host to connect to
host=192.168.109.200

# mysql port to connect to
port=3801

# mysql user to connect as.  This user must have REPLICATION SLAVE permissions,
# as well as full access to the `maxwell` (or schema_database) database
user=root

# mysql password
password=root123

# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello

# name of the mysql database where maxwell keeps its own state
#schema_database=maxwell

# whether to use GTID or not for positioning
#gtid_mode=true

# SSL/TLS options
# To use VERIFY_CA or VERIFY_IDENTITY, you must set the trust store with Java opts:
#   -Djavax.net.ssl.trustStore=<truststore> -Djavax.net.ssl.trustStorePassword=<password>
# or import the MySQL cert into the global Java cacerts.
# MODE must be one of DISABLED, PREFERRED, REQUIRED, VERIFY_CA, or VERIFY_IDENTITY
#
# turns on ssl for the maxwell-store connection, other connections inherit this setting unless specified
#ssl=DISABLED
# for binlog-connector
#replication_ssl=DISABLED
# for the schema-capture connection, if used
#schema_ssl=DISABLED

# maxwell can optionally replicate from a different server than where it stores
# schema and binlog position info.  Specify that different server here:

# if replication_host is set, it is the mysql server that the binlog is actually read from
#replication_host=other
#replication_user=username
#replication_password=password
#replication_port=3306

# This may be useful when using MaxScale's binlog mirroring host.
# Specifies that Maxwell should capture schema from a different server than
# it replicates from:

# which host to capture the table schema from
#schema_host=other
#schema_user=username
#schema_password=password
#schema_port=3306

#       *** output format ***

# records include binlog position (default false)
#output_binlog_position=true

# records include a gtid string (default false)
#output_gtid_position=true

# records include fields with null values (default true).  If this is false,
# fields where the value is null will be omitted entirely from output.
#output_nulls=true

# records include server_id (default false)
#output_server_id=true

# records include thread_id (default false)
#output_thread_id=true

# records include schema_id (default false)
#output_schema_id=true

# records include the original row query (INSERT/UPDATE/DELETE statement);
# the binlog option "binlog_rows_query_log_events" must be enabled in MySQL (default false)
output_row_query=true

# DML records include list of values that make up a row's primary key (default false)
#output_primary_keys=true

# DML records include list of columns that make up a row's primary key (default false)
#output_primary_key_columns=true

# records include commit and xid (default true)
#output_commit_info=true

# This controls whether maxwell will output JSON information containing
# DDL (ALTER/CREATE TABLE/ETC) information. (default: false)
# See also: ddl_kafka_topic
# whether to include DDL (table-alter, table-create, etc.) events
output_ddl=true

# turns underscore naming style of fields to camel case style in JSON output
# default is none, which means the field name in JSON is the exact name in MySQL table
#output_naming_strategy=underscore_to_camelcase

#       *** kafka ***

# list of kafka brokers
kafka.bootstrap.servers=192.168.109.55:9092

# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
kafka_topic=maxwell_test

# alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl

# hash function to use.  "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]

# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]

# extra kafka options.  Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.

# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
#kafka.batch.size=16384


# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234#

# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from.  The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0


#           *** partitioning ***

# What part of the data do we partition by?
#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]

# specify what fields to partition by when using producer_partition_by=column
# column separated list.
#producer_partition_columns=id,foo,bar

# when using producer_partition_by=column, partition by this when
# the specified column(s) don't exist.
#producer_partition_by_fallback=database

#            *** kinesis ***

#kinesis_stream=maxwell

# AWS places a 256 unicode character limit on the max key length of a record
# http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
#
# Setting this option to true enables hashing the key with the md5 algorithm
# before we send it to kinesis so all the keys work within the key size limit.
# Values: true, false
# Default: false
#kinesis_md5_keys=true

#            *** sqs ***

#sqs_queue_uri=aws_sqs_queue_uri

# The sqs producer will need aws credentials configured in the default
# root folder and file format. Please check below link on how to do it.
# http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/setup-credentials.html

#            *** pub/sub ***

#pubsub_project_id=maxwell
#pubsub_topic=maxwell
#ddl_pubsub_topic=maxwell_ddl

#            *** rabbit-mq ***

#rabbitmq_host=rabbitmq_hostname
#rabbitmq_port=5672
#rabbitmq_user=guest
#rabbitmq_pass=guest
#rabbitmq_virtual_host=/
#rabbitmq_exchange=maxwell
#rabbitmq_exchange_type=fanout
#rabbitmq_exchange_durable=false
#rabbitmq_exchange_autodelete=false
#rabbitmq_routing_key_template=%db%.%table%
#rabbitmq_message_persistent=false
#rabbitmq_declare_exchange=true

#           *** redis ***

#redis_host=redis_host
#redis_port=6379
#redis_auth=redis_auth
#redis_database=0

# name of pubsub/list/whatever key to publish to
#redis_key=maxwell

# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_pub_channel=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_list_key=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# Valid values for redis_type = pubsub|lpush. Defaults to pubsub

#redis_type=pubsub

#           *** custom producer ***

# the fully qualified class name for custom ProducerFactory
# see the following link for more details.
# http://maxwells-daemon.io/producers/#custom-producer
#custom_producer.factory=

# custom producer properties can be configured using the custom_producer.* property namespace
#custom_producer.custom_prop=foo

#          *** filtering ***

# filter rows out of Maxwell's output.  Comma-separated list of filter-rules, evaluated in sequence.
# A filter rule is:
#  <type> ":" <db> "." <tbl> [ "." <col> "=" <col_val> ]
#  type    ::= [ "include" | "exclude" | "blacklist" ]
#  db      ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#  tbl     ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#  col_val ::= "column_name"
#
# See http://maxwells-daemon.io/filtering for more details
#
#filter= exclude: *.*, include: foo.*, include: bar.baz, include: foo.bar.col_eg = "value_to_match"

# javascript filter
# maxwell can run a bit of javascript for each row if you need very custom filtering/data munging.
# See http://maxwells-daemon.io/filtering/#javascript_filters for more details
#
#javascript=/path/to/javascript_filter_file

#       *** encryption ***

# Encryption mode. Possible values are none, data, and all. (default none)
#encrypt=none

# Specify the secret key to be used
#secret_key=RandomInitVector

#       *** monitoring ***

metrics_type=http
metrics_prefix=metrics
# Maxwell collects metrics via dropwizard. These can be exposed through the
# base logging mechanism (slf4j), JMX, HTTP or pushed to Datadog.
# Options: [jmx, slf4j, http, datadog]
# Supplying multiple is allowed.
#metrics_type=jmx,slf4j,http

# The prefix maxwell will apply to all metrics
#metrics_prefix=MaxwellMetrics # default MaxwellMetrics

# Enable (dropwizard) JVM metrics, default false
metrics_jvm=true

# When metrics_type includes slf4j this is the frequency metrics are emitted to the log, in seconds
#metrics_slf4j_interval=60

# When metrics_type includes http or diagnostic is enabled, this is the port the server will bind to.
http_port=8080

# When metrics_type includes http or diagnostic is enabled, this is the http path prefix, default /.
http_path_prefix=/

# ** The following are Datadog specific. **
# When metrics_type includes datadog this is the way metrics will be reported.
# Options: [udp, http]
# Supplying multiple is not allowed.
#metrics_datadog_type=http

# datadog tags that should be supplied
#metrics_datadog_tags=tag1:value1,tag2:value2

# The frequency metrics are pushed to datadog, in seconds
#metrics_datadog_interval=60

# required if metrics_datadog_type = http
#metrics_datadog_apikey=API_KEY

# required if metrics_datadog_type = udp
#metrics_datadog_host=localhost # default localhost
#metrics_datadog_port=8125 # default 8125

# Maxwell exposes http diagnostic endpoint to check below in parallel:
# 1. binlog replication lag
# 2. producer (currently kafka) lag

# To enable Maxwell diagnostic
# default false
#http_diagnostic=true 

# Diagnostic check timeout in milliseconds, required if diagnostic = true
# default 10000
#http_diagnostic_timeout=10000 

#    *** misc ***

# maxwell's bootstrapping functionality has a couple of modes.
#
# In "async" mode, maxwell will output the replication stream while it
# simultaneously outputs the database to the topic.  Note that it won't
# output replication data for any tables it is currently bootstrapping -- this
# data will be buffered and output after the bootstrap is complete.
#
# In "sync" mode, maxwell stops the replication stream while it
# outputs bootstrap data.
#
# async mode keeps ops live while bootstrapping, but carries the possibility of
# data loss (due to buffering transactions).  sync mode is safer but you
# have to stop replication.
#bootstrapper=async [sync, async, none]

# output filename when using the "file" producer
output_file=/home/collie/lizj/data/mysql_binlog_data.log

3. Startup

maxwell/bin/maxwell --config maxwell/conf/prod.maxwell.properties
Add --daemon to run it as a background process:
maxwell/bin/maxwell --config maxwell/conf/prod.maxwell.properties --daemon
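
A quick way to confirm that the daemonized process is up (plain shell, nothing Maxwell-specific):
ps -ef | grep '[m]axwell'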

producer

stdout

The JSON output includes the binlog position and DDL statements:
bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --output_binlog_position=true --output_ddl=true --producer=stdout
bin/maxwell --config conf/prod.maxwell.properties

#     *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=stdout

file

bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --producer=file --output_file=/home/collie/lizj/data/mysql_binlog_data.log
bin/maxwell --config conf/prod.maxwell.properties

#     *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=file
# output filename when using the "file" producer
output_file=/home/collie/lizj/data/mysql_binlog_data.log
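
With the file producer running, the event stream can be watched directly:
tail -f /home/collie/lizj/data/mysql_binlog_data.log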

kafka

bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --producer=kafka --kafka.bootstrap.servers=192.168.109.55:9092 --kafka_topic=maxwell_test
bin/maxwell --config conf/prod.maxwell.properties

#     *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#       *** kafka ***

# list of kafka brokers
kafka.bootstrap.servers=192.168.109.55:9092

# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
kafka_topic=maxwell_test

# alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
ddl_kafka_topic=maxwell_test_ddl

# hash function to use.  "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]

# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]

# extra kafka options.  Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.

# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
#kafka.batch.size=16384


# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234#

# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from.  The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0
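
To confirm that events actually reach the topic, a stock Kafka console consumer can be pointed at it (the script name/path depends on the Kafka installation):
kafka-console-consumer.sh --bootstrap-server 192.168.109.55:9092 --topic maxwell_test --from-beginning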

Output analysis

{"database":"ic","table":"test","type":"update","ts":1631520880,"xid":1466241500,"commit":true,"data":{"id":3,"platform_name":"test","goods_id":"12354631","create_time":"2021-09-13 05:31:40","lastupdatetime":"2021-09-13 08:14:40"},"old":{"goods_id":"123564631","lastupdatetime":"2021-09-13 08:12:56"}}

{"type":"table-alter","database":"ic","table":"test","old":{"database":"ic","charset":"utf8","table":"test","columns":[{"type":"int","name":"id","signed":true},{"type":"varchar","name":"platform_name","charset":"utf8"},{"type":"varchar","name":"goods_id","charset":"utf8"},{"type":"timestamp","name":"create_time","column-length":0},{"type":"timestamp","name":"lastupdatetime","column-length":0},{"type":"varchar","name":"test","charset":"utf8"}],"primary-key":["id"]},"def":{"database":"ic","charset":"utf8","table":"test","columns":[{"type":"int","name":"id","signed":true},{"type":"varchar","name":"platform_name","charset":"utf8"},{"type":"varchar","name":"goods_id","charset":"utf8"},{"type":"timestamp","name":"create_time","column-length":0},{"type":"timestamp","name":"lastupdatetime","column-length":0},{"type":"varchar","name":"test","charset":"utf8"},{"type":"varchar","name":"test1","charset":"utf8"}],"primary-key":["id"]},"ts":1631522666000,"sql":"/* ApplicationName=PyCharm 2021.1.3 */ alter table test add column test1 varchar(64) default 'true'"}

{"type":"table-drop","database":"ic","table":"test1","ts":1631520801000,"sql":"DROP TABLE `test1` /* generated by server */"}

{"database":"ic","table":"test","type":"insert","ts":1631520701,"xid":1466240458,"commit":true,"data":{"id":9,"platform_name":"das_sad","goods_id":"121","create_time":"2021-09-13 08:11:41","lastupdatetime":"2021-09-13 08:11:41"}}

{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ update test set goods_id = 1234151 where id = 5","type":"update","ts":1631522576,"xid":1466250035,"commit":true,"data":{"id":5,"platform_name":"test","goods_id":"1234151","create_time":"2021-09-13 05:31:40","lastupdatetime":"2021-09-13 08:42:56","test":"1"},"old":{"goods_id":"12345151","lastupdatetime":"2021-09-13 08:28:29"}}

{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":0,"data":{"id":17,"platform_name":"das6","goods_id":"16566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":1,"data":{"id":18,"platform_name":"das5","goods_id":"15566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":2,"data":{"id":19,"platform_name":"das4","goods_id":"14566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":3,"data":{"id":20,"platform_name":"das3","goods_id":"13566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":4,"data":{"id":21,"platform_name":"das2","goods_id":"12566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"commit":true,"data":{"id":22,"platform_name":"das1","goods_id":"11566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}



Field descriptions:
database: the database
table: the table
data: the latest data, i.e. the values after the change
old: the values before the change
ts: timestamp of the operation
sql/query: the original SQL statement
type: the operation type; one of insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop, bootstrap-insert, int (unknown type)
xid: transaction id; for bulk inserts or deletes each row is emitted as its own record, tied together by the same xid and differing xoffset values, with the final record carrying commit
xoffset: row offset within the transaction
commit: records sharing an xid belong to the same transaction; the last record of the transaction carries commit, which can be used to reassemble the transaction
Maxwell supports multiple source encodings, but it only outputs UTF-8.
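
When skimming these records by hand, it can help to project out a few fields, e.g. with jq (assuming jq is installed and the file-producer path from above):
tail -f /home/collie/lizj/data/mysql_binlog_data.log | jq -c '{type, database, table, xid, xoffset, commit}'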

Maxwell in depth:

Role separation

--host: the host the schema comes from, i.e. the host where the maxwell database lives
--replication_host: the host the binlog comes from; used in master-slave setups where the maxwell database cannot be created on the slave
--schema_host: the host to capture the table schema from; only used with a replication proxy (e.g. MaxScale)
Note: bootstrapping is not available when host and replication_host are separated.
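
As a sketch (host names are placeholders), a split setup that reads the binlog from a replica while keeping Maxwell's own state on the master could be started like this:
bin/maxwell --user='maxwell' --password='XXXXXX' --host=master-db.example.com --replication_host=replica-db.example.com --replication_user='maxwell' --replication_password='XXXXXX' --producer=stdout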

Multiple instances

To run several Maxwell instances against the same server, give each instance its own client_id so that each stores its own binlog position, and also set replica_server_id to a distinct server_id per instance, e.g.:
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=maxwell --replica_server_id=6379 # defaults
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=my --replica_server_id=2345

Filtering:

# match only foodb.tbl and any foodb table named table_<digits>
--filter = 'exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# exclude all databases and tables, match only the db1 database
--filter = 'exclude: *.*, include: db1.*'
# exclude any update where the db.tbl.col column has the value reject
--filter = 'exclude: db.tbl.col = reject'
# exclude any update that touches a col_a column
--filter = 'exclude: *.*.col_a = *'
# exclude the bad_db database entirely; to undo this, the maxwell database must be dropped
--filter = 'blacklist: bad_db.*'
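
Combined with the stdout producer from earlier, a filter can be passed on the command line like this (database/table names reused from the output-analysis examples above):
bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --producer=stdout --filter='exclude: *.*, include: ic.test'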

Bootstrap

The maxwell-bootstrap utility accepts the following options:
--log_level: log level
--user: username
--password: password
--host: host
--port: port
--database: database containing the table to bootstrap
--table: table to bootstrap
--where: restricting WHERE condition
--client_id: name of the Maxwell instance
Alternatively, a bootstrap can be triggered by inserting a row into the maxwell.bootstrap table by hand:
insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');

Examples:
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --log_level info
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --where "my_date >= '2017-01-07 00:00:00'" --log_level info
Note: with --bootstrapper=sync, normal binlog processing is blocked while a bootstrap is in progress; with --bootstrapper=async it is not.
type values in the JSON output:
bootstrap-start -> bootstrap-insert -> bootstrap-complete  # the data field of the start and complete events is empty
If Maxwell crashes during a bootstrap, the bootstrap restarts from the beginning on the next startup, no matter how far it had gotten. If that is not desired, set the row's completion flag to 1 (finished) or delete the row from maxwell.bootstrap.
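
A sketch of the two recovery options just mentioned; the completion column is assumed to be named is_complete here, so verify the actual maxwell.bootstrap schema before running this:
mysql -e "UPDATE maxwell.bootstrap SET is_complete = 1 WHERE database_name = 'fooDB' AND table_name = 'barTable';"
mysql -e "DELETE FROM maxwell.bootstrap WHERE database_name = 'fooDB' AND table_name = 'barTable';"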

Monitoring / metrics

All metrics are currently kafka-oriented.
When the HTTP endpoint is enabled, the following paths are available:
/metrics: returns the metrics as JSON
/healthcheck: health check; a value greater than 0 means there were errors within the past 15 minutes
/ping: returns pong
Exporting JMX information requires setting JAVA_OPTS beforehand.
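
With metrics_type=http and http_port=8080 as configured above, the endpoints can be probed with curl; the JAVA_OPTS line sketches a typical remote-JMX setup (the port is a placeholder):
curl http://localhost:8080/metrics
curl http://localhost:8080/healthcheck
curl http://localhost:8080/ping
export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"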

Appendix: full parameter reference

General options

| Parameter | Type | Description | Default |
|---|---|---|---|
| config | string | path to the config.properties file | current directory ($PWD) |
| log_level | debug, info, warn, error | log level | info |
| daemon | | run Maxwell as a daemon | |
| env_config_prefix | string | prefix of environment variables to be treated as config options, e.g. FOO_ | |

MySQL options

| Parameter | Type | Description | Default |
|---|---|---|---|
| host | string | mysql host | localhost |
| user | string | mysql username | |
| password | string | mysql password | (no password) |
| port | INT | mysql port | 3306 |
| jdbc_options | string | mysql jdbc connection options | DEFAULT_JDBC* |
| ssl | SSL_OPT | SSL behaviour for the maxwell connection | DISABLED |
| schema_database | string | database where Maxwell stores its schema and binlog position | maxwell |
| client_id | string | unique text identifier of this Maxwell instance | maxwell |
| replica_server_id | LONG | unique numeric identifier of this Maxwell instance, analogous to MySQL server_id | 6379 |
| master_recovery | boolean | enable experimental master-recovery code | FALSE |
| gtid_mode | boolean | enable GTID-based replication | FALSE |
| recapture_schema | boolean | recapture the latest table schema; not available in the config file | FALSE |

Replication options (for master-slave setups where the maxwell database cannot live on the slave; the binlog is read from the slave)

| Parameter | Type | Description | Default |
|---|---|---|---|
| replication_host | string | host | schema-store host |
| replication_password | string | password | (none) |
| replication_port | INT | port | 3306 |
| replication_user | string | user | |
| replication_ssl | SSL_OPT* | enable SSL | DISABLED |

Schema options (only for replication-proxy setups; where the table schema is captured from)

| Parameter | Type | Description | Default |
|---|---|---|---|
| schema_host | string | host | schema-store host |
| schema_password | string | password | (none) |
| schema_port | INT | port | 3306 |
| schema_user | string | user | |
| schema_ssl | SSL_OPT* | enable SSL | DISABLED |

Producer options

| Parameter | Type | Description | Default |
|---|---|---|---|
| producer | PRODUCER_TYPE* | producer type | stdout |
| custom_producer.factory | CLASS_NAME | factory class for a custom producer | |
| producer_ack_timeout | | time in milliseconds after which the async producer considers a message lost | |
| producer_partition_by | PARTITION_BY* | partitioning function for kafka/kinesis | database |
| producer_partition_columns | string | comma-separated column names when partitioning by column | |
| producer_partition_by_fallback | PARTITION_BY_F* | required when partitioning by column; used when the column(s) do not exist | |
| ignore_producer_error | boolean | if false, exit on kafka/kinesis errors; if true, only log them | TRUE |

producer=file

| Parameter | Type | Description | Default |
|---|---|---|---|
| output_file | string | path of the output file | |
| javascript | string | path of a javascript filter file | |

producer=kafka

| Parameter | Type | Description | Default |
|---|---|---|---|
| kafka.bootstrap.servers | string | kafka brokers, as HOST:PORT[,HOST:PORT] | |
| kafka_topic | string | kafka topic | maxwell |
| kafka_version | KAFKA_VERSION* | kafka producer version; not available in config.properties | 0.11.0.1 |
| kafka_partition_hash | default, murmur3 | hash function used for kafka partitioning | default |
| kafka_key_format | array, hash | format of maxwell's kafka keys | hash |
| ddl_kafka_topic | string | kafka topic for DDL events when output_ddl is true | kafka_topic |

producer=kinesis

| Parameter | Type | Description | Default |
|---|---|---|---|
| kinesis_stream | string | kinesis stream name | |

producer=sqs

| Parameter | Type | Description | Default |
|---|---|---|---|
| sqs_queue_uri | string | SQS queue URI | |

producer=pubsub

| Parameter | Type | Description | Default |
|---|---|---|---|
| pubsub_topic | string | Google Cloud pub/sub topic | |
| pubsub_platform_id | string | Google Cloud platform id associated with the topic | |
| ddl_pubsub_topic | string | Google Cloud pub/sub topic to send DDL events to | |

producer=rabbitmq

| Parameter | Type | Description | Default |
|---|---|---|---|
| rabbitmq_user | string | username | guest |
| rabbitmq_pass | string | password | guest |
| rabbitmq_host | string | host | |
| rabbitmq_port | INT | port | |
| rabbitmq_virtual_host | string | virtual host | |
| rabbitmq_exchange | string | exchange name | |
| rabbitmq_exchange_type | string | exchange type | |
| rabbitmq_exchange_durable | boolean | whether the exchange is durable | FALSE |
| rabbitmq_exchange_autodelete | boolean | if true, delete the exchange once all bound queues are finished with it | FALSE |
| rabbitmq_routing_key_template | string | routing key template; %db% and %table% are available as variables | %db%.%table% |
| rabbitmq_message_persistent | boolean | enable message persistence | FALSE |
| rabbitmq_declare_exchange | boolean | whether to declare the exchange | TRUE |

producer=redis

| Parameter | Type | Description | Default |
|---|---|---|---|
| redis_host | string | host | localhost |
| redis_port | INT | port | 6379 |
| redis_auth | string | password | |
| redis_database | INT | database [0-15] | 0 |
| redis_pub_channel | string | channel name in pub/sub mode | maxwell |
| redis_list_key | string | list name in LPUSH mode | maxwell |
| redis_type | pubsub, lpush | mode | pubsub |

Formatting

| Parameter | Type | Description | Default |
|---|---|---|---|
| output_binlog_position | boolean | include the binlog position | FALSE |
| output_gtid_position | boolean | include the GTID position | FALSE |
| output_commit_info | boolean | include commit and xid | TRUE |
| output_xoffset | boolean | include the virtual row offset within the transaction | FALSE |
| output_nulls | boolean | include NULL values | TRUE |
| output_server_id | boolean | include server_id | FALSE |
| output_thread_id | boolean | include thread_id | FALSE |
| output_row_query | boolean | include the original INSERT/UPDATE/DELETE statement; MySQL must have binlog_rows_query_log_events enabled | FALSE |
| output_ddl | boolean | include DDL (table-alter, table-create, etc.) events | FALSE |

Filtering

| Parameter | Type | Description | Default |
|---|---|---|---|
| filter | string | filter rules, e.g. exclude: db.*, include: *.tbl, include: *./bar(bar)?/, exclude: foo.bar.col=val | |

Encryption

| Parameter | Type | Description | Default |
|---|---|---|---|
| encrypt | none, data, all | encryption mode: none for no encryption, data to encrypt only the data field, all to encrypt the whole message | none |
| secret_key | string | encryption key | null |

Monitoring / metrics

| Parameter | Type | Description | Default |
|---|---|---|---|
| metrics_prefix | string | prefix applied to all metrics | MaxwellMetrics |
| metrics_type | slf4j, jmx, http, datadog | metrics reporter type(s) | |
| metrics_jvm | boolean | enable JVM metrics: memory usage, GC stats, etc. | FALSE |
| metrics_slf4j_interval | SECONDS | interval at which metrics are logged via slf4j, in seconds | 60 |
| http_port | INT | port of the http reporter | 8080 |
| http_path_prefix | string | http path prefix | / |
| http_bind_address | string | address the http server binds to | all addresses |
| http_diagnostic | boolean | enable the http diagnostic endpoint | FALSE |
| http_diagnostic_timeout | MILLISECONDS | http diagnostic response timeout | 10000 |
| metrics_datadog_type | udp, http | how metrics are reported to datadog | udp |
| metrics_datadog_tags | string | datadog tags, e.g. tag1:value1,tag2:value2 | |
| metrics_datadog_interval | INT | interval at which metrics are pushed to datadog, in seconds | 60 |
| metrics_datadog_apikey | string | datadog api key; only when metrics_datadog_type = http | |
| metrics_datadog_host | string | destination host; only when metrics_datadog_type = udp | localhost |
| metrics_datadog_port | INT | destination port; only when metrics_datadog_type = udp | 8125 |

Miscellaneous

| Parameter | Type | Description | Default |
|---|---|---|---|
| bootstrapper | async, sync, none | bootstrapper type | async |
| init_position | FILE:POS[:HEARTBEAT] | start from the given binlog position; not available in the config file | |
| replay | boolean | read-only replay mode: binlog positions and schema changes are not stored; not available in the config file | |