使用docker部署mysql,并开启binlog
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
在验证flink-cdc-mysql时需要部署一个开启了binlog的mysql服务。cdc文档中有一个demo可以尝试部署但是我在验证的时候发现可能存在一些问题所以就尝试自己部署一个mysql服务。与cdc demo中类似使用docker部署是最快的方案。
网上整理了一些资料发现思路大概是
- 部署mysql docker服务
- 编辑mysql 配置文件
- 启动mysql 服务。
首先根据需要编写docker-compose文件。
创建一个mysql目录然后执行 vim docker-compose.yml
命令。
将下面内容粘贴进去适当进行修改。
version: "3.7"
services:
mysql:
image: mysql:5.7.28
container_name: mysql-binlog2
command: --default-authentication-plugin=mysql_native_password
restart: always
environment:
# root用户密码
MYSQL_ROOT_PASSWORD: 123456
TZ: Asia/Shanghai
ports:
- 3306:3306
volumes:
- /home/leiline/cdc/data/mysql/master/data:/var/lib/mysql
- /home/leiline/cdc/data/mysql/master/log:/var/log/mysql
- /home/leiline/cdc/data/mysql/master/conf:/etc/mysql
保存文件后退出。
这里需要在服务器中创建目录分别用来保存mysql的数据日志和配置信息。
mkdir -p /home/leiline/cdc/data/mysql/master/data
mkdir -p /home/leiline/cdc/data/mysql/master/log
mkdir -p /home/leiline/cdc/data/mysql/master/conf
其中我们要把配置信息放在conf目录下。mysql的配置信息保存在一个名称为my.cnf的文件中将下列信息粘贴到my.cnf文件中。
## 局域网唯一
server_id=1
## 指定不需要同步的数据库名称
binlog-ignore-db=master
## 开启二进制日志功能
log-bin=/var/lib/mysql/mysql-bin
## 设置二进制日志使用内存大小事务
binlog_cache_size=1M
## 设置使用的二进制日志格式mixed,statement,row
binlog_format=ROW
## 二进制日志过期清理时间。默认值为0表示不自动清理。
expire_logs_days=7
我们在这里设置的Binlog二进制格式为ROW。
最后启动docker-compose。执行命令 sudo docker-compose up -d
docker-compose 会自动加载docker-compose.yml 文件中的内容拉取 mysql:5.7.28 镜像最后将创建mysql-binlog2 容器。
执行 docker ps 可以查看当前运行中的容器有哪些以及一些概要信息。
sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
c6e03b8a0656 mysql:5.7.28 "docker-entrypoint.s…" 2 hours ago Up About an hour 0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp mysql-binlog2
进入mysql中进行操作
到目前为止mysql服务已经启动我们可以进入mysql中查看相关配置信息并进行数据操作。执行命令 sudo docker-compose exec mysql mysql -uroot -p123456
就可以进入到mysql服务中。
执行命令 show global variables like "%binlog%";
和 show global variables like "%log_bin%";
可以查询相关配置信息。
由截图可以看到binlog_format的值为ROWlog_bin的值为ON。这样表示binlog已经开启且二进制日志类型为ROW。
接下来我们就可以进行数据操作了。
插入mysql数据
我们还是借助flink cdc demo中的例子将建表语句和insert语句在mysql中执行。
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
flink mysql cdc 抽数
最后我们要借助flink mysql cdc 进行binlog数据的抽取。
首先准备好sql语句
SET execution.checkpointing.interval = 3s;
CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'database-name' = 'mydb',
'table-name' = 'products'
);
CREATE TABLE printer (
id INT,
name STRING,
description STRING
) WITH (
'connector' = 'print'
);
INSERT INTO printer SELECT * FROM products;
sql语句的内容很简单就是将products表的日志数据抽取然后打印出来。
因为我们只是一个demo测试所以我们使用flink local模式执行SQL。与flink cdc demo一样首先执行./bin/start-cluster.sh
然后进入到 sql-client中./sql-client.sh embedded
这个时候我们进入到了flink sql客户端中将上述SQL复制进sql-client中sql-client会自动将SQL进行解析执行提交到flink集群中运行。
如果一切正常的话我们可以打开地址 http://localhost:8081/ 可以看到flink dashboard。在taskmanager 的out中可以看到输出内容。
通过对Mysql数据的修改我们测试了增删改的操作CDC都可以正常的进行数据采集。而将表进行truncate后CDC是不会有任何结果的。
总结
本文首先部署了一个docker mysql服务并且开启binlog然后通过flink cdc 将mysql binlog日志抽取并打印出来。
flink cdc 目前版本为2.3.0对于mysql比较稳定的支持后面我们会在生产上进行测试并尝试满足业务需求。