## Prerequisites

### 1. MySQL

MySQL must have the binlog enabled, the binlog format must be `row`, a unique `server_id` must be set, and `binlog_row_image` should be `FULL` (FULL is the default).

```shell
vi /etc/my.cnf  # only the relevant options are listed

[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
#binlog_row_image=FULL
```

GTID mode:

```shell
vi /etc/my.cnf  # only the relevant options are listed

[mysqld]
server_id=1234
log-bin=bin-log
binlog_format=row
#binlog_row_image=FULL
gtid-mode=ON
log-slave-updates=ON
enforce-gtid-consistency=true
```
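
After restarting MySQL, it is worth confirming the settings actually took effect. A minimal sketch, assuming a reachable client session (the host/port below are the placeholders used later in this document):

```shell
# Hedged check: confirm binlog-related settings from any mysql client session
mysql -h 192.168.109.200 -P 3801 -u root -p -e "
  SHOW VARIABLES LIKE 'log_bin';
  SHOW VARIABLES LIKE 'binlog_format';
  SHOW VARIABLES LIKE 'binlog_row_image';
  SHOW VARIABLES LIKE 'server_id';"
```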

### 2. The maxwell database

Maxwell needs a database to store its state; the default name is `maxwell`. The database is created automatically when Maxwell starts, but the account Maxwell runs as must be granted the corresponding privileges. At runtime Maxwell also reads replication state and looks up database and column metadata in `information_schema`, so those privileges must be granted as well. If you run Maxwell as root, you can simply create the `maxwell` database yourself.

```sql
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
FLUSH PRIVILEGES;
```
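
Before starting Maxwell it can save time to confirm the account actually works; a hedged sketch (host, port and password are the placeholders from above):

```shell
# The maxwell user should be able to log in and see the grants issued above
mysql -h 192.168.109.200 -P 3801 -u maxwell -pXXXXXX -e "SHOW GRANTS FOR CURRENT_USER();"
```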

### 3. Java

Maxwell is a JVM application and requires Java to run.
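
A quick way to verify that a usable runtime is on the PATH:

```shell
# Maxwell runs on the JVM; this should report an installed Java version
java -version
```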

## Quick test

### 1. Install

```shell
wget https://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz
tar zxvf maxwell-1.27.1.tar.gz
mv maxwell-1.27.1/ maxwell
```
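
As a sanity check, the unpacked distribution can print its usage text (this assumes Java is already installed):

```shell
# prints the list of supported options and exits
maxwell/bin/maxwell --help
```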

### 2. Configure

```shell
mkdir -p maxwell/conf
cd maxwell/conf
cp ../config.properties.example ./prod.maxwell.properties
vim prod.maxwell.properties
```

Commonly used options:

- config: path to the configuration file (config.properties)
- log_level: log level, e.g. info
- daemon: run as a daemon
- schema_database: database that stores Maxwell's state (default maxwell)
- client_id: name of this Maxwell instance (default maxwell)
- replica_server_id: analogous to server_id; each of multiple instances needs its own value (default 6379)
- filter: filter rules
- output_binlog_position: true to include the binlog position in the output
- output_ddl: true to include DDL changes in the output

Configuration precedence: command line -> environment variables -> configuration file -> defaults.
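
For example, because the command line has the highest precedence, a value from the config file can be overridden at startup:

```shell
# prod.maxwell.properties sets log_level=info; the flag below wins, so maxwell runs at debug level
maxwell/bin/maxwell --config maxwell/conf/prod.maxwell.properties --log_level=debug
```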

prod.maxwell.properties (the parameter file passed at startup; alternatively, every option can be appended directly to the maxwell command):

```properties
# tl;dr config
log_level=info

#producer=kafka
#kafka.bootstrap.servers=192.168.109.55:9092
#kafka_topic=prod.%{database}.%{table}
#kafka_topic=maxwell_test

# mysql login info
host=192.168.109.200
port=3801
user=root
password=root123


# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=file

# set the log level. note that you can configure things further in log4j2.xml
# [DEBUG, INFO, WARN, ERROR]
log_level=INFO

# if set, maxwell will look up the scoped environment variables, strip off the prefix and inject the configs
#env_config_prefix=MAXWELL_


# *** mysql ***

# mysql host to connect to
host=192.168.109.200

# mysql port to connect to
port=3801

# mysql user to connect as. This user must have REPLICATION SLAVE permissions,
# as well as full access to the `maxwell` (or schema_database) database
user=root

# mysql password
password=root123

# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello

# name of the mysql database where maxwell keeps its own state
#schema_database=maxwell

# whether to use GTID or not for positioning
#gtid_mode=true

# SSL/TLS options
# To use VERIFY_CA or VERIFY_IDENTITY, you must set the trust store with Java opts:
#   -Djavax.net.ssl.trustStore=<truststore> -Djavax.net.ssl.trustStorePassword=<password>
# or import the MySQL cert into the global Java cacerts.
# MODE must be one of DISABLED, PREFERRED, REQUIRED, VERIFY_CA, or VERIFY_IDENTITY
#
# turns on ssl for the maxwell-store connection, other connections inherit this setting unless specified
#ssl=DISABLED
# for binlog-connector
#replication_ssl=DISABLED
# for the schema-capture connection, if used
#schema_ssl=DISABLED

# maxwell can optionally replicate from a different server than where it stores
# schema and binlog position info. Specify that different server here:

# if replication_host is set, it is the mysql server the binlog is actually read from
#replication_host=other
#replication_user=username
#replication_password=password
#replication_port=3306

# This may be useful when using MaxScale's binlog mirroring host.
# Specifies that Maxwell should capture schema from a different server than
# it replicates from:

# the host the table schema is captured from
#schema_host=other
#schema_user=username
#schema_password=password
#schema_port=3306


# *** output format ***

# records include binlog position (default false)
#output_binlog_position=true

# records include a gtid string (default false)
#output_gtid_position=true

# records include fields with null values (default true). If this is false,
# fields where the value is null will be omitted entirely from output.
#output_nulls=true

# records include server_id (default false)
#output_server_id=true

# records include thread_id (default false)
#output_thread_id=true

# records include schema_id (default false)
#output_schema_id=true

# records include row query, binlog option "binlog_rows_query_log_events" must be enabled (default false)
# i.e. whether to include the original INSERT/UPDATE/DELETE statement; MySQL must have
# binlog_rows_query_log_events enabled
output_row_query=true

# DML records include list of values that make up a row's primary key (default false)
#output_primary_keys=true

# DML records include list of columns that make up a row's primary key (default false)
#output_primary_key_columns=true

# records include commit and xid (default true)
#output_commit_info=true

# This controls whether maxwell will output JSON information containing
# DDL (ALTER/CREATE TABLE/ETC) information. (default: false)
# See also: ddl_kafka_topic
# i.e. whether to include DDL (table-alter, table-create, etc.) events
output_ddl=true

# turns underscore naming style of fields to camel case style in JSON output
# default is none, which means the field name in JSON is the exact name in MySQL table
#output_naming_strategy=underscore_to_camelcase

# *** kafka ***

# list of kafka brokers
kafka.bootstrap.servers=192.168.109.55:9092

# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
kafka_topic=maxwell_test

# alternative kafka topic to write DDL (alter/create/drop) to. Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl

# hash function to use. "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]

# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]

# extra kafka options. Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.

# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
#kafka.batch.size=16384


# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234#

# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from. The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0


# *** partitioning ***

# What part of the data do we partition by?
#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]

# specify what fields to partition by when using producer_partition_by=column
# comma separated list of columns.
#producer_partition_columns=id,foo,bar

# when using producer_partition_by=column, partition by this when
# the specified column(s) don't exist.
#producer_partition_by_fallback=database

# *** kinesis ***

#kinesis_stream=maxwell

# AWS places a 256 unicode character limit on the max key length of a record
# http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
#
# Setting this option to true enables hashing the key with the md5 algorithm
# before we send it to kinesis so all the keys work within the key size limit.
# Values: true, false
# Default: false
#kinesis_md5_keys=true

# *** sqs ***

#sqs_queue_uri=aws_sqs_queue_uri

# The sqs producer will need aws credentials configured in the default
# root folder and file format. Please check below link on how to do it.
# http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/setup-credentials.html

# *** pub/sub ***

#pubsub_project_id=maxwell
#pubsub_topic=maxwell
#ddl_pubsub_topic=maxwell_ddl

# *** rabbit-mq ***

#rabbitmq_host=rabbitmq_hostname
#rabbitmq_port=5672
#rabbitmq_user=guest
#rabbitmq_pass=guest
#rabbitmq_virtual_host=/
#rabbitmq_exchange=maxwell
#rabbitmq_exchange_type=fanout
#rabbitmq_exchange_durable=false
#rabbitmq_exchange_autodelete=false
#rabbitmq_routing_key_template=%db%.%table%
#rabbitmq_message_persistent=false
#rabbitmq_declare_exchange=true

# *** redis ***

#redis_host=redis_host
#redis_port=6379
#redis_auth=redis_auth
#redis_database=0

# name of pubsub/list/whatever key to publish to
#redis_key=maxwell

# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_pub_channel=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_list_key=maxwell
# Valid values for redis_type = pubsub|lpush. Defaults to pubsub
#redis_type=pubsub

# *** custom producer ***

# the fully qualified class name for custom ProducerFactory
# see the following link for more details.
# http://maxwells-daemon.io/producers/#custom-producer
#custom_producer.factory=

# custom producer properties can be configured using the custom_producer.* property namespace
#custom_producer.custom_prop=foo

# *** filtering ***

# filter rows out of Maxwell's output. Comma-separated list of filter-rules, evaluated in sequence.
# A filter rule is:
#   <type> ":" <db> "." <tbl> [ "." <col> "=" <col_val> ]
#   type    ::= [ "include" | "exclude" | "blacklist" ]
#   db      ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#   tbl     ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#   col_val ::= "column_name"
#
# See http://maxwells-daemon.io/filtering for more details
#
#filter= exclude: *.*, include: foo.*, include: bar.baz, include: foo.bar.col_eg = "value_to_match"

# javascript filter
# maxwell can run a bit of javascript for each row if you need very custom filtering/data munging.
# See http://maxwells-daemon.io/filtering/#javascript_filters for more details
#
#javascript=/path/to/javascript_filter_file

# *** encryption ***

# Encryption mode. Possible values are none, data, and all. (default none)
#encrypt=none

# Specify the secret key to be used
#secret_key=RandomInitVector

# *** monitoring ***

metrics_type=http
metrics_prefix=metrics
# Maxwell collects metrics via dropwizard. These can be exposed through the
# base logging mechanism (slf4j), JMX, HTTP or pushed to Datadog.
# Options: [jmx, slf4j, http, datadog]
# Supplying multiple is allowed.
#metrics_type=jmx,slf4j,http

# The prefix maxwell will apply to all metrics
#metrics_prefix=MaxwellMetrics # default MaxwellMetrics

# Enable (dropwizard) JVM metrics, default false
metrics_jvm=true

# When metrics_type includes slf4j this is the frequency metrics are emitted to the log, in seconds
#metrics_slf4j_interval=60

# When metrics_type includes http or diagnostic is enabled, this is the port the server will bind to.
http_port=8080

# When metrics_type includes http or diagnostic is enabled, this is the http path prefix, default /.
http_path_prefix=/

# ** The following are Datadog specific. **
# When metrics_type includes datadog this is the way metrics will be reported.
# Options: [udp, http]
# Supplying multiple is not allowed.
#metrics_datadog_type=http

# datadog tags that should be supplied
#metrics_datadog_tags=tag1:value1,tag2:value2

# The frequency metrics are pushed to datadog, in seconds
#metrics_datadog_interval=60

# required if metrics_datadog_type = http
#metrics_datadog_apikey=API_KEY

# required if metrics_datadog_type = udp
#metrics_datadog_host=localhost # default localhost
#metrics_datadog_port=8125 # default 8125

# Maxwell exposes an http diagnostic endpoint to check the following in parallel:
# 1. binlog replication lag
# 2. producer (currently kafka) lag

# To enable Maxwell diagnostic
# default false
#http_diagnostic=true

# Diagnostic check timeout in milliseconds, required if diagnostic = true
# default 10000
#http_diagnostic_timeout=10000

# *** misc ***

# maxwell's bootstrapping functionality has a couple of modes.
#
# In "async" mode, maxwell will output the replication stream while it
# simultaneously outputs the database to the topic. Note that it won't
# output replication data for any tables it is currently bootstrapping -- this
# data will be buffered and output after the bootstrap is complete.
#
# In "sync" mode, maxwell stops the replication stream while it
# outputs bootstrap data.
#
# async mode keeps ops live while bootstrapping, but carries the possibility of
# data loss (due to buffering transactions). sync mode is safer but you
# have to stop replication.
#bootstrapper=async [sync, async, none]

# output filename when using the "file" producer
output_file=/home/collie/lizj/data/mysql_binlog_data.log
```

### 3. Start

```shell
maxwell/bin/maxwell --config maxwell/conf/prod.maxwell.properties

# add --daemon to run as a background process:
maxwell/bin/maxwell --config maxwell/conf/prod.maxwell.properties --daemon
```
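
After a `--daemon` start there is no foreground output; one hedged way to confirm the process is alive and producing (the log path follows the file-producer setting used in this document):

```shell
ps -ef | grep '[m]axwell'   # the bracket trick keeps grep itself out of the result
tail -f /home/collie/lizj/data/mysql_binlog_data.log
```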

## producer

### stdout

```shell
# include the binlog position and DDL statements in the JSON output:
bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --output_binlog_position=true --output_ddl=true --producer=stdout
bin/maxwell --config conf/prod.maxwell.properties
```

The equivalent config file entry:

```properties
# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=stdout
```

### file

```shell
bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --producer=file --output_file=/home/collie/lizj/data/mysql_binlog_data.log
bin/maxwell --config conf/prod.maxwell.properties
```

The equivalent config file entries:

```properties
# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=file

# output filename when using the "file" producer
output_file=/home/collie/lizj/data/mysql_binlog_data.log
```

### kafka

```shell
bin/maxwell --user='root' --password='root123' --host=192.168.109.200 --port=3801 --producer=kafka --kafka.bootstrap.servers=192.168.109.55:9092 --kafka_topic=maxwell_test
bin/maxwell --config conf/prod.maxwell.properties
```

The equivalent config file entries:

```properties
# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka

# *** kafka ***

# list of kafka brokers
kafka.bootstrap.servers=192.168.109.55:9092

# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
kafka_topic=maxwell_test

# alternative kafka topic to write DDL (alter/create/drop) to. Defaults to kafka_topic
ddl_kafka_topic=maxwell_test_ddl

# hash function to use. "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]

# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]

# extra kafka options. Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.

# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
#kafka.batch.size=16384

# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234#

# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from. The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0
```
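
To verify that rows are actually arriving in Kafka, a console consumer can be pointed at the topic; a sketch assuming a standard Kafka distribution on the broker host:

```shell
# read the maxwell_test topic from the beginning
kafka-console-consumer.sh --bootstrap-server 192.168.109.55:9092 --topic maxwell_test --from-beginning
```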

## Output analysis

```json
{"database":"ic","table":"test","type":"update","ts":1631520880,"xid":1466241500,"commit":true,"data":{"id":3,"platform_name":"test","goods_id":"12354631","create_time":"2021-09-13 05:31:40","lastupdatetime":"2021-09-13 08:14:40"},"old":{"goods_id":"123564631","lastupdatetime":"2021-09-13 08:12:56"}}

{"type":"table-alter","database":"ic","table":"test","old":{"database":"ic","charset":"utf8","table":"test","columns":[{"type":"int","name":"id","signed":true},{"type":"varchar","name":"platform_name","charset":"utf8"},{"type":"varchar","name":"goods_id","charset":"utf8"},{"type":"timestamp","name":"create_time","column-length":0},{"type":"timestamp","name":"lastupdatetime","column-length":0},{"type":"varchar","name":"test","charset":"utf8"}],"primary-key":["id"]},"def":{"database":"ic","charset":"utf8","table":"test","columns":[{"type":"int","name":"id","signed":true},{"type":"varchar","name":"platform_name","charset":"utf8"},{"type":"varchar","name":"goods_id","charset":"utf8"},{"type":"timestamp","name":"create_time","column-length":0},{"type":"timestamp","name":"lastupdatetime","column-length":0},{"type":"varchar","name":"test","charset":"utf8"},{"type":"varchar","name":"test1","charset":"utf8"}],"primary-key":["id"]},"ts":1631522666000,"sql":"/* ApplicationName=PyCharm 2021.1.3 */ alter table test add column test1 varchar(64) default 'true'"}

{"type":"table-drop","database":"ic","table":"test1","ts":1631520801000,"sql":"DROP TABLE `test1` /* generated by server */"}

{"database":"ic","table":"test","type":"insert","ts":1631520701,"xid":1466240458,"commit":true,"data":{"id":9,"platform_name":"das_sad","goods_id":"121","create_time":"2021-09-13 08:11:41","lastupdatetime":"2021-09-13 08:11:41"}}

{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ update test set goods_id = 1234151 where id = 5","type":"update","ts":1631522576,"xid":1466250035,"commit":true,"data":{"id":5,"platform_name":"test","goods_id":"1234151","create_time":"2021-09-13 05:31:40","lastupdatetime":"2021-09-13 08:42:56","test":"1"},"old":{"goods_id":"12345151","lastupdatetime":"2021-09-13 08:28:29"}}

{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":0,"data":{"id":17,"platform_name":"das6","goods_id":"16566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}

{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":1,"data":{"id":18,"platform_name":"das5","goods_id":"15566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}

{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":2,"data":{"id":19,"platform_name":"das4","goods_id":"14566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}

{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":3,"data":{"id":20,"platform_name":"das3","goods_id":"13566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}

{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"xoffset":4,"data":{"id":21,"platform_name":"das2","goods_id":"12566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}

{"database":"ic","table":"test","query":"/* ApplicationName=PyCharm 2021.1.3 */ insert into test(platform_name,goods_id) values('das6','16566456'),('das5','15566456'),('das4','14566456'),('das3','13566456'),('das2','12566456'),('das1','11566456')","type":"insert","ts":1631523144,"xid":1466252166,"commit":true,"data":{"id":22,"platform_name":"das1","goods_id":"11566456","create_time":"2021-09-13 08:52:24","lastupdatetime":"2021-09-13 08:52:24","test":"1","test1":"true","test2":"true2","test3":"true"}}
```

Field reference:

- database: database name
- table: table name
- data: the row after the change (the latest values)
- old: the changed columns' values before the change
- ts: timestamp of the operation
- sql / query: the SQL statement that caused the change
- type: operation type; one of insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop, bootstrap-insert, or int (unknown type)
- xid: transaction id; a bulk insert or delete emits one record per row, tied together by the same xid and increasing xoffset values, with the last record carrying commit
- xoffset: row offset within the transaction
- commit: records sharing an xid belong to one transaction, and the transaction's last record carries commit: true, which can be used to reassemble transactions

Maxwell supports many input character encodings, but its output is always UTF-8.
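
Since each record is a single JSON line, standard tools can slice it. A sketch using jq (assuming jq is installed and the file producer is in use):

```shell
# project the most interesting fields out of the latest captured record
tail -n 1 /home/collie/lizj/data/mysql_binlog_data.log | jq '{type, database, table, data, old}'
```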

## Maxwell in detail

### Role separation

- --host: the server that stores the schema, i.e. where the maxwell database is created
- --replication_host: the server the binlog is read from; used in master-slave setups where the maxwell database cannot be created on the slave
- --schema_host: the server the table schema is captured from; only used with binlog proxies

Note: bootstrapping is not available when host and replication_host are separate servers. A sketch of the split-role invocation follows.
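
The hostnames below are placeholders; the flags are the ones described above:

```shell
# schema and positions stored on --host, binlog events read from --replication_host
bin/maxwell --user='maxwell' --password='XXXXXX' --host=schema-store.example.com \
  --replication_host=binlog-source.example.com --replication_user='maxwell' \
  --replication_password='XXXXXX' --producer=stdout
```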

### Multiple instances

To run several Maxwell instances, give each one its own client_id (so each stores its own binlog position) and a distinct replica_server_id, for example:

```shell
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=maxwell --replica_server_id=6379  # the defaults
./maxwell --user='canal' --password='canal' --host='127.0.0.1' --producer=stdout --client_id=my --replica_server_id=2345
```

### Filtering

```shell
# within foodb, match only the tbl table and any table named table_<digits>
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'

# exclude every database and table, match only the db1 database
--filter='exclude: *.*, include: db1.*'

# exclude any update whose db.tbl.col column value is "reject"
--filter='exclude: db.tbl.col = reject'

# exclude any update that contains a col_a column
--filter='exclude: *.*.col_a = *'

# blacklist the bad_db database entirely; to undo this, the maxwell database must be dropped
--filter='blacklist: bad_db.*'
```
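
A full invocation with a filter attached might look like this (credentials are placeholders):

```shell
# emit only events from db1; everything else is dropped before the producer sees it
bin/maxwell --user='maxwell' --password='XXXXXX' --host=127.0.0.1 --producer=stdout \
  --filter='exclude: *.*, include: db1.*'
```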

### The bootstrap feature

maxwell-bootstrap accepts the following options:

- --log_level: log level
- --user: username
- --password: password
- --host: host
- --port: port
- --database: database that contains the table to bootstrap
- --table: table to bootstrap
- --where: condition limiting the bootstrapped rows
- --client_id: name of the maxwell instance that should run the bootstrap

Alternatively, a bootstrap can be triggered by inserting a row into the maxwell.bootstrap table by hand:

```sql
insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');
```

Examples:

```shell
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --log_level info
bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --where "my_date >= '2017-01-07 00:00:00'" --log_level info
```

Note that with --bootstrapper=sync, normal binlog parsing is blocked while the bootstrap is processed; with --bootstrapper=async it is not.

The type field of bootstrap records in the JSON progresses bootstrap-start -> bootstrap-insert -> bootstrap-complete; the start and complete records carry an empty data field, i.e. no row data.

If Maxwell crashes during a bootstrap, the bootstrap starts over from the beginning on restart, regardless of previous progress. If that is not what you want, mark the row complete (set its is_complete column to 1) or delete it.
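
Bootstrap progress can be watched in the state table itself; a hedged sketch (connection details are the placeholders from earlier, and the column names follow the maxwell.bootstrap schema):

```shell
mysql -h 192.168.109.200 -P 3801 -u maxwell -pXXXXXX \
  -e "SELECT database_name, table_name, is_complete, inserted_rows, total_rows FROM maxwell.bootstrap;"
```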

### Monitoring / metrics

All metrics currently cover the kafka producer.

When the HTTP endpoint is enabled, the following paths are served:

- /metrics: all metrics as JSON
- /healthcheck: health check; a value greater than 0 means there were errors in the past 15 minutes
- /ping: returns pong

To export information over JMX, set JAVA_OPTS beforehand.
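
With the metrics_type=http / http_port=8080 settings from the configuration above, the endpoints can be exercised with curl:

```shell
curl http://localhost:8080/metrics      # all metrics as JSON
curl http://localhost:8080/healthcheck  # health status
curl http://localhost:8080/ping         # returns pong
```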

## Appendix: full option reference

| Option | Value type | Description | Default |
| ------ | ---------- | ----------- | ------- |
| **general options** | | | |
| config | STRING | path to the config.properties configuration file | $PWD (current directory) |
| log_level | debug, info, warn, error | log level | info |
| daemon | | run maxwell as a daemon | |
| env_config_prefix | STRING | prefix of environment variables to treat as configuration, e.g. FOO_ | |
| **mysql options** | | | |
| host | STRING | mysql host | localhost |
| user | STRING | mysql username | |
| password | STRING | mysql password | (no password) |
| port | INT | mysql port | 3306 |
| jdbc_options | STRING | mysql jdbc connection options | DEFAULT_JDBC* |
| ssl | SSL_OPT | SSL behavior | DISABLED |
| schema_database | STRING | database that stores table schemas and binlog positions | maxwell |
| client_id | STRING | unique text identifying this maxwell instance | maxwell |
| replica_server_id | LONG | unique number identifying this maxwell instance, analogous to MySQL server_id | 6379 |
| master_recovery | BOOLEAN | enable experimental master recovery code | FALSE |
| gtid_mode | BOOLEAN | enable GTID-based replication | FALSE |
| recapture_schema | BOOLEAN | recapture the latest table schema; not available in the config file | FALSE |
| **replication options** (for master-slave setups where the maxwell database cannot live on the slave the binlog is read from) | | | |
| replication_host | STRING | host | schema-store host |
| replication_password | STRING | password | (none) |
| replication_port | INT | port | 3306 |
| replication_user | STRING | user | |
| replication_ssl | SSL_OPT* | enable SSL | DISABLED |
| **schema options** (only for binlog-proxy setups; where the table schema is captured from) | | | |
| schema_host | STRING | host | schema-store host |
| schema_password | STRING | password | (none) |
| schema_port | INT | port | 3306 |
| schema_user | STRING | user | |
| schema_ssl | SSL_OPT* | enable SSL | DISABLED |
| **producer options** | | | |
| producer | PRODUCER_TYPE* | producer type | stdout |
| custom_producer.factory | CLASS_NAME | factory class of a custom producer | |
| producer_ack_timeout | | time in milliseconds after which an asynchronous producer considers a message lost | |
| producer_partition_by | PARTITION_BY* | partition function for kafka/kinesis | database |
| producer_partition_columns | STRING | comma-separated column names when partitioning by column | |
| producer_partition_by_fallback | PARTITION_BY_F* | required when partitioning by column; used when the column(s) do not exist | |
| ignore_producer_error | BOOLEAN | if false, exit on kafka/kinesis errors; if true, only log them | TRUE |
| **producer=file** | | | |
| output_file | STRING | output file path | |
| javascript | STRING | path to a javascript filter file | |
| **producer=kafka** | | | |
| kafka.bootstrap.servers | STRING | kafka brokers, as HOST:PORT[,HOST:PORT] | |
| kafka_topic | STRING | kafka topic | maxwell |
| kafka_version | KAFKA_VERSION* | kafka version; not available in config.properties | 0.11.0.1 |
| kafka_partition_hash | default, murmur3 | hash function for kafka partitioning | default |
| kafka_key_format | array, hash | format of maxwell's kafka keys | hash |
| ddl_kafka_topic | STRING | kafka topic for DDL when output_ddl is true | kafka_topic |
| **producer=kinesis** | | | |
| kinesis_stream | STRING | kinesis stream name | |
| **producer=sqs** | | | |
| sqs_queue_uri | STRING | SQS Queue URI | |
| **producer=pubsub** | | | |
| pubsub_topic | STRING | Google Cloud pub-sub topic | |
| pubsub_platform_id | STRING | Google Cloud platform id associated with topic | |
| ddl_pubsub_topic | STRING | Google Cloud pub-sub topic to send DDL events to | |
| **producer=rabbitmq** | | | |
| rabbitmq_user | STRING | username | guest |
| rabbitmq_pass | STRING | password | guest |
| rabbitmq_host | STRING | host | |
| rabbitmq_port | INT | port | |
| rabbitmq_virtual_host | STRING | virtual host | |
| rabbitmq_exchange | STRING | exchange name | |
| rabbitmq_exchange_type | STRING | exchange type | |
| rabbitmq_exchange_durable | BOOLEAN | whether the exchange is durable | FALSE |
| rabbitmq_exchange_autodelete | BOOLEAN | if true, the exchange is deleted when all queues have finished using it | FALSE |
| rabbitmq_routing_key_template | STRING | routing key template; %db% and %table% are available as variables | %db%.%table% |
| rabbitmq_message_persistent | BOOLEAN | enable message persistence | FALSE |
| rabbitmq_declare_exchange | BOOLEAN | whether to declare the exchange | TRUE |
| **producer=redis** | | | |
| redis_host | STRING | host | localhost |
| redis_port | INT | port | 6379 |
| redis_auth | STRING | password | |
| redis_database | INT | database [0-15] | 0 |
| redis_pub_channel | STRING | channel name for Pub/Sub mode | maxwell |
| redis_list_key | STRING | list key for LPUSH mode | maxwell |
| redis_type | pubsub, lpush | mode | pubsub |
| **formatting** | | | |
| output_binlog_position | BOOLEAN | include the binlog position | FALSE |
| output_gtid_position | BOOLEAN | include the gtid position | FALSE |
| output_commit_info | BOOLEAN | include commit and xid | TRUE |
| output_xoffset | BOOLEAN | include the virtual row offset | FALSE |
| output_nulls | BOOLEAN | include NULL values | TRUE |
| output_server_id | BOOLEAN | include server_id | FALSE |
| output_thread_id | BOOLEAN | include thread_id | FALSE |
| output_row_query | BOOLEAN | include the INSERT/UPDATE/DELETE statement; MySQL must have binlog_rows_query_log_events enabled | FALSE |
| output_ddl | BOOLEAN | include DDL (table-alter, table-create, etc.) events | FALSE |
| **filtering** | | | |
| filter | STRING | filter rules, e.g. exclude: db.*, include: *.tbl, include: *./bar(bar)?/, exclude: foo.bar.col=val | |
| **encryption** | | | |
| encrypt | none, data, all | encryption mode: none, data (encrypt only the data field), all (encrypt the whole message) | none |
| secret_key | STRING | encryption key | null |
| **monitoring / metrics** | | | |
| metrics_prefix | STRING | prefix applied to all metrics | MaxwellMetrics |
| metrics_type | slf4j, jmx, http, datadog | how metrics are exposed | |
| metrics_jvm | BOOLEAN | enable JVM metrics: memory usage, GC stats, etc. | FALSE |
| metrics_slf4j_interval | SECONDS | interval, in seconds, at which metrics are logged via slf4j | 60 |
| http_port | INT | port for http metrics | 8080 |
| http_path_prefix | STRING | path prefix for http metrics | / |
| http_bind_address | STRING | address the http server binds to | all addresses |
| http_diagnostic | BOOLEAN | enable the http diagnostic endpoint | FALSE |
| http_diagnostic_timeout | MILLISECONDS | http diagnostic response timeout | 10000 |
| metrics_datadog_type | udp, http | how metrics are reported to datadog | udp |
| metrics_datadog_tags | STRING | datadog tags, e.g. tag1:value1,tag2:value2 | |
| metrics_datadog_interval | INT | push interval to datadog, in seconds | 60 |
| metrics_datadog_apikey | STRING | datadog api key, when metrics_datadog_type = http | |
| metrics_datadog_host | STRING | target host, when metrics_datadog_type = udp | localhost |
| metrics_datadog_port | INT | port, when metrics_datadog_type = udp | 8125 |
| **misc** | | | |
| bootstrapper | async, sync, none | bootstrapper type | async |
| init_position | FILE:POS[:HEARTBEAT] | start at the given binlog position; not available in the config file | |
| replay | BOOLEAN | enable read-only replay mode: binlog positions and schema changes are not stored; not available in the config file | |