Data change notification from MySQL to Kafka
The following steps use Docker containers to quickly demonstrate how to set up data change notification from MySQL to Kafka.
Create the network
docker network create dtle-net
Create the source MySQL
docker run --name mysql-src -e MYSQL_ROOT_PASSWORD=pass -p 33061:3306 --network=dtle-net -d mysql:5.7 --gtid-mode=ON --enforce-gtid-consistency=1 --log-bin=bin --server-id=1
Check connectivity:
> mysql -h 127.0.0.1 -P 33061 -uroot -ppass -e "select @@version\G"
< *************************** 1. row ***************************
@@version: 5.7.23-log
Create the source table schema
> mysql -h 127.0.0.1 -P 33061 -uroot -ppass -e "CREATE DATABASE demo; CREATE TABLE demo.demo_tbl(a int primary key)"
Create the target Kafka
docker run --name kafka-zookeeper -p 2181:2181 -e ALLOW_ANONYMOUS_LOGIN=yes --network=dtle-net -d bitnami/zookeeper
docker run --name kafka-dst -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=kafka-zookeeper:2181 -e ALLOW_PLAINTEXT_LISTENER=yes --network=dtle-net -d bitnami/kafka
Check connectivity:
> docker run -it --rm \
--network dtle-net \
-e KAFKA_ZOOKEEPER_CONNECT=kafka-zookeeper:2181 \
bitnami/kafka:latest kafka-topics.sh --list --zookeeper kafka-zookeeper:2181
< Welcome to the Bitnami kafka container
Subscribe to project updates by watching https://github.com/bitnami/bitnami-docker-kafka
Submit issues and feature requests at https://github.com/bitnami/bitnami-docker-kafka/issues
Create dtle
docker run --name dtle-consul -p 8500:8500 --network=dtle-net -d consul:latest
docker run --name dtle -p 4646:4646 --network=dtle-net -d actiontech/dtle
Check that dtle is running normally:
> curl -XGET "127.0.0.1:4646/v1/nodes" -s | jq
< [{...}]
Prepare the job definition file
Create a file named job.json with the following content:
{
  "Job": {
    "ID": "dtle-demo",
    "Datacenters": ["dc1"],
    "TaskGroups": [{
      "Name": "src",
      "Tasks": [{
        "Name": "src",
        "Driver": "dtle",
        "Config": {
          "Gtid": "",
          "ReplicateDoDb": [{
            "TableSchema": "demo",
            "Tables": [{
              "TableName": "demo_tbl"
            }]
          }],
          "SrcConnectionConfig": {
            "Host": "mysql-src",
            "Port": 3306,
            "User": "root",
            "Password": "pass"
          },
          "KafkaConfig": {
            "Topic": "demo-topic",
            "Brokers": ["kafka-dst:9092"],
            "Converter": "json"
          }
        }
      }]
    }, {
      "Name": "dest",
      "Tasks": [{
        "Name": "dest",
        "Driver": "dtle",
        "Config": {
          "DestType": "kafka"
        }
      }]
    }]
  }
}
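This file defines:
- The connection settings for the source MySQL
- The broker address of the target Kafka
- The table to replicate: demo.demo_tbl
- An empty GTID position, which means the job performs full plus incremental replication. To test incremental replication only, specify a valid GTID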
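For an incremental-only test, one way to obtain a valid GTID is to read the set that the source has already executed. The sketch below wraps that query in a shell function. The function name source_gtid_executed is hypothetical, and the sketch assumes that dtle starts replicating after the GTID set given in "Gtid", which is worth confirming against the dtle configuration reference.

```shell
# Hypothetical helper: print the GTID set already executed on the source MySQL.
# Connection parameters match the mysql-src container created earlier.
source_gtid_executed() {
  # -N drops the column header, -B selects batch output, -r disables escaping
  # so a multi-line GTID set is printed verbatim; tr then joins the lines.
  mysql -h 127.0.0.1 -P 33061 -uroot -ppass -N -B -r \
    -e "SELECT @@GLOBAL.gtid_executed" | tr -d '\n'
}
```

Calling source_gtid_executed prints the set on one line; that value can be copied into the "Gtid" field of job.json before the job is submitted.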
Create the replication job
> curl -XPOST "http://127.0.0.1:4646/v1/jobs" -d @job.json -s | jq
< {...}
Check the job status:
> curl -XGET "127.0.0.1:4646/v1/job/dtle-demo" -s | jq '.Status'
< "running"
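The job may need a moment before it reports "running", so a script would typically poll the status rather than check it once. The following is a minimal sketch of such a loop built on the same endpoint and the same .Status field; the function names job_status and wait_for_job, the default of 30 attempts, and the one-second interval are all assumptions made for illustration.

```shell
# Hypothetical helper: fetch the Status field of a job from the HTTP API.
job_status() {
  curl -s -XGET "127.0.0.1:4646/v1/job/$1" | jq -r '.Status'
}

# Hypothetical helper: poll until the job reports the wanted status.
# $1 = job ID, $2 = wanted status (default: running), $3 = attempts (default: 30).
# Returns 0 on success, 1 if the status was never reached.
wait_for_job() {
  job_id=$1
  want=${2:-running}
  attempts=${3:-30}
  i=0
  while [ "$i" -lt "$attempts" ]; do
    if [ "$(job_status "$job_id")" = "$want" ]; then
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  return 1
}
```

For example, wait_for_job dtle-demo running 60 waits up to about a minute and returns a non-zero exit code if the job never reaches that status.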
Test
Write data on the source:
> mysql -h 127.0.0.1 -P 33061 -uroot -ppass -e "INSERT INTO demo.demo_tbl values(1)"
...
Verify that the corresponding topic exists:
> docker run -it --rm \
--network dtle-net \
-e KAFKA_ZOOKEEPER_CONNECT=kafka-zookeeper:2181 \
bitnami/kafka:latest kafka-topics.sh --list --zookeeper kafka-zookeeper:2181
< Welcome to the Bitnami kafka container
Subscribe to project updates by watching https://github.com/bitnami/bitnami-docker-kafka
Submit issues and feature requests at https://github.com/bitnami/bitnami-docker-kafka/issues
demo-topic.demo.demo_tbl
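The listing suggests that each replicated table gets its own topic, named by joining the configured Topic, the schema, and the table with dots. That pattern is inferred from this single output rather than from a dtle specification, and the helper name table_topic below is hypothetical.

```shell
# Hypothetical helper: build the per-table topic name as <Topic>.<schema>.<table>.
# The pattern is inferred from the topic listing above.
table_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}
```

With the values from job.json, table_topic demo-topic demo demo_tbl prints demo-topic.demo.demo_tbl, which can then be passed to the --topic option of kafka-console-consumer.sh.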
Verify the data:
> docker run -it --rm \
--network dtle-net \
-e KAFKA_ZOOKEEPER_CONNECT=kafka-zookeeper:2181 \
bitnami/kafka:latest kafka-console-consumer.sh --bootstrap-server kafka-dst:9092 --topic demo-topic.demo.demo_tbl --from-beginning
< ...
{"schema":{"type":"struct","optional":false,"fields":[{"type":"struct","optional":true,"field":"before","fields":[{"type":"int32","optional":false,"field":"a"}],"name":"demo-topic.demo.demo_tbl.Value"},{"type":"struct","optional":true,"field":"after","fields":[{"type":"int32","optional":false,"field":"a"}],"name":"demo-topic.demo.demo_tbl.Value"},{"type":"struct","optional":false,"field":"source","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"name":"io.debezium.connector.mysql.Source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"name":"demo-topic.demo.demo_tbl.Envelope","version":1},"payload":{"before":null,"after":{"a":11},"source":{"version":"0.0.1","name":"demo-topic","server_id":0,"ts_sec":0,"gtid":null,"file":"","pos":0,"row":1,"snapshot":true,"thread":null,"db":"demo","table":"demo_tbl"},"op":"c","ts_ms":1539760682507}}
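The message above carries a long schema section followed by a payload section that holds the actual row change. When only the change matters, the payload can be reduced with jq. This is a minimal sketch: the helper name summarize_changes is hypothetical, and it assumes every input line is one complete JSON message shaped like the one shown.

```shell
# Hypothetical helper: read change messages on stdin, one JSON document per line,
# and print the operation, source table, and new row image of each in compact form.
summarize_changes() {
  jq -c '{op: .payload.op, db: .payload.source.db, table: .payload.source.table, after: .payload.after}'
}
```

If the message above is saved to a file such as message.json, running summarize_changes < message.json prints {"op":"c","db":"demo","table":"demo_tbl","after":{"a":11}}.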
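At this point you can run DDL and DML operations on the source table demo.demo_tbl and check whether the data on the target stays consistent.
For the Kafka message format, see 5.3 Kafka message format.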