
maxwell

Introduction

Maxwell is an application that reads MySQL binary logs (binlogs) in real time and produces the changes as JSON messages to Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, files, or other platforms. Common use cases include ETL, cache maintenance, collecting table-level DML metrics, incremental feeds into search engines, data migration across partitions, and binlog-based rollback schemes for database switchovers.

Maxwell's main features:
Full-data initialization via SELECT * FROM table
Automatic recovery of the binlog position (via GTID) after a primary failover
Partitioning to counter data skew: data sent to Kafka can be partitioned at the database, table, column, and other levels
It works by posing as a replica: it receives binlog events and assembles them against the captured schema, accepting DDL, XID, row, and other event types

Website: http://maxwells-daemon.io

GitHub: https://github.com/zendesk/maxwell

Prerequisites

1. MySQL

MySQL must have binlog enabled, binlog_format must be row, a unique server_id must be set, and binlog_row_image should be FULL (which is the default).
vi /etc/my.cnf  # only the relevant settings are shown
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
#binlog_row_image=FULL
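
A quick sanity check after restarting mysqld (these are standard MySQL system variables):
SHOW VARIABLES WHERE Variable_name IN ('server_id', 'log_bin', 'binlog_format', 'binlog_row_image');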

GTID mode:
vi /etc/my.cnf  # only the relevant settings are shown
[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
#binlog_row_image=FULL
gtid-mode=ON
log-slave-updates=ON
enforce-gtid-consistency=true

2. The maxwell database

Maxwell needs a database to store its state; the default name is maxwell. It is created automatically when Maxwell starts, but the user must be granted the appropriate privileges first. At runtime Maxwell also reads replication state and looks up database and column metadata in information_schema, so those privileges are needed as well. If you run Maxwell as root, it can create the maxwell database on its own.
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
FLUSH PRIVILEGES;
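
To confirm the grants took effect (optional):
SHOW GRANTS FOR 'maxwell'@'%';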

3. Java

Maxwell needs a Java runtime.
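
A quick check that a JRE is on the PATH:
java -version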

Quick test

1. Installation

wget https://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz
tar zxvf maxwell-1.27.1.tar.gz
mv maxwell-1.27.1/ maxwell

2. Configuration

mkdir -p maxwell/conf
cd maxwell/conf
cp ../config.properties.example ./prod.maxwell.properties
vim prod.maxwell.properties

Commonly used options:
config — config.properties, the configuration file
log_level — log level (info)
daemon — run as a daemon
schema_database — Maxwell's state database (maxwell)
client_id — name of this Maxwell instance (maxwell)
replica_server_id — analogous to server_id; needed for multiple instances (6379)
filter — filter rules
output_binlog_position — record the binlog position (true)
output_ddl — record DDL changes (true)
Configuration precedence:
command line -> environment variables -> config file -> defaults
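
For example (a hypothetical run; the config file sets log_level=info, but the command line takes precedence):
bin/maxwell --config conf/prod.maxwell.properties --log_level=debug   # effective log level: debug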

prod.maxwell.properties (the configuration file passed at startup; the same options can also be given directly as arguments on the maxwell command line):

# tl;dr config
log_level=info

#producer=kafka
#kafka.bootstrap.servers=192.168.109.55:9092
#kafka_topic=prod.%{database}.%{table}
#kafka_topic=maxwell_test

# mysql login info
host=192.168.109.200
port=3801
user=root
password=root123


#     *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=file

# set the log level.  note that you can configure things further in log4j2.xml
# [DEBUG, INFO, WARN, ERROR]
log_level=INFO 

# if set, maxwell will look up the scoped environment variables, strip off the prefix and inject the configs
#env_config_prefix=/home/collie/lizj/maxwell/

#     *** mysql ***

# mysql host to connect to
host=192.168.109.200

# mysql port to connect to
port=3801

# mysql user to connect as.  This user must have REPLICATION SLAVE permissions,
# as well as full access to the `maxwell` (or schema_database) database
user=root

# mysql password
password=root123

# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello

# name of the mysql database where maxwell keeps its own state
#schema_database=maxwell

# whether to use GTID or not for positioning
#gtid_mode=true

# SSL/TLS options
# To use VERIFY_CA or VERIFY_IDENTITY, you must set the trust store with Java opts:
#   -Djavax.net.ssl.trustStore=<truststore> -Djavax.net.ssl.trustStorePassword=<password>
# or import the MySQL cert into the global Java cacerts.
# MODE must be one of DISABLED, PREFERRED, REQUIRED, VERIFY_CA, or VERIFY_IDENTITY
#
# turns on ssl for the maxwell-store connection, other connections inherit this setting unless specified
#ssl=DISABLED
# for binlog-connector
#replication_ssl=DISABLED
# for the schema-capture connection, if used
#schema_ssl=DISABLED

# maxwell can optionally replicate from a different server than where it stores
# schema and binlog position info.  Specify that different server here:

# if replication_host is set, it is the mysql server that binlogs are actually read from
#replication_host=other
#replication_user=username
#replication_password=password
#replication_port=3306

# This may be useful when using MaxScale's binlog mirroring host.
# Specifies that Maxwell should capture schema from a different server than
# it replicates from:

# the host to capture table schemas from
#schema_host=other
#schema_user=username
#schema_password=password
#schema_port=3306

#       *** output format ***

# records include binlog position (default false)
#output_binlog_position=true

# records include a gtid string (default false)
#output_gtid_position=true

# records include fields with null values (default true).  If this is false,
# fields where the value is null will be omitted entirely from output.
#output_nulls=true

# records include server_id (default false)
#output_server_id=true

# records include thread_id (default false)
#output_thread_id=true

# records include schema_id (default false)
#output_schema_id=true

# records include row query, binlog option "binlog_rows_query_log_events" must be enabled (default false)
# i.e. include the original INSERT/UPDATE/DELETE statement; MySQL must have binlog_rows_query_log_events enabled
output_row_query=true

# DML records include list of values that make up a row's primary key (default false)
#output_primary_keys=true

# DML records include list of columns that make up a row's primary key (default false)
#output_primary_key_columns=true

# records include commit and xid (default true)
#output_commit_info=true

# This controls whether maxwell will output JSON information containing
# DDL (ALTER/CREATE TABLE/ETC) information. (default: false)
# See also: ddl_kafka_topic
# i.e. include DDL (table-alter, table-create, etc.) events
output_ddl=true

# turns underscore naming style of fields to camel case style in JSON output
# default is none, which means the field name in JSON is the exact name in MySQL table
#output_naming_strategy=underscore_to_camelcase

#       *** kafka ***

# list of kafka brokers
kafka.bootstrap.servers=192.168.109.55:9092

# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
kafka_topic=maxwell_test

# alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl

# hash function to use.  "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]

# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]

# extra kafka options.  Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.

# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
#kafka.batch.size=16384


# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234#

# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from.  The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0


#           *** partitioning ***

# What part of the data do we partition by?
#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]

# specify what fields to partition by when using producer_partition_by=column
# comma-separated list.
#producer_partition_columns=id,foo,bar

# when using producer_partition_by=column, partition by this when
# the specified column(s) don't exist.
#producer_partition_by_fallback=database

#            *** kinesis ***

#kinesis_stream=maxwell

# AWS places a 256 unicode character limit on the max key length of a record
# http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
#
# Setting this option to true enables hashing the key with the md5 algorithm
# before we send it to kinesis so all the keys work within the key size limit.
# Values: true, false
# Default: false
#kinesis_md5_keys=true

#            *** sqs ***

#sqs_queue_uri=aws_sqs_queue_uri

# The sqs producer will need aws credentials configured in the default
# root folder and file format. Please check below link on how to do it.
# http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/setup-credentials.html

#            *** pub/sub ***

#pubsub_project_id=maxwell
#pubsub_topic=maxwell
#ddl_pubsub_topic=maxwell_ddl

#            *** rabbit-mq ***

#rabbitmq_host=rabbitmq_hostname
#rabbitmq_port=5672
#rabbitmq_user=guest
#rabbitmq_pass=guest
#rabbitmq_virtual_host=/
#rabbitmq_exchange=maxwell
#rabbitmq_exchange_type=fanout
#rabbitmq_exchange_durable=false
#rabbitmq_exchange_autodelete=false
#rabbitmq_routing_key_template=%db%.%table%
#rabbitmq_message_persistent=false
#rabbitmq_declare_exchange=true

#           *** redis ***

#redis_host=redis_host
#redis_port=6379
#redis_auth=redis_auth
#redis_database=0

# name of pubsub/list/whatever key to publish to
#redis_key=maxwell

# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_pub_channel=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_list_key=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# Valid values for redis_type = pubsub|lpush. Defaults to pubsub

#redis_type=pubsub

#           *** custom producer ***

# the fully qualified class name for custom ProducerFactory
# see the following link for more details.
# http://maxwells-daemon.io/producers/#custom-producer
#custom_producer.factory=

# custom producer properties can be configured using the custom_producer.* property namespace
#custom_producer.custom_prop=foo

#          *** filtering ***

# filter rows out of Maxwell's output.  Comma-separated list of filter-rules, evaluated in sequence.
# A filter rule is:
#  <type> ":" <db> "." <tbl> [ "." <col> "=" <col_val> ]
#  type    ::= [ "include" | "exclude" | "blacklist" ]
#  db      ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#  tbl     ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#  col_val ::= "column_name"
#
# See http://maxwells-daemon.io/filtering for more details
#
#filter= exclude: *.*, include: foo.*, include: bar.baz, include: foo.bar.col_eg = "value_to_match"

# javascript filter
# maxwell can run a bit of javascript for each row if you need very custom filtering/data munging.
# See http://maxwells-daemon.io/filtering/#javascript_filters for more details
#
#javascript=/path/to/javascript_filter_file

#       *** encryption ***

# Encryption mode. Possible values are none, data, and all. (default none)
#encrypt=none

# Specify the secret key to be used
#secret_key=RandomInitVector

#       *** monitoring ***

# Maxwell collects metrics via dropwizard. These can be exposed through the
# base logging mechanism (slf4j), JMX, HTTP or pushed to Datadog.
# Options: [jmx, slf4j, http, datadog]
# Supplying multiple is allowed.
metrics_type=http

# The prefix maxwell will apply to all metrics (default MaxwellMetrics)
metrics_prefix=metrics

# Enable (dropwizard) JVM metrics, default false
metrics_jvm=true

# When metrics_type includes slf4j this is the frequency metrics are emitted to the log, in seconds
#metrics_slf4j_interval=60

# When metrics_type includes http or diagnostic is enabled, this is the port the server will bind to.
http_port=8080

# When metrics_type includes http or diagnostic is enabled, this is the http path prefix, default /.
http_path_prefix=/

# ** The following are Datadog specific. **
# When metrics_type includes datadog this is the way metrics will be reported.
# Options: [udp, http]
# Supplying multiple is not allowed.
#metrics_datadog_type=http

# datadog tags that should be supplied
#metrics_datadog_tags=tag1:value1,tag2:value2

# The frequency metrics are pushed to datadog, in seconds
#metrics_datadog_interval=60

# required if metrics_datadog_type = http
#metrics_datadog_apikey=API_KEY

# required if metrics_datadog_type = udp
#metrics_datadog_host=localhost # default localhost
#metrics_datadog_port=8125 # default 8125

# Maxwell exposes an http diagnostic endpoint that checks the following in parallel:
# 1. binlog replication lag
# 2. producer (currently kafka) lag

# To enable Maxwell diagnostic
# default false
#http_diagnostic=true 

# Diagnostic check timeout in milliseconds, required if diagnostic = true
# default 10000
#http_diagnostic_timeout=10000 

#    *** misc ***

# maxwell's bootstrapping functionality has a couple of modes.
#
# In "async" mode, maxwell will output the replication stream while it
# simultaneously outputs the database to the topic.  Note that it won't
# output replication data for any tables it is currently bootstrapping -- this
# data will be buffered and output after the bootstrap is complete.
#
# In "sync" mode, maxwell stops the replication stream while it
# outputs bootstrap data.
#
# async mode keeps ops live while bootstrapping, but carries the possibility of
# data loss (due to buffering transactions).  sync mode is safer but you
# have to stop replication.
#bootstrapper=async [sync, async, none]

# output filename when using the "file" producer
output_file=/home/collie/lizj/data/mysql_binlog_data.log

3. Startup

maxwell/bin/maxwell --config maxwell/conf/prod.maxwell.properties
Add --daemon to run it as a background process:
maxwell/bin/maxwell --config maxwell/conf/prod.maxwell.properties --daemon
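
With the file producer configured above, a quick way to confirm events are flowing is to run some DML on the source database while tailing the output file:
tail -f /home/collie/lizj/data/mysql_binlog_data.log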

producer

stdout

JSON output including the binlog position and DDL statements:
bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --output_binlog_position=true --output_ddl=true --producer=stdout
bin/maxwell --config conf/prod.maxwell.properties

#     *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=stdout

file

bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --producer=file --output_file=/home/collie/lizj/data/mysql_binlog_data.log
bin/maxwell --config conf/prod.maxwell.properties

#     *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=file
# output filename when using the "file" producer
output_file=/home/collie/lizj/data/mysql_binlog_data.log

kafka

bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --producer=kafka --kafka.bootstrap.servers=192.168.109.55:9092 --kafka_topic=maxwell_test
bin/maxwell --config conf/prod.maxwell.properties

#     *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#       *** kafka ***

# list of kafka brokers
kafka.bootstrap.servers=192.168.109.55:9092

# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
kafka_topic=maxwell_test

# alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
ddl_kafka_topic=maxwell_test_ddl

# hash function to use.  "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]

# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]

# extra kafka options.  Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.

# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
#kafka.batch.size=16384


# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234#

# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from.  The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0

Output analysis

{"database":"ic","table":"test","type":"update","ts":1631520880,"xid":1466241500,"commit":true,"data":{"id":3,"platform_name":"test","goods_id":"12354631","create_time":"2021-09-13 05:31:40","lastupdatetime":"2021-09-13 08:14:40"},"old":{"goods_id":"123564631","lastupdatetime":"2021-09-13 08:12:56"}}

{"type":"table-alter","database":"ic","table":"test","old":{"database":"ic","charset":"utf8","table":"test","columns":[{"type":"int","name":"id","signed":true},{"type":"varchar","name":"platform_name","charset":"utf8"},{"type":"varchar","name":"goods_id","charset":"utf8"},{"type":"timestamp","name":"create_time","column-length":0},{"type":"timestamp","name":"lastupdatetime","column-length":0},{"type":"varchar","name":"test","charset":"utf8"}],"primary-key":["id"]},"def":{"database":"ic","charset":"utf8","table":"test","columns":[{"type":"int","name":"id","signed":true},{"type":"varchar","name":"platform_name","charset":"utf8"},{"type":"varchar","name":"goods_id","charset":"utf8"},{"type":"timestamp","name":"create_time","column-length":0},{"type":"timestamp","name":"lastupdatetime","column-length":0},{"type":"varchar","name":"test","charset":"utf8"},{"type":"varchar","name":"test1","charset":"utf8"}],"primary-key":["id"]},"ts":1631522666000,"sql":"/* ApplicationName=PyCharm 2021.1.3 */ alter table test add column test1 varchar(64) default 'true'"}

{"type":"table-drop","database":"ic","table":"test1","ts":1631520801000,"sql":"DROP TABLE `test1` /* generated by server */"}

{"database":"ic","table":"test","type":"insert","ts":1631520701,"xid":1466240458,"commit":true,"data":{"id":9,"platform_name":"das_sad","goods_id":"121","create_time":"2021-09-13 08:11:41","lastupdatetime":"2021-09-13 08:11:41"}}

{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ update test set goods_id = 1234151 where id = 5","type":"update","ts":1631522576,"xid":1466250035,"commit":true,"data":{"id":5,"platform_name":"test","goods_id":"1234151","create_time":"2021-09-13 05:31:40","lastupdatetime":"2021-09-13 08:42:56","test":"1"},"old":{"goods_id":"12345151","lastupdatetime":"2021-09-13 08:28:29"}}

{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":0,"data":{"id":17,"platform_name":"das6","goods_id":"16566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":1,"data":{"id":18,"platform_name":"das5","goods_id":"15566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":2,"data":{"id":19,"platform_name":"das4","goods_id":"14566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":3,"data":{"id":20,"platform_name":"das3","goods_id":"13566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":4,"data":{"id":21,"platform_name":"das2","goods_id":"12566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"commit":true,"data":{"id":22,"platform_name":"das1","goods_id":"11566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}



Field reference:
database: the database
table: the table
data: the new row data, after the change
old: the values before the change
ts: timestamp of the operation
sql/query: the originating SQL statement
type: operation type; one of insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop, bootstrap-insert, int (unknown type)
xid: transaction id; for batch inserts/deletes each row is output separately, tied together by the same xid and differing xoffset values, with commit on the last row
xoffset: row offset within the transaction
commit: rows sharing an xid belong to one transaction; its last row carries commit, which can be used to reconstruct the transaction (a small sketch follows this list)
Maxwell accepts many character encodings but only outputs utf8
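
As a sketch of how the xid/xoffset/commit fields can be used downstream (hypothetical consumer code, not part of Maxwell), the following Python snippet groups a stream of Maxwell JSON lines back into transactions:

import json

def group_transactions(lines):
    # Buffer events by xid; emit the whole transaction once the row
    # carrying "commit": true (the transaction's last row) arrives.
    pending = {}
    for line in lines:
        event = json.loads(line)
        xid = event.get("xid")
        if xid is None:
            yield [event]  # DDL and similar events carry no xid
            continue
        pending.setdefault(xid, []).append(event)
        if event.get("commit"):
            yield pending.pop(xid)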

Maxwell details:

Host roles

--host: the host holding the schema store, i.e. where the maxwell database is created
--replication_host: the host binlogs are read from; for master-slave setups where the maxwell database cannot be created on the slave
--schema_host: the host table schemas are captured from; only for binlog-proxy setups
Note: bootstrapping does not work when the schema host and the replication host are separate.

Multiple instances

To run several Maxwell instances, give each one a distinct client_id, so that each stores its own binlog position, and configure replica_server_id with a distinct server_id, e.g.
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=maxwell --replica_server_id=6379 # the defaults
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=my --replica_server_id=2345

Filtering:

# match only foodb.tbl and any foodb table named table_<number>
--filter = 'exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# exclude all databases and tables, matching only the db1 database
--filter = 'exclude: *.*, include: db1.*'
# exclude any update where db.tbl.col has the value reject
--filter = 'exclude: db.tbl.col = reject'
# exclude any update containing a col_a column
--filter = 'exclude: *.*.col_a = *'
# blacklist the bad_db database entirely; to undo this you must drop the maxwell database
--filter = 'blacklist: bad_db.*'

Bootstrap

The parameters available with maxwell-bootstrap are:
--log_level — log level
--user — username
--password — password
--host — host
--port — port
--database — database containing the table to bootstrap
--table — table to bootstrap
--where — row filter condition
--client_id — name of the Maxwell instance
Alternatively, a bootstrap can be triggered by inserting a row into the maxwell.bootstrap table directly:
insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');

Examples:
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --log_level info
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --where "my_date >= '2017-01-07 00:00:00'" --log_level info
Note: with --bootstrapper=sync, normal binlog processing is blocked while the bootstrap runs; with --bootstrapper=async it is not.
The type field in the JSON progresses bootstrap-start -> bootstrap-insert -> bootstrap-complete; the start and complete rows have an empty data field and carry no data.
If Maxwell crashes during a bootstrap, the bootstrap restarts from the beginning when Maxwell comes back up, regardless of prior progress. If that is undesirable, set the row's complete column to 1 (finished), or delete the row.
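
For example (matching the maxwell.bootstrap row inserted above):
update maxwell.bootstrap set complete = 1 where database_name = 'fooDB' and table_name = 'barTable';
-- or remove the row entirely:
-- delete from maxwell.bootstrap where database_name = 'fooDB' and table_name = 'barTable';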

Monitoring / metrics

All metrics currently target the kafka producer.
With the HTTP endpoint enabled, the following paths are available:
/metrics — metrics in JSON format
/healthcheck — health check; reports unhealthy if there were errors in the past 15 minutes
/ping — returns pong
To export JMX information, JAVA_OPTS must be set beforehand.
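
With http_port=8080 from the config above, the endpoints can be probed with curl:
curl http://localhost:8080/metrics
curl http://localhost:8080/healthcheck
curl http://localhost:8080/ping

A JAVA_OPTS example for JMX (standard JVM flags; the port and disabled security settings are illustrative only):
export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"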

Common errors

1. ERROR Maxwell - Maxwell couldn't find the requested binlog, exiting...
Cause: usually happens when Maxwell is started after having been stopped for a while and the binlog file it recorded has since been purged. It can also happen under a very large burst of binlog writes: Maxwell spools binlogs it cannot keep up with to the filesystem, and by the time it drains the spool, the original binlog file has been purged for lack of binlog storage space.
Fix: increase binlog retention/storage, and manually update the binlog_file and binlog_position values for the affected client_id in the positions table of Maxwell's state database:
update maxwell.positions set binlog_file = 'mysql-bin.005302', binlog_position = 0 where client_id = 'bdp-rds-003.mysql.rds.aliyuncs.com';

2. java.lang.RuntimeException: Couldn't find database mysql
Cause: the account lacks access to the mysql database, and something upstream modified the mysql.user table.
Fix: the proper fix is to grant the Maxwell user the needed privileges; as a stopgap, insert a row describing the user table into the tables table of Maxwell's state database, taking care to use the correct database_id:
INSERT INTO maxwell.tables (id, schema_id, database_id, name, charset, pk) VALUES (727, 1092, 54, 'user', 'utf8', 'Host,User');

3. com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
Cause: the recorded binlog file can no longer be found, e.g. after a reset master or after binlog files were renamed.
Fix: manually correct the binlog information for the affected instance in the positions table, or drop the maxwell database and let it be recreated.
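
For example, pointing the instance back at an existing binlog file (file name and client_id are placeholders; position 4 is the conventional start of a binlog file):
update maxwell.positions set binlog_file = 'mysql-bin.000001', binlog_position = 4 where client_id = 'maxwell';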

Appendix: full parameter reference

Global options:

| option | type | description | default |
|---|---|---|---|
| config | STRING | path to the config.properties configuration file | current directory $PWD |
| log_level | [debug, info, warn, error] | log level | info |
| daemon | | run maxwell as a daemon | |
| env_config_prefix | STRING | prefix of environment variables to treat as config options, e.g. FOO_ | |

MySQL options:

| option | type | description | default |
|---|---|---|---|
| host | STRING | mysql host | localhost |
| user | STRING | mysql username | |
| password | STRING | mysql password | (no password) |
| port | INT | mysql port | 3306 |
| jdbc_options | STRING | mysql jdbc connection string | DEFAULT_JDBC* |
| ssl | SSL_OPT | enable SSL | DISABLED |
| schema_database | STRING | database where table schemas and binlog positions are stored | maxwell |
| client_id | STRING | unique textual identifier of this maxwell instance | maxwell |
| replica_server_id | LONG | unique numeric identifier of this maxwell instance, analogous to MySQL server_id | 6379 |
| master_recovery | BOOLEAN | enable experimental master recovery code | FALSE |
| gtid_mode | BOOLEAN | enable GTID-based replication | FALSE |
| recapture_schema | BOOLEAN | recapture the latest table schema; not available in the config file | FALSE |

Replication options (for master-slave architectures where the maxwell database cannot easily be created on the slave; binlogs are read from a separate host):

| option | type | description | default |
|---|---|---|---|
| replication_host | STRING | host | schema-store host |
| replication_password | STRING | password | (none) |
| replication_port | INT | port | 3306 |
| replication_user | STRING | user | |
| replication_ssl | SSL_OPT* | enable SSL | DISABLED |

Schema options (only for binlog-proxy setups; table schemas are captured from this host):

| option | type | description | default |
|---|---|---|---|
| schema_host | STRING | host | schema-store host |
| schema_password | STRING | password | (none) |
| schema_port | INT | port | 3306 |
| schema_user | STRING | user | |
| schema_ssl | SSL_OPT* | enable SSL | DISABLED |

Producer options:

| option | type | description | default |
|---|---|---|---|
| producer | PRODUCER_TYPE* | producer type | stdout |
| custom_producer.factory | CLASS_NAME | factory class for a custom producer | |
| producer_ack_timeout | | timeout in milliseconds after which an async producer considers a message lost | |
| producer_partition_by | PARTITION_BY* | partitioning function for output to kafka/kinesis | database |
| producer_partition_columns | STRING | comma-separated column names when partitioning by column | |
| producer_partition_by_fallback | PARTITION_BY_F* | required when partitioning by column; used when the column(s) do not exist | |
| ignore_producer_error | BOOLEAN | if false, exit on kafka/kinesis errors; if true, only log them | TRUE |

producer=file:

| option | type | description | default |
|---|---|---|---|
| output_file | STRING | output file path | |
| javascript | STRING | path to a javascript filter file | |

producer=kafka:

| option | type | description | default |
|---|---|---|---|
| kafka.bootstrap.servers | STRING | kafka brokers, format HOST:PORT[,HOST:PORT] | |
| kafka_topic | STRING | kafka topic | maxwell |
| kafka_version | KAFKA_VERSION* | kafka version; not available in config.properties | 0.11.0.1 |
| kafka_partition_hash | [default, murmur3] | kafka partitioning hash function | default |
| kafka_key_format | [array, hash] | format of maxwell's kafka keys | hash |
| ddl_kafka_topic | STRING | kafka topic for DDL events when output_ddl is true | kafka_topic |

producer=kinesis:

| option | type | description | default |
|---|---|---|---|
| kinesis_stream | STRING | kinesis stream name | |

producer=sqs:

| option | type | description | default |
|---|---|---|---|
| sqs_queue_uri | STRING | SQS queue URI | |

producer=pubsub:

| option | type | description | default |
|---|---|---|---|
| pubsub_topic | STRING | Google Cloud pub-sub topic | |
| pubsub_platform_id | STRING | Google Cloud platform id associated with the topic | |
| ddl_pubsub_topic | STRING | Google Cloud pub-sub topic to send DDL events to | |

producer=rabbitmq:

| option | type | description | default |
|---|---|---|---|
| rabbitmq_user | STRING | username | guest |
| rabbitmq_pass | STRING | password | guest |
| rabbitmq_host | STRING | host | |
| rabbitmq_port | INT | port | |
| rabbitmq_virtual_host | STRING | virtual host | |
| rabbitmq_exchange | STRING | exchange name | |
| rabbitmq_exchange_type | STRING | exchange type | |
| rabbitmq_exchange_durable | BOOLEAN | whether the exchange is durable | FALSE |
| rabbitmq_exchange_autodelete | BOOLEAN | if true, the exchange is deleted once all queues have finished with it | FALSE |
| rabbitmq_routing_key_template | STRING | routing key template; %db% and %table% are available as variables | %db%.%table% |
| rabbitmq_message_persistent | BOOLEAN | enable message persistence | FALSE |
| rabbitmq_declare_exchange | BOOLEAN | whether to declare the exchange | TRUE |

producer=redis:

| option | type | description | default |
|---|---|---|---|
| redis_host | STRING | host | localhost |
| redis_port | INT | port | 6379 |
| redis_auth | STRING | password | |
| redis_database | INT | database [0-15] | 0 |
| redis_pub_channel | STRING | channel name in pub/sub mode | maxwell |
| redis_list_key | STRING | list name in LPUSH mode | maxwell |
| redis_type | [pubsub, lpush] | mode | pubsub |

Formatting options:

| option | type | description | default |
|---|---|---|---|
| output_binlog_position | BOOLEAN | include the binlog position | FALSE |
| output_gtid_position | BOOLEAN | include the gtid position | FALSE |
| output_commit_info | BOOLEAN | include commit and xid | TRUE |
| output_xoffset | BOOLEAN | include the virtual row offset within a transaction | FALSE |
| output_nulls | BOOLEAN | include NULL values | TRUE |
| output_server_id | BOOLEAN | include server_id | FALSE |
| output_thread_id | BOOLEAN | include thread_id | FALSE |
| output_row_query | BOOLEAN | include the INSERT/UPDATE/DELETE statement; MySQL must have binlog_rows_query_log_events enabled | FALSE |
| output_ddl | BOOLEAN | include DDL (table-alter, table-create, etc.) events | FALSE |

Filtering options:

| option | type | description | default |
|---|---|---|---|
| filter | STRING | filter rules, e.g. exclude: db.*, include: *.tbl, include: *./bar(bar)?/, exclude: foo.bar.col=val | |

Encryption options:

| option | type | description | default |
|---|---|---|---|
| encrypt | [none, data, all] | encryption mode: none, data (encrypt the data field only), all (encrypt the entire message) | none |
| secret_key | STRING | encryption key | null |

Monitoring / metrics options:

| option | type | description | default |
|---|---|---|---|
| metrics_prefix | STRING | prefix applied to all metrics | MaxwellMetrics |
| metrics_type | [slf4j, jmx, http, datadog] | how metrics are exposed | |
| metrics_jvm | BOOLEAN | enable JVM metrics: memory usage, GC stats, etc. | FALSE |
| metrics_slf4j_interval | SECONDS | interval at which metrics are emitted via slf4j, in seconds | 60 |
| http_port | INT | port for the http metrics server | 8080 |
| http_path_prefix | STRING | path prefix for the http metrics server | / |
| http_bind_address | STRING | address the http server binds to | all addresses |
| http_diagnostic | BOOLEAN | enable the http diagnostic endpoint | FALSE |
| http_diagnostic_timeout | MILLISECONDS | http diagnostic response timeout | 10000 |
| metrics_datadog_type | [udp, http] | how metrics are reported to datadog | udp |
| metrics_datadog_tags | STRING | datadog tags, e.g. tag1:value1,tag2:value2 | |
| metrics_datadog_interval | INT | interval at which metrics are pushed to datadog, in seconds | 60 |
| metrics_datadog_apikey | STRING | datadog api key; only when metrics_datadog_type = http | |
| metrics_datadog_host | STRING | target host; only when metrics_datadog_type = udp | localhost |
| metrics_datadog_port | INT | port; only when metrics_datadog_type = udp | 8125 |

Miscellaneous:

| option | type | description | default |
|---|---|---|---|
| bootstrapper | [async, sync, none] | bootstrapper type | async |
| init_position | FILE:POS[:HEARTBEAT] | start replication at the given position; not available in the config file | |
| replay | BOOLEAN | enable read-only replay mode, which neither stores the binlog position nor tracks schema changes; not available in the config file | |

Alert configuration templates:

Endpoint: http://10.8.6.45:9101/healthcheck
status_code: 200
content: {'MaxwellHealth': {'healthy': True, 'duration': 0, 'timestamp': '2021-11-03T11:20:45.175+08:00'}}
Check frequency: real-time
Alert condition: the request fails
Alert group: the "线上报警" (production alerts) group
Alert message: [Alert] server gsxt is offline

Endpoint: http://10.8.6.45:9101/healthcheck
status_code: 200
content: {'MaxwellHealth': {'healthy': True, 'duration': 0, 'timestamp': '2021-11-03T11:20:45.175+08:00'}}
Check frequency: real-time
Alert condition: healthy is not True (three consecutive checks)
Alert group: the "线上报警" (production alerts) group
Alert message: [Alert] server gsxt health check error

Only the port in the endpoint URL and the server name in the alert message need to be changed.
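
A minimal shell probe matching these rules (a sketch only; the endpoint and messages come from the templates above, and the grep pattern assumes the content format shown):
resp=$(curl -fsS http://10.8.6.45:9101/healthcheck) || echo "[Alert] server gsxt is offline"
echo "$resp" | grep -q "'healthy': True" || echo "[Alert] server gsxt health check error"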
