Deploying a Canal 1.1.6 Cluster on Kubernetes
# 1. Overview
Canal is Alibaba's open-source middleware, written in Java, that parses a database's incremental logs and provides incremental data subscription and consumption. Currently Canal mainly supports parsing the MySQL binlog; once parsed, a canal client can process the consumed data. Many kinds of clients are supported; see the official Canal documentation for details.
# 2. Cluster Background
This article does not explore the various client consumption scenarios; it only uses Canal for test-environment data synchronization. For production, where stability matters, it is recommended to use Alibaba's more mature product, DTS.
# 2.1 Components Involved
- canal-admin: provides operations-oriented features for canal, such as overall configuration management and node maintenance, through a relatively friendly WebUI so that more users can operate it quickly and safely.
- canal-server: also known as canal-deployer; it poses as a slave of the source database's master to receive and parse the binlog.
- canal-adapter: the adapter; it connects to an upstream canal-server, implements full and incremental MySQL synchronization, and writes the data to downstream targets.
# 2.2 Architecture
# 2.3 Notes on Passwords
A brief note on the passwords involved in this deployment: different components have different requirements for password fields, some expect plaintext while others expect the ciphertext (hashed) form.
The MySQL password hash can be generated as follows:
- MySQL 5.7.5 and below:

-- 'xxx' is the plaintext password; the statement outputs the ciphertext
select password('xxx');

- Above MySQL 5.7.5 (remove the leading * from the result):

SELECT CONCAT('*', UPPER(SHA1(UNHEX(SHA1('xxx')))));
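For convenience, the hash can also be produced from the shell; the sketch below assumes a reachable MySQL server (host and user are placeholders) and strips the leading `*` so the value can be pasted wherever the ciphertext form is expected:

```bash
# Compute the MySQL password hash for a plaintext value and strip the leading '*',
# producing the ciphertext form expected by canal configuration fields.
PLAINTEXT='123456'
mysql -h 172.41.10.10 -u root -p -N -e \
  "SELECT CONCAT('*', UPPER(SHA1(UNHEX(SHA1('${PLAINTEXT}')))));" | sed 's/^\*//'
```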
# 3. Component Deployment
# 3.1 Canal Admin
Canal Admin is an instance-configuration management UI introduced in version 1.1.4 and must be deployed separately.
# 3.1.1 Resource Manifests
- ConfigMap manifest
apiVersion: v1
kind: ConfigMap
metadata:
  name: canal-admin
  namespace: default
data:
  admin_user: "admin_test"
  admin_password: "123456"
  datasource_address: "172.41.10.10:3306"
  datasource_database: "canal_manager"
  datasource_username: "canal_manager_user"
  datasource_password: "123456"
Note:
- admin_user/admin_password: the account and password Canal Server uses when connecting to Canal Admin. The admin_password here is plaintext; on the Canal Server side the ciphertext form must be configured.
- The datasource_* settings are the database connection information canal-admin itself needs to start and run; they are unrelated to the databases being synchronized.
- Deployment manifest
kind: Deployment
apiVersion: apps/v1
metadata:
  name: canal-admin
  labels:
    app.kubernetes.io/name: canal-admin
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: canal-admin
  template:
    metadata:
      name: canal-admin
      labels:
        app.kubernetes.io/name: canal-admin
    spec:
      containers:
        - name: canal-admin
          image: canal/canal-admin:v1.1.6
          imagePullPolicy: IfNotPresent
          ports:
            - name: web
              containerPort: 8089
              protocol: TCP
          env:
            - name: server.port
              value: '8089'
            - name: canal.adminUser
              valueFrom:
                configMapKeyRef:
                  name: canal-admin
                  key: admin_user
            - name: canal.adminPasswd
              valueFrom:
                configMapKeyRef:
                  name: canal-admin
                  key: admin_password
            - name: spring.datasource.address
              valueFrom:
                configMapKeyRef:
                  name: canal-admin
                  key: datasource_address
            - name: spring.datasource.database
              valueFrom:
                configMapKeyRef:
                  name: canal-admin
                  key: datasource_database
            - name: spring.datasource.username
              valueFrom:
                configMapKeyRef:
                  name: canal-admin
                  key: datasource_username
            - name: spring.datasource.password
              valueFrom:
                configMapKeyRef:
                  name: canal-admin
                  key: datasource_password
          resources:
            requests:
              cpu: 250m
              memory: 256Mi
          livenessProbe:
            httpGet:
              path: /
              port: 8089
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /
              port: 8089
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 30
      restartPolicy: Always
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    app.kubernetes.io/name: canal-admin
                topologyKey: kubernetes.io/hostname
  revisionHistoryLimit: 10
- Service manifest
kind: Service
apiVersion: v1
metadata:
  name: canal-admin
spec:
  ports:
    - protocol: TCP
      port: 8089
      targetPort: 8089
  selector:
    app.kubernetes.io/name: canal-admin
- Ingress manifest
Since the console is a web page, it needs to be exposed externally for login.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: canal-admin
  annotations:
    kubernetes.io/ingress.class: "nginx"
    prometheus.io/http_probe: "true"
spec:
  rules:
    - host: canal-admin.tchua.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: canal-admin
                port:
                  number: 8089
# 3.1.2 Database Preparation
canal-admin depends on a database to run. Here I reuse an existing database; note that the datasource_username configured in the ConfigMap needs read/write privileges on it.
The SQL file used by canal-admin can be obtained directly from the official repository. Watch out for version mismatches, although they are usually not a problem. The SQL inserts a username/password record for logging in to the canal-admin console; the default is admin/123456, and the password can be reset under account management in the upper-right corner after logging in.
-- The hash 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 corresponds to the plaintext 123456
INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');
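A sketch of preparing this database from the shell (assumes the official canal_manager.sql has been downloaded locally; host, account, and password are the example values used in this article):

```bash
# Create the canal-admin metadata database, grant the ConfigMap user access,
# then load the official schema file.
mysql -h 172.41.10.10 -u root -p -e "
  CREATE DATABASE IF NOT EXISTS canal_manager DEFAULT CHARACTER SET utf8mb4;
  CREATE USER IF NOT EXISTS 'canal_manager_user'@'%' IDENTIFIED BY '123456';
  GRANT ALL PRIVILEGES ON canal_manager.* TO 'canal_manager_user'@'%';
  FLUSH PRIVILEGES;"
mysql -h 172.41.10.10 -u root -p canal_manager < canal_manager.sql
```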
# 3.1.3 Deploy
[root@k8s-master01 canal]# kubectl apply -f canal-admin-configmp.yaml
[root@k8s-master01 canal]# kubectl apply -f canal-admin-deployment.yaml
[root@k8s-master01 canal]# kubectl get pods |grep canal-admin
canal-admin-65f69ff697-gsb4n 1/1 Running 0 24h
[root@k8s-master01 canal]# kubectl apply -f canal-admin-svc.yaml
[root@k8s-master01 canal]# kubectl get svc |grep canal-admin
canal-admin ClusterIP 172.15.157.183 <none> 8089/TCP 24h
[root@k8s-master01 canal]# kubectl apply -f canal-admin-ingress.yaml
[root@k8s-master01 canal]# kubectl get ingress |grep canal-admin
canal-admin <none> canal-admin.tchua.com 80 24h
# 3.1.4 Login
Once all of the resources above have been applied successfully, the console can be accessed via the domain mapped by the Ingress.
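To quickly confirm the console is reachable, a sketch (assumes canal-admin.tchua.com resolves to your ingress controller, e.g. via DNS or an /etc/hosts entry; the in-cluster check uses a throwaway curlimages/curl pod):

```bash
# Check the login page through the Ingress host.
curl -s -o /dev/null -w '%{http_code}\n' http://canal-admin.tchua.com/

# Alternatively, bypass the Ingress and probe the Service from inside the cluster.
kubectl run curl-test --rm -it --image=curlimages/curl --restart=Never -- \
  curl -s -o /dev/null -w '%{http_code}\n' http://canal-admin.default.svc.cluster.local:8089/
```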
# 3.2 Canal Server
# 3.2.1 Configuration
- Cluster configuration
After logging in to the canal-admin console, create a cluster and then configure the server settings.
Create a cluster named local and enter the ZooKeeper cluster address; ZooKeeper deployment is covered at the end of this article.
- Server configuration file
In a standalone deployment this file is maintained on the canal-server side and specified at startup; here it is maintained in canal-admin and pulled by the server.
Note:
After clicking "Load template" above, the default configuration is loaded automatically; edit the relevant parameters and click save. A brief explanation of a few key settings:
- canal.user / canal.passwd: the account and password that clients (canal-adapter) use to access canal-server; the password must be the ciphertext form.
- canal.admin.user / canal.admin.passwd: the username and password defined in the ConfigMap when deploying canal-admin above. Note that, unlike there, passwd here must be the ciphertext form.
- canal.zkServers: the address of the ZooKeeper cluster.
- canal.destinations: the corresponding instance name; here it is set to example.
Full configuration:
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = dts_test
# canal.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin_test
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers = zookeeper.default.svc.cluster.local:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:6667
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
- ConfigMap manifest
apiVersion: v1
kind: ConfigMap
metadata:
  name: canal-server
data:
  admin_manager: "canal-admin.default.svc.cluster.local:8089"
  admin_port: "11110"
  admin_user: "admin_test"
  admin_password: "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9"
  admin_register_cluster: "local"
  admin_register_auto: "true"
Note:
- admin_user/admin_password: the username and password for connecting to canal-admin; they must match the canal.admin.user and canal.admin.passwd parameters configured above.
- admin_register_cluster: the cluster name, i.e. the local cluster created in the admin console above.
- admin_register_auto: whether the server automatically registers itself into the cluster.
- StatefulSet manifest
kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: canal-server
  labels:
    app.kubernetes.io/name: canal-server
spec:
  serviceName: canal-service  # governing Service required by the StatefulSet API; assumed to be the canal-service defined below
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: canal-server
  template:
    metadata:
      name: canal-server
      labels:
        app.kubernetes.io/name: canal-server
    spec:
      containers:
        - name: canal-server
          image: canal/canal-server:latest
          imagePullPolicy: IfNotPresent
          ports:
            - name: tcp
              containerPort: 11111
              protocol: TCP
          env:
            - name: canal.admin.manager
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_manager
            - name: canal.admin.port
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_port
            - name: canal.admin.user
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_user
            - name: canal.admin.passwd
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_password
            - name: canal.admin.register.cluster
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_register_cluster
            - name: canal.admin.register.auto
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_register_auto
          resources:
            requests:
              cpu: 250m
              memory: 256Mi
          livenessProbe:
            tcpSocket:
              port: 11112
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 30
          readinessProbe:
            tcpSocket:
              port: 11112
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 30
      restartPolicy: Always
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    app.kubernetes.io/name: canal-server
                topologyKey: kubernetes.io/hostname
  revisionHistoryLimit: 10
Special note:
I use the canal/canal-server:latest image here. With the 1.1.6 image tag you may hit Table 'XXX.base table' doesn't exist; see https://github.com/alibaba/canal/issues/4304 for details.
- Service manifest
kind: Service
apiVersion: v1
metadata:
  name: canal-service
  labels:
    app: canal-service
spec:
  ports:
    - port: 11111
      name: server
    - port: 11112
      name: metrics
  type: ClusterIP
  selector:
    app.kubernetes.io/name: canal-server
# 3.2.2 Deploy
[root@k8s-master01 canal]# kubectl apply -f canal-server.yaml
[root@k8s-master01 canal]# kubectl get pods |grep canal-server
canal-server-0 1/1 Running 0 6h51m
[root@k8s-master01 canal]# kubectl get svc|grep canal-service
canal-service ClusterIP 172.15.164.250 <none> 11111/TCP,11112/TCP 9s
# 3.2.3 Check the Cluster Status
Since admin_register_auto is set to true, once the server starts you can see in the admin console that it has registered itself automatically.
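The same can be checked from the command line; a sketch (assumes the single-node zookeeper-0 pod from the manifest at the end of this article, and canal's default /otter/canal znode root, which may differ if you changed it):

```bash
# Inspect the canal-server startup logs for admin/cluster registration messages.
kubectl logs canal-server-0 --tail=100

# List canal's registration data in ZooKeeper.
kubectl exec -it zookeeper-0 -- zkCli.sh ls /otter/canal
```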
# 3.2.4 Create an Instance
In the canal-admin console, open Instance management in the left menu, click "New instance", enter a name in the form, select the cluster/host, click "Load template", edit the configuration as shown below, and then save.
Key parameters:
The screenshot above creates an instance named example associated with the local cluster; the instance name is needed later when starting the adapter.
- canal.instance.master.address: address of the source database.
- canal.instance.dbUsername / canal.instance.dbPassword: the username and password for accessing the source database; note that the password here is plaintext.
- canal.instance.filter.regex: regex describing which databases and tables to listen to.
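Before saving, make sure the account in canal.instance.dbUsername can actually read the binlog. A sketch of the typical grants on the source MySQL (host, user, and password are the example values used in this article):

```bash
# Grant the binlog-replication privileges canal-server needs on the source database.
mysql -h 172.41.10.10 -u root -p -e "
  CREATE USER IF NOT EXISTS 'dts_test'@'%' IDENTIFIED BY '123456';
  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dts_test'@'%';
  FLUSH PRIVILEGES;"
```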
Full configuration:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=172.41.10.10:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=dts_test
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=chan_db\\..*
# table black regex
canal.instance.filter.black.regex=.\.BASE.
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
# 3.3 Canal Adapter
The adapter is a client with several built-in adapters for writing to downstream databases. Since the official project does not publish a canal-adapter image, I use an image built by another author: https://github.com/funnyzak/canal-docker
# 3.3.1 Resource Manifests
- ConfigMap manifest
apiVersion: v1
kind: ConfigMap
metadata:
  name: canal-adapter-configmap
data:
  application.yml: |-
    server:
      port: 8081
    logging:
      level:
        org.springframework: WARN
        com.alibaba.otter.canal.client.adapter.rdb: WARN
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    canal.conf:
      mode: tcp
      flatMessage: true
      # Changed: address of the ZooKeeper service.
      # Cluster mode is used here; for a standalone setup, point the
      # canalServerHost field directly at canal-server instead.
      zookeeperHosts: zookeeper.default.svc.cluster.local:2181
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      consumerProperties:
        # Changed: since tcp mode is used, this is the canal-server address for tcp mode.
        # kafka and rocketMQ modes are also supported.
        canal.tcp.server.host: canal-service:11111
        canal.tcp.zookeeper.hosts:
        canal.tcp.batch.size: 500
        canal.tcp.username:
        canal.tcp.password:
      # Changed: source data source configuration.
      srcDataSources:
        monitorAlterDS:
          url: jdbc:mysql://172.41.10.10:3306/chan_db?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
          username: dts_test
          password: 123456
      # Changed: target (downstream) data source configuration.
      canalAdapters:
        - instance: example
          groups:
            - groupId: g1
              outerAdapters:
                - name: logger
                - name: rdb
                  key: mysql
                  properties:
                    jdbc.driverClassName: com.mysql.jdbc.Driver
                    jdbc.url: jdbc:mysql://172.41.11.10:3306/chan_db?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
                    jdbc.username: dts_test
                    jdbc.password: 123456
  bootstrap.yml: ""
  # Changed: table-level sync mapping file.
  monitoralter.yml: |-
    dataSourceKey: monitorAlterDS
    destination: example
    groupId: g1
    outerAdapterKey: mysql
    concurrent: true
    dbMapping:
      mirrorDb: true
      database: chan_db
Configuration notes
canalAdapters:
- instance: example must match the name of the instance created in the canal-admin console above.
- outerAdapters:
  - name: rdb selects the rdb (relational database) adapter type.
  - key: mysql is the unique key of this adapter and corresponds to outerAdapterKey in the table-mapping configuration.
  - properties: connection settings for the downstream (target) database.
monitoralter.yml:
- dataSourceKey: key of the source data source, matching the entry under srcDataSources above.
- destination: the instance name; must match the canalAdapters configuration above.
- outerAdapterKey: the adapter key, matching the key defined under outerAdapters above.
- dbMapping: database/schema information of the source database.
- database: the database to synchronize.
- For more options, see the official documentation.
bootstrap.yml:
- This must be configured as an empty string; see the related issue for the reason.
- Deployment manifest
kind: Deployment
apiVersion: apps/v1
metadata:
  labels:
    app: canal-adapter
  name: canal-adapter
spec:
  replicas: 1
  minReadySeconds: 10
  selector:
    matchLabels:
      app: canal-adapter
  template:
    metadata:
      name: canal-adapter
      labels:
        app: canal-adapter
    spec:
      volumes:
        - name: conf
          configMap:
            name: canal-adapter-configmap
      containers:
        - name: canal-adapter
          image: funnyzak/canal-adapter:1.1.6
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - mountPath: /opt/canal/canal-adapter/conf/application.yml
              name: conf
              subPath: application.yml
            - mountPath: /opt/canal/canal-adapter/conf/bootstrap.yml
              name: conf
              subPath: bootstrap.yml
            - mountPath: /opt/canal/canal-adapter/conf/rdb/monitoralter.yml
              name: conf
              subPath: monitoralter.yml
          ports:
            - containerPort: 8081
              protocol: TCP
# 3.3.2 Deploy
# The adapter ConfigMap and Deployment manifests are merged into a single file here
[root@k8s-master01 canal]# kubectl apply -f canal-adapter.yaml
[root@k8s-master01 canal]# kubectl get pods|grep adap
canal-adapter-784c7d4f97-g4rrj 1/1 Running 0 22h
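Incremental changes start flowing once the adapter connects; an initial full load can be triggered through the adapter's /etl/{type}/{key}/{task} REST endpoint. A sketch using the names configured above (type rdb, key mysql, task file monitoralter.yml):

```bash
# Forward the adapter's HTTP port locally, then trigger a full load of the
# mapping defined in monitoralter.yml via the adapter's /etl endpoint.
kubectl port-forward deployment/canal-adapter 8081:8081 &
sleep 2
curl -s -X POST http://127.0.0.1:8081/etl/rdb/mysql/monitoralter.yml
```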
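To verify synchronization end to end, write a row to the source database and read it back from the target. A sketch with placeholder table/column names (replace demo_table with a real table in chan_db):

```bash
# Insert a test row into the source database...
mysql -h 172.41.10.10 -u dts_test -p123456 chan_db \
  -e "INSERT INTO demo_table (name) VALUES ('canal-sync-check');"   # placeholder table

# ...then, after a moment, confirm it arrived in the target database.
sleep 3
mysql -h 172.41.11.10 -u dts_test -p123456 chan_db \
  -e "SELECT * FROM demo_table WHERE name = 'canal-sync-check';"    # placeholder table
```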
# 4. Summary
Once everything above is deployed, data is synchronized in real time. Because my setup runs directly against a production environment, the synchronization itself is not demonstrated here; as long as all of the components above start normally and the data source information is configured correctly, synchronization works without issue. The cluster also depends on ZooKeeper, which is not covered above because I only deployed a simple single-node ZooKeeper; for production, deploy a multi-node ZooKeeper ensemble for resilience.
- ZooKeeper manifest
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
spec:
  serviceName: zookeeper  # governing Service required by the StatefulSet API; matches the Service below
  selector:
    matchLabels:
      app: zk
  template:
    metadata:
      labels:
        app: zk
    spec:
      containers:
        - name: zk
          image: zookeeper:latest
          ports:
            - containerPort: 2181
              name: zkclient
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
spec:
  type: ClusterIP
  ports:
    - port: 2181
  selector:
    app: zk
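A quick health check for the single-node ZooKeeper before pointing canal-server and canal-adapter at it (zkServer.sh ships with the official zookeeper image):

```bash
# Confirm the ZooKeeper pod is serving; a single node reports "Mode: standalone".
kubectl exec -it zookeeper-0 -- zkServer.sh status
```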