Kafka 消息格式

dtle Kafka 输出, 消息格式兼容 Debezium

其消息格式具体可参考 https://debezium.io/documentation/reference/1.8/tutorial.html

此处概要说明

  • 每行数据变更会有一个消息
  • 每个消息分为key和value
    • key是该次变更的主键
    • value是该次变更的整行数据
  • key和value各自又有schema和payload
    • payload是具体的数据
    • schema指明了数据的格式, 即payload的解读方式, 可以理解为“类定义”
      • 注意和SQL schema含义不同
      • 表结构会包含在 Kafka Connect schema 中

DML

Key

以下是一个消息的key. 只是简单的包含了主键.

{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key"
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

Value

以下是一个消息的value, 其类型为 topic.schema.table.Envelope, 拥有5个字段

  • before, 复杂类型 topic.schema.table.Value, 为该表的表结构.
  • after, 复杂类型, 同上
  • source, 复杂类型, 为该次变更的元数据
  • op: string. 用"c", "d", "u" 分别表达操作类型: 增、删、改
  • ts_ms: int64. dtle 处理该行变更的时间.
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_sec"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "boolean",
            "optional": true,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "0.8.3.Final",
      "name": "dbserver1",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": "inventory",
      "table": "customers"
    },
    "op": "c",
    "ts_ms": 1486500577691
  }
}

DDL (SchemaChangeTopic)

dtle会将DDL写入SchemaChangeTopic。该topic值可配置.

Schema change消息中,key永远为null, 仅 value部分有值:

{
  "source" : {
    "server" : "mysql2"
  },
  "position" : {
    "ts_sec" : 1641807976,
    "file" : "bin.000022",
    "pos" : 439,
    "gtids" : "acd7d195-06cd-11e9-928f-02000aba3e28:1-175",
    "snapshot" : true
  },
  "databaseName" : "a",
  "ddl" : "CREATE TABLE `a` (\n  `id` int(11) NOT NULL AUTO_INCREMENT,\n  PRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=latin1",
  "tableChanges" : [ {
    "type" : "CREATE",
    "id" : "\"a\".\"a\"",
    "table" : {
      "defaultCharsetName" : "latin1",
      "primaryKeyColumnNames" : [ "id" ],
      "columns" : [ {
        "name" : "id",
        "jdbcType" : 4,
        "typeName" : "INT",
        "typeExpression" : "INT",
        "charsetName" : null,
        "length" : 11,
        "position" : 1,
        "optional" : false,
        "autoIncremented" : true,
        "generated" : true
      } ]
    }
  } ]
}

其中:

  • position.snapshot==true表明这是全量初始化时的表结构(通过show create table等生成)。
  • position.snapshot==false则表明:这是增量过程中执行的DDL。

注:tableChanges结构在dtle中尚未实现。

MySQL数据类型到 “Kafka Connect schema types”的转换

https://debezium.io/docs/connectors/mysql/#data-types

results matching ""

    No results matching ""