作业(job)配置

作业配置一般采用 json (HTTP API 提交)或 hcl (nomad 命令行工具提交)文件。样例配置在 /usr/share/dtle/scripts/ 中。

nomad job 的完整配置参考 https://www.nomadproject.io/docs/job-specification/

nomad job 有group/task层级,一个group中的tasks会被放在同一个节点执行。dtle要求src和dest task分别放在src 和 dest group. task 中指定 driver = "dtle", 在config段落中填写dtle专有配置。

从3.22.10.0开始,dtle配置发生变化,所有常规配置填在源端任务(src task)。目标端固定填写一个配置项 DestType

  group "dest" {
    task "dest" {
      driver = "dtle"
      config {
        DestType = "mysql" # 或"kafka"
      }
    }
  }

dtle 源端任务有如下配置项:

参数名 必填? 类型 默认值 说明
Gtid String 默认为 全量+增量 任务 MySQL的GTID集合(区间), 可取值:
1. 默认为空, 则为 <全量+增量> 复制任务
2. 已复制的GTID集合(不是点位), 将从未复制的GTID开始增量复制
GtidStart String 增量复制开始的 GTID 点位. (将自动求差集获取上述 GTID 集合.) 需要保持 Gtid 为空
AutoGtid Bool false 设为 true 后自动从当前 GTID 开始增量任务. 需要保持 Gtid 和 GtidStart 为空.
BinlogRelay Bool false 是否使用Binlog Relay(中继)机制. 即先将源端mysql binlog读到本地, 避免源端清除binlog导致任务失败. 注意: 如果使用带有BinlogRelay的纯增量复制, 必须用Gtid指定复制起点,不能使用BinlogFile/Pos。
BinlogFile String 增量任务开始的Binlog文件(即源端mysql上 show master status 的结果).
BinlogPos Int 0 增量任务开始的Binlog位置, 和BinlogFile配套使用.
ReplicateDoDb Object数组 - 如为空[], 则复制整个数据库实例. 可填写多元素. 元素内容见下方说明
ReplicateIgnoreDb Object数组 - 指定要忽略的库表,优先级高于ReplicateDoDb。如为空[], 则完全执行ReplicateDoDb配置. 可填写多元素. 元素内容见下方说明
SrcConnectionConfig Object - MySQL源端信息, 见下方 ConnectionConfig 说明。和 OracleConfig 二选一填写。
DestConnectionConfig Object - MySQL目标端信息, 见下方 ConnectionConfig 说明。和 KafkaConfig 二选一填写。
SrcOracleConfig Object - Oracle源端信息, 见下方 OracleConfig 说明。和 SrcConnectionConfig 二选一填写。
KafkaConfig Object - Kafka目标端信息, 见下方 KafkaConfig 说明。和 DestConnectionConfig 二选一填写。
DropTableIfExists Bool false 全量复制时, 在目标端删除参与复制的表, 之后由dtle自动创建表结构 (相关参数: SkipCreateDbTable). 如果开启此选项, 目标端数据库用户需要有相应表的DROP权限.
SkipCreateDbTable Bool false 不为目标库创建复制库和复制表. 如果关闭此选项, 目标端数据库用户需要有相应表的CREATE权限.
ParallelWorkers Int 1 回放端的并发数. 当值大于1时, 目标端会进行并行回放
UseMySQLDependency Bool true 默认使用MySQL的并行回放事务依赖关系检测。如果不能开启源端MySQL的WRITESET追踪,可将此设为false,使用dtle的依赖检测。
DependencyHistorySize Int 2500 使用dtle并行复制计算事务依赖时,保存的行数。增大可以潜在地增加并行度,但会更消耗内存。
ForeignKeyChecks Bool true 3.21.10.0+. 默认开启目标端MySQL连接上的 @@foreign_key_checks
ReplChanBufferSize Int 32 复制任务缓存的大小, 单位为事务组数。事务组大小和GroupMaxSize/GroupTimeout有关。
ChunkSize Int 2000 全量复制时, 每次读取-传输-写入的行数
DumpEntryLimit Int 67108864 (64M) 复制时, 读取后分块发送的分块大小。空闲内存较小时需适当调小。适用于大全量/增量大事务
ExpandSyntaxSupport Bool false 支持复制 用户权限/存储过程DDL/函数DDL
GroupMaxSize Int 1 源端发送数据时, 等待数据包达到一定大小(GroupMaxSize字节)后发送该包. 单位为字节. 默认值1表示即刻发送数据
GroupTimeout Int 100 源端发送数据时, 等待数据包达到超时时间(GroupTimeout毫秒)发送该包. 单位为毫秒.
SqlFilter String数组 [] 是否跳过一些事件, 如 ["NoDMLDelete", "NoDDLDropSchema", "NoDDLDropTable", "NoDDLDropIndex", "NoDDLTruncate"]。详见下文。
SlaveNetWriteTimeout Int 28800 (8小时) 调整MySQL slave线程的超时时间。MySQL默认值为60,太短可能导致断连。太长则会导致异常连接回收不及时。
BulkInsert1 Int 4 批量插入第一级数量。见性能调优
BulkInsert2 Int 8 批量插入第二级数量。
BulkInsert3 Int 128 批量插入第三级数量。
SetGtidNext Bool false 目标端执行事务前执行set gtid_next = ..., 使源端目标端MySQL事务gtid相同。可用以避免循环复制。需要 REPLICATION_APPLIER (MySQL 8.0)或 SUPER 权限
TwoWaySync Bool false 开启双向任务。
TwoWaySyncGtid String "" 反向任务使用的Gtid。当值为"auto"时,从当前 GTID 开始增量。

ReplicateDoDb 每个元素有如下字段:

参数名 必填? 类型 默认值 说明
TableSchema String - 数据库名
TableSchemaRegex String - 数据库映射正则表达式,可用于多个数据库重命名
TableSchemaRename String - 重命名后的数据库名称,当进行多数据库重命名时,支持正则表达式,使用见demo
Tables Object数组 - 可配置多张表, 类型为Table. 若不配置, 则复制指定数据库中的所有表
Table.TableName String - 表名
Table.Where String - 只复制满足该条件的数据行. 语法为SQL表达式, 返回值应为布尔值. 可以引用表中的列名.
Table.TableRegex String - 表名映射匹配正则表达式,用于多个表同时重命名.
Table.TableRename String - 重命名后的表名,当进行多表重命名时,支持支持正则表达,见demo
Table.ColumnMapFrom String数组 - 列映射(暂不支持正则表达式)。见demo
Table.ColumnMapTo String数组 - 列映射(暂不支持正则表达式)。见demo

注:hcl格式中${SOME_TEXT}会被认为是变量引用。正则替换中输入此类文字时,则需使用双$符号:$${SOME_TEXT}

ReplicateIgnoreDb 每个元素有如下字段:

参数名 必填? 类型 默认值 说明
TableSchema String - 数据库名
Tables Object数组 - 可配置多张表, 类型为Table. 若不配置, 则忽略指定数据库中的所有表
Table.TableName String - 表名

ConnectionConfig 有如下字段:

参数名 必填? 类型 默认值 说明
Host String - 数据源地址
Port String - 数据源端口
User String - 数据源用户名
Password String - 数据源密码
Charset String utf8mb4 数据源的字符集

KafkaConfig 有如下字段:

参数名 必填? 类型 默认值 说明
Topic String - Kafka Topic
SchemaChangeTopic String "schema-changes.Topic" Schema change (DDL) 消息使用的topic
TopicWithSchemaTable Bool true 默认最终topic为 指定的Topic.库名.表名, 如果不需要追加库表名,请设为false
Brokers String数组 - Kafka Brokers, 如 ["127.0.0.1:9192", "..."]
Converter String json Kafka Converter。目前仅支持json
MessageGroupMaxSize int 1 目标端向kafka发送消息时, 等待MySQL事务数据包达到一定大小(MessageGroupMaxSize字节)后将该包序列化并发送. 单位为字节. 默认值1表示即刻发送数据
MessageGroupTimeout int 100 目标端向kafka发送消息时, 等待数据包达到超时时间(MessageGroupTimeout毫秒)发送该包. 单位为毫秒.
User String - Kafka SASL.User
Password String - Kafka SASL.Password

OracleConfig 有如下字段:

参数名 必填? 类型 默认值 说明
Host String - 数据源地址
Port String - 数据源端口
User String - 数据源用户名
Password String - 数据源密码
ServiceName String XE 数据源服务名
Scn int 0 同步起点

SqlFilter注意事项

全部的filter:

  • NoDML
  • NoDMLInsert, NoDMLDelete, NoDMLUpdate
  • NoDDL
  • NoDDLCreateSchema, NoDDLCreateTable
  • NoDDLDropSchema, NoDDLDropTable, NoDDLDropIndex, NoDDLTruncate
  • NoDDLAlterTable
  • NoDDLAlterTableAddColumn, NoDDLAlterTableDropColumn
  • NoDDLAlterTableModifyColumn, NoDDLAlterTableChangeColumn, NoDDLAlterTableAlterColumn

SqlFilter只能简单过滤相关语句。不会自动转换后续语句。例如

-- SqlFilter = ["NoDDLDropTable"]

/** 源端 **/
-- 已有 table a.a (id int primary key)
drop table a.a;
create table a.a (id int primary key, val int);
insert into a.a values (1, 11);

/** 目标端 **/
-- 已有 table a.a (id int primary key)
-- drop table 语句被过滤
create table a.a (id int primary key, val int);
-- 执行错误,目标表已存在
insert into a.a values (1, 11);
-- 执行错误,列数目不对

用户需自行确保在发生过滤的情况下,后续DML/DDL能正确执行。

nomad job 常用通用配置

constraint

job、group 或 task 级配置。配置后该job/group/task会绑定在指定的节点上执行

constraint {
  attribute = "${node.unique.name}"
  value = "nomad3"
}

完整参考

resources

task级配置,src/dest task需各自重复。默认值为 cpu=100memory=300。 以默认值建立大量轻量级任务,会导致资源不够而pending,可适当调小。

任务的内存消耗和每行大小、事物大小、队列长度有关。注意真实资源消耗,避免OOM。

task "src" {
  resources {
    cpu    = 100 # MHz
    memory = 300 # MB
  }
}

restart & reschedule

nomad job 默认有如下 restartreschedule 配置

restart { # group or task level
  interval = "30m"
  attempts = 2
  delay    = "15s"
  mode     = "fail" # "fail" or "delay"
                    # "delay" 意味着interval过后继续尝试
                    # "fail" 则不再尝试
}
reschedule { # job or group level
 delay          = "30s"
 delay_function = "exponential"
 max_delay      = "1h"
 unlimited      = true
}
  • 当task报错时,会根据restart配置,30分钟内在同一节点上重启最多两次
    • 即使失败的job被stop -purge再重新添加,也需要根据restart参数重启
  • 2次重启均失败后,会根据reschedule配置,在其他节点上执行

为了避免无限reschedule带来的问题,dtle安装包提供的样例job配置中(<prefix>/usr/share/dtle/scripts/example.job.*),限制reschedule为每半小时1次:

reschedule {
  attempts = 1
  interval = "30m"
  unlimited = false
}
# 或json格式
"Reschedule": {
  "Attempts": 1,
  "Interval": 1800000000000,
  "Unlimited": false
}

results matching ""

    No results matching ""