MySQL到Kafka的数据变更通知

以下步骤以docker容器的方式快速演示如何搭建MySQL的单向复制环境.

创建网络

docker network create dtle-net

创建源端 MySQL

docker run --name mysql-src -e MYSQL_ROOT_PASSWORD=pass -p 33061:3306 --network=dtle-net -d mysql:5.7 --gtid-mode=ON --enforce-gtid-consistency=1 --log-bin=bin --server-id=1

检查是否联通:

> mysql -h 127.0.0.1 -P 33061 -uroot -ppass -e "select @@version\G"
< *************************** 1. row ***************************
@@version: 5.7.23-log

创建源端表结构

> mysql -h 127.0.0.1 -P 33061 -uroot -ppass -e "CREATE DATABASE demo; CREATE TABLE demo.demo_tbl(a int primary key)"

创建目标端 Kafka

docker run --name kafka-zookeeper -p 2181:2181 -e ALLOW_ANONYMOUS_LOGIN=yes --network=dtle-net -d bitnami/zookeeper
docker run --name kafka-dst -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=kafka-zookeeper:2181 -e ALLOW_PLAINTEXT_LISTENER=yes --network=dtle-net -d bitnami/kafka

检查是否联通:

> docker run -it --rm \
    --network dtle-net \
    -e KAFKA_ZOOKEEPER_CONNECT=kafka-zookeeper:2181 \
    bitnami/kafka:latest kafka-topics.sh --list  --zookeeper kafka-zookeeper:2181
< Welcome to the Bitnami kafka container
Subscribe to project updates by watching https://github.com/bitnami/bitnami-docker-kafka
Submit issues and feature requests at https://github.com/bitnami/bitnami-docker-kafka/issues

创建 dtle

docker run --name dtle-consul -p 8500:8500 --network=dtle-net -d consul:latest
docker run --name dtle -p 4646:4646 --network=dtle-net -d actiontech/dtle

检查是否正常:

> curl -XGET "127.0.0.1:4646/v1/nodes" -s | jq
< [{...}]

准备作业定义文件

准备文件job.json, 内容如下:

{
  "Job": {
    "ID": "dtle-demo",
    "Datacenters": ["dc1"],
    "TaskGroups": [{
        "Name": "src",
        "Tasks": [{
          "Name": "src",
          "Driver": "dtle",
          "Config": {
            "Gtid": "",
            "ReplicateDoDb": [{
              "TableSchema": "demo",
              "Tables": [{
                "TableName": "demo_tbl"
              }]
            }],
            "SrcConnectionConfig": {
              "Host": "mysql-src",
              "Port": 3306,
              "User": "root",
              "Password": "pass"
            },
            "KafkaConfig": {
              "Topic": "demo-topic",
              "Brokers": ["kafka-dst:9092"],
              "Converter": "json"
            }
          }
        }]
      }, {
        "Name": "dest",
        "Tasks": [{
          "Name": "dest",
          "Driver": "dtle",
          "Config": {
            "DestType": "kafka"
          }
        }]
    }]
  }
}

其中定义了:

  • 源端 MySQL 的连接字符串
  • 目标端 Kafka 的 broker 访问地址
  • 要复制的表为demo.demo_tbl
  • GTID点位为空, 表示此复制是 全量+增量 的复制. 如只测试增量复制, 可指定合法的GTID

创建复制任务

> curl -XPOST "http://127.0.0.1:4646/v1/jobs" -d @job.json -s | jq
< {...}

查看作业状态:

> curl -XGET "127.0.0.1:4646/v1/job/dtle-demo" -s | jq '.Status'
< "running"

测试

在源端写入数据:

> mysql -h 127.0.0.1 -P 33061 -uroot -ppass -e "INSERT INTO demo.demo_tbl values(1)"
...

验证相关的topic存在:

> docker run -it --rm \
    --network dtle-net \
    -e KAFKA_ZOOKEEPER_CONNECT=kafka-zookeeper:2181 \
    bitnami/kafka:latest kafka-topics.sh --list  --zookeeper kafka-zookeeper:2181
< Welcome to the Bitnami kafka container
Subscribe to project updates by watching https://github.com/bitnami/bitnami-docker-kafka
Submit issues and feature requests at https://github.com/bitnami/bitnami-docker-kafka/issues

demo-topic.demo.demo_tbl

验证数据:

> docker run -it --rm \
    --network dtle-net \
    -e KAFKA_ZOOKEEPER_CONNECT=kafka-zookeeper:2181 \
    bitnami/kafka:latest kafka-console-consumer.sh --bootstrap-server kafka-dst:9092 --topic demo-topic.demo.demo_tbl --from-beginning
< ...
{"schema":{"type":"struct","optional":false,"fields":[{"type":"struct","optional":true,"field":"before","fields":[{"type":"int32","optional":false,"field":"a"}],"name":"demo-topic.demo.demo_tbl.Value"},{"type":"struct","optional":true,"field":"after","fields":[{"type":"int32","optional":false,"field":"a"}],"name":"demo-topic.demo.demo_tbl.Value"},{"type":"struct","optional":false,"field":"source","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"name":"io.debezium.connector.mysql.Source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"name":"demo-topic.demo.demo_tbl.Envelope","version":1},"payload":{"before":null,"after":{"a":11},"source":{"version":"0.0.1","name":"demo-topic","server_id":0,"ts_sec":0,"gtid":null,"file":"","pos":0,"row":1,"snapshot":true,"thread":null,"db":"demo","table":"demo_tbl"},"op":"c","ts_ms":1539760682507}}

此时可在源端对表demo.demo_tbl进行DDL/DML等各种操作, 查看目标端数据是否一致

关于Kafka的消息格式, 参看5.3 Kafka 消息格式

results matching ""

    No results matching ""