作业(job)配置
作业配置一般采用 json (HTTP API 提交)或 hcl (nomad 命令行工具提交)文件。样例配置在 /usr/share/dtle/scripts/
中。
nomad job 的完整配置参考 https://www.nomadproject.io/docs/job-specification/
nomad job 有group/task层级,一个group中的tasks会被放在同一个节点执行。dtle要求src和dest task分别放在src 和 dest group. task 中指定 driver = "dtle", 在config段落中填写dtle专有配置。
从4.22.11.0开始,dtle配置发生变化
- 所有常规配置填在源端任务(src task)
- 原两端的
ConnectionConfig
的分别重命名为SrcConnectionConfig
和DestConnectionConfig
- 原两端的
- 目标端固定填写一个配置项
DestType
group "dest" {
task "dest" {
driver = "dtle"
config {
DestType = "mysql" # 或"kafka"
}
}
}
从3.x ~ 4.22.07.x升级到4.22.11后, 可使用 /usr/share/dtle/scripts/dtle-7to11.py
更新现有job配置格式.
./dtle-7to11.py 'http://127.0.0.1:4646'
dtle 源端任务有如下配置项:
参数名 | 必填? | 类型 | 默认值 | 说明 |
---|---|---|---|---|
Gtid | 否 | String | 默认为 全量+增量 任务 | MySQL的GTID集合(区间), 可取值: 1. 默认为空, 则为 <全量+增量> 复制任务 2. 已复制的GTID集合(不是点位), 将从未复制的GTID开始增量复制 |
GtidStart | 否 | String | 增量复制开始的 GTID 点位. (将自动求差集获取上述 GTID 集合.) 需要保持 Gtid 为空 | |
AutoGtid | 否 | Bool | false | 设为 true 后自动从当前 GTID 开始增量任务. 需要保持 Gtid 和 GtidStart 为空. |
BinlogRelay | 否 | Bool | false | 是否使用Binlog Relay(中继)机制. 即先将源端mysql binlog读到本地, 避免源端清除binlog导致任务失败. 注意: 如果使用带有BinlogRelay的纯增量复制, 必须用Gtid指定复制起点,不能使用BinlogFile/Pos。 |
BinlogFile | 否 | String | 增量任务开始的Binlog文件(即源端mysql上 show master status 的结果). |
|
BinlogPos | 否 | Int | 0 | 增量任务开始的Binlog位置, 和BinlogFile配套使用. |
ReplicateDoDb | 否 | Object数组 | - | 如为空[] , 则复制整个数据库实例. 可填写多元素. 元素内容见下方说明 |
ReplicateIgnoreDb | 否 | Object数组 | - | 指定要忽略的库表,优先级高于ReplicateDoDb。如为空[] , 则完全执行ReplicateDoDb配置. 可填写多元素. 元素内容见下方说明 |
SrcConnectionConfig | 否 | Object | - | MySQL源端信息, 见下方 ConnectionConfig 说明。和 OracleConfig 二选一填写。 |
DestConnectionConfig | 否 | Object | - | MySQL目标端信息, 见下方 ConnectionConfig 说明。和 KafkaConfig 二选一填写。 |
SrcOracleConfig | 否 | Object | - | Oracle源端信息, 见下方 OracleConfig 说明。和 SrcConnectionConfig 二选一填写。 |
KafkaConfig | 否 | Object | - | Kafka目标端信息, 见下方 KafkaConfig 说明。和 DestConnectionConfig 二选一填写。 |
DropTableIfExists | 否 | Bool | false | 全量复制时, 在目标端删除参与复制的表, 之后由dtle自动创建表结构 (相关参数: SkipCreateDbTable ). 如果开启此选项, 目标端数据库用户需要有相应表的DROP 权限. |
SkipCreateDbTable | 否 | Bool | false | 不为目标库创建复制库和复制表. 如果关闭此选项, 目标端数据库用户需要有相应表的CREATE 权限. |
ParallelWorkers | 否 | Int | 1 | 回放端的并发数. 当值大于1时, 目标端会进行并行回放 |
UseMySQLDependency | 否 | Bool | true | 默认使用MySQL的并行回放事务依赖关系检测。如果不能开启源端MySQL的WRITESET追踪,可将此设为false,使用dtle的依赖检测。 |
DependencyHistorySize | 否 | Int | 2500 | 使用dtle并行复制计算事务依赖时,保存的行数。增大可以潜在地增加并行度,但会更消耗内存。 |
ForeignKeyChecks | 否 | Bool | true | 3.21.10.0+. 默认开启目标端MySQL连接上的 @@foreign_key_checks |
ReplChanBufferSize | 否 | Int | 32 | 复制任务缓存的大小, 单位为事务组数。事务组大小和GroupMaxSize/GroupTimeout有关。 |
ChunkSize | 否 | Int | 2000 | 全量复制时, 每次读取-传输-写入的行数 |
DumpEntryLimit | 否 | Int | 67108864 (64M) | 复制时, 读取后分块发送的分块大小。空闲内存较小时需适当调小。适用于大全量/增量大事务 |
ExpandSyntaxSupport | 否 | Bool | false | 支持复制 用户权限/存储过程DDL/函数DDL |
GroupMaxSize | 否 | Int | 1 | 源端发送数据时, 等待数据包达到一定大小(GroupMaxSize 字节)后发送该包. 单位为字节. 默认值1表示即刻发送数据 |
GroupTimeout | 否 | Int | 100 | 源端发送数据时, 等待数据包达到超时时间(GroupTimeout 毫秒)发送该包. 单位为毫秒. |
SqlFilter | 否 | String数组 | [] | 是否跳过一些事件, 如 ["NoDMLDelete", "NoDDLDropSchema", "NoDDLDropTable", "NoDDLDropIndex", "NoDDLTruncate"] 。详见下文。 |
SlaveNetWriteTimeout | 否 | Int | 28800 (8小时) | 调整MySQL slave线程的超时时间。MySQL默认值为60,太短可能导致断连。太长则会导致异常连接回收不及时。 |
BulkInsert1 | 否 | Int | 4 | 批量插入第一级数量。见性能调优 |
BulkInsert2 | 否 | Int | 8 | 批量插入第二级数量。 |
BulkInsert3 | 否 | Int | 128 | 批量插入第三级数量。 |
SetGtidNext | 否 | Bool | false | 目标端执行事务前执行set gtid_next = ... , 使源端目标端MySQL事务gtid相同。可用以避免循环复制。需要 REPLICATION_APPLIER (MySQL 8.0)或 SUPER 权限 |
TwoWaySync | 否 | Bool | false | 开启双向任务。 |
TwoWaySyncGtid | 否 | String | "" | 反向任务使用的Gtid。当值为"auto"时,从当前 GTID 开始增量。 |
RetryTxLimit | 否 | Int | 3 | 当执行发生某些错误时(如:deadlock),重试事务的次数 |
ReplicateDoDb 每个元素有如下字段:
参数名 | 必填? | 类型 | 默认值 | 说明 |
---|---|---|---|---|
TableSchema | 否 | String | - | 数据库名 |
TableSchemaRegex | 否 | String | - | 数据库映射正则表达式,可用于多个数据库重命名 |
TableSchemaRename | 否 | String | - | 重命名后的数据库名称,当进行多数据库重命名时,支持正则表达式,使用见demo |
Tables | 否 | Object数组 | - | 可配置多张表, 类型为Table. 若不配置, 则复制指定数据库中的所有表 |
Table.TableName | 否 | String | - | 表名 |
Table.Where | 否 | String | - | 只复制满足该条件的数据行. 语法为SQL表达式, 返回值应为布尔值. 可以引用表中的列名. |
Table.TableRegex | 否 | String | - | 表名映射匹配正则表达式,用于多个表同时重命名. |
Table.TableRename | 否 | String | - | 重命名后的表名,当进行多表重命名时,支持支持正则表达,见demo |
Table.ColumnMapFrom | 否 | String数组 | - | 列映射(暂不支持正则表达式)。见demo |
Table.ColumnMapTo | 否 | String数组 | - | 列映射(暂不支持正则表达式)。见demo |
注:hcl格式中${SOME_TEXT}
会被认为是变量引用。正则替换中输入此类文字时,则需使用双$符号:$${SOME_TEXT}
。
ReplicateIgnoreDb 每个元素有如下字段:
参数名 | 必填? | 类型 | 默认值 | 说明 |
---|---|---|---|---|
TableSchema | 是 | String | - | 数据库名 |
Tables | 否 | Object数组 | - | 可配置多张表, 类型为Table. 若不配置, 则忽略指定数据库中的所有表 |
Table.TableName | 否 | String | - | 表名 |
ConnectionConfig 有如下字段:
参数名 | 必填? | 类型 | 默认值 | 说明 |
---|---|---|---|---|
Host | 是 | String | - | 数据源地址 |
Port | 是 | String | - | 数据源端口 |
User | 是 | String | - | 数据源用户名 |
Password | 是 | String | - | 数据源密码 |
Charset | 否 | String | utf8mb4 | 数据源的字符集 |
KafkaConfig 有如下字段:
参数名 | 必填? | 类型 | 默认值 | 说明 |
---|---|---|---|---|
Topic | 是 | String | - | Kafka Topic |
SchemaChangeTopic | 否 | String | "schema-changes.Topic" | Schema change (DDL) 消息使用的topic |
TopicWithSchemaTable | 否 | Bool | true | 默认最终topic为 指定的Topic.库名.表名 , 如果不需要追加库表名,请设为false |
Brokers | 是 | String数组 | - | Kafka Brokers, 如 ["127.0.0.1:9192", "..."] |
Converter | 否 | String | json | Kafka Converter。目前仅支持json |
MessageGroupMaxSize | 否 | int | 1 | 目标端向kafka发送消息时, 等待MySQL事务数据包达到一定大小(MessageGroupMaxSize字节)后将该包序列化并发送. 单位为字节. 默认值1表示即刻发送数据 |
MessageGroupTimeout | 否 | int | 100 | 目标端向kafka发送消息时, 等待数据包达到超时时间(MessageGroupTimeout毫秒)发送该包. 单位为毫秒. |
User | 否 | String | - | Kafka SASL.User |
Password | 否 | String | - | Kafka SASL.Password |
OracleConfig 有如下字段:
参数名 | 必填? | 类型 | 默认值 | 说明 |
---|---|---|---|---|
Host | 是 | String | - | 数据源地址 |
Port | 是 | String | - | 数据源端口 |
User | 是 | String | - | 数据源用户名 |
Password | 是 | String | - | 数据源密码 |
ServiceName | 否 | String | XE | 数据源服务名 |
Scn | 否 | int | 0 | 同步起点 |
SqlFilter注意事项
全部的filter:
- NoDML
- NoDMLInsert, NoDMLDelete, NoDMLUpdate
- NoDDL
- NoDDLCreateSchema, NoDDLCreateTable
- NoDDLDropSchema, NoDDLDropTable, NoDDLDropIndex, NoDDLTruncate
- NoDDLAlterTable
- NoDDLAlterTableAddColumn, NoDDLAlterTableDropColumn
- NoDDLAlterTableModifyColumn, NoDDLAlterTableChangeColumn, NoDDLAlterTableAlterColumn
SqlFilter只能简单过滤相关语句。不会自动转换后续语句。例如
-- SqlFilter = ["NoDDLDropTable"]
/** 源端 **/
-- 已有 table a.a (id int primary key)
drop table a.a;
create table a.a (id int primary key, val int);
insert into a.a values (1, 11);
/** 目标端 **/
-- 已有 table a.a (id int primary key)
-- drop table 语句被过滤
create table a.a (id int primary key, val int);
-- 执行错误,目标表已存在
insert into a.a values (1, 11);
-- 执行错误,列数目不对
用户需自行确保在发生过滤的情况下,后续DML/DDL能正确执行。
nomad job 常用通用配置
constraint
job、group 或 task 级配置。配置后该job/group/task会绑定在指定的节点上执行
constraint {
attribute = "${node.unique.name}"
value = "nomad3"
}
完整参考
- https://www.nomadproject.io/docs/job-specification/constraint
- https://www.nomadproject.io/docs/runtime/interpolation#interpreted_node_vars
resources
task级配置,src/dest task需各自重复。默认值为 cpu=100
,memory=300
。
以默认值建立大量轻量级任务,会导致资源不够而pending,可适当调小。
任务的内存消耗和每行大小、事物大小、队列长度有关。注意真实资源消耗,避免OOM。
task "src" {
resources {
cpu = 100 # MHz
memory = 300 # MB
}
}
restart & reschedule
nomad job 默认有如下 restart 和 reschedule 配置
restart { # group or task level
interval = "30m"
attempts = 2
delay = "15s"
mode = "fail" # "fail" or "delay"
# "delay" 意味着interval过后继续尝试
# "fail" 则不再尝试
}
reschedule { # job or group level
delay = "30s"
delay_function = "exponential"
max_delay = "1h"
unlimited = true
}
- 当task报错时,会根据restart配置,30分钟内在同一节点上重启最多两次
- 即使失败的job被
stop -purge
再重新添加,也需要根据restart参数重启
- 即使失败的job被
- 2次重启均失败后,会根据reschedule配置,在其他节点上执行
为了避免无限reschedule带来的问题,dtle安装包提供的样例job配置中(<prefix>/usr/share/dtle/scripts/example.job.*
),限制reschedule为每半小时1次:
reschedule {
attempts = 1
interval = "30m"
unlimited = false
}
# 或json格式
"Reschedule": {
"Attempts": 1,
"Interval": 1800000000000,
"Unlimited": false
}