Kafka 消息格式
dtle Kafka 输出, 消息格式兼容 Debezium
其消息格式具体可参考 https://debezium.io/documentation/reference/1.8/tutorial.html
此处概要说明
- 每行数据变更会有一个消息
- 每个消息分为key和value
- key是该次变更的主键
- value是该次变更的整行数据
- key和value各自又有schema和payload
- payload是具体的数据
- schema指明了数据的格式, 即payload的解读方式, 可以理解为“类定义”
- 注意和SQL schema含义不同
- 表结构会包含在 Kafka Connect schema 中
DML
Key
以下是一个消息的key. 只是简单的包含了主键.
{
"schema": {
"type": "struct",
"name": "dbserver1.inventory.customers.Key"
"optional": false,
"fields": [
{
"field": "id",
"type": "int32",
"optional": false
}
]
},
"payload": {
"id": 1004
}
}
Value
以下是一个消息的value, 其类型为 topic.schema.table.Envelope
, 拥有5个字段
before
, 复杂类型topic.schema.table.Value
, 为该表的表结构.after
, 复杂类型, 同上source
, 复杂类型, 为该次变更的元数据op
:string
. 用"c", "d", "u" 分别表达操作类型: 增、删、改ts_ms
:int64
. dtle 处理该行变更的时间.
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "int64",
"optional": false,
"field": "ts_sec"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "boolean",
"optional": true,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "0.8.3.Final",
"name": "dbserver1",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"snapshot": true,
"thread": null,
"db": "inventory",
"table": "customers"
},
"op": "c",
"ts_ms": 1486500577691
}
}
DDL (SchemaChangeTopic)
dtle会将DDL写入SchemaChangeTopic。该topic值可配置.
Schema change消息中,key永远为null
, 仅 value部分有值:
{
"source" : {
"server" : "mysql2"
},
"position" : {
"ts_sec" : 1641807976,
"file" : "bin.000022",
"pos" : 439,
"gtids" : "acd7d195-06cd-11e9-928f-02000aba3e28:1-175",
"snapshot" : true
},
"databaseName" : "a",
"ddl" : "CREATE TABLE `a` (\n `id` int(11) NOT NULL AUTO_INCREMENT,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=latin1",
"tableChanges" : [ {
"type" : "CREATE",
"id" : "\"a\".\"a\"",
"table" : {
"defaultCharsetName" : "latin1",
"primaryKeyColumnNames" : [ "id" ],
"columns" : [ {
"name" : "id",
"jdbcType" : 4,
"typeName" : "INT",
"typeExpression" : "INT",
"charsetName" : null,
"length" : 11,
"position" : 1,
"optional" : false,
"autoIncremented" : true,
"generated" : true
} ]
}
} ]
}
其中:
position.snapshot==true
表明这是全量初始化时的表结构(通过show create table
等生成)。position.snapshot==false
则表明:这是增量过程中执行的DDL。
注:tableChanges
结构在dtle中尚未实现。