Flink-CDC——MySQL、SqlSqlServer、Oracle等数据库开启日志方法-CSDN博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

文章目录

1. 前言

2. 数据源安装与配置

2.1 MySQL

2.1.1 安装

2.1.2 CDC 配置

2.2 Postgresql

2.2.1 安装

2.2.2 CDC 配置

2.3 Oracle

2.3.1 安装

2.3.2 CDC 配置

2.4 SQLServer

2.4.1 安装

2.4.2 CDC 配置

3. 验证

3.1 Flink版本与CDC版本的对应关系

3.2 下载相关包

3.3 添加cdc jar 至lib目录

3.4 验证

本文目录结构

|___ 1. 前言

|___ 2. 数据源安装与配置

|______ 2.1 MySQL

|_________ 2.1.1 安装

|_________ 2.1.2 CDC 配置

|______ 2.2 Postgresql

|_________ 2.2.1 安装

|_________ 2.2.2 CDC 配置

|______ 2.3 Oracle

|_________2.3.1 安装

|_________2.3.2 CDC 配置

|_______2.4 SQLServer

|_________2.4.1 安装

|_________2.4.2 CDC 配置

|___ 3. 验证

|_______3.1 Flink版本与CDC版本的对应关系

|_______3.2 下载相关包

|_______3.3 添加cdc jar 至lib目录

|_______3.4 验证

1. 前言

关于如何使用和配置flink cdc功能其实在官方文档https://ververica.github.io/flink-cdc-connectors/master/有相关的教程了

本文主要就是记录在docker下安装和配置各种数据源以实现flink cdc的功能包含如下常见的数据源

数据源    版本

MySQL   8.0.25

Postgresql     10.6

Oracle    11g

SqlServer       2019

2. 数据源安装与配置

2.1 MySQL

版本8.0.25

2.1.1 安装

Step1: 拉取mysql镜像

docker pull mysql:8.0.25

Step2: 创建并运行 MySQL 容器

docker run -d -p 30025:3306 --name mysql8.0.25 -e MYSQL_ROOT_PASSWORD=root mysql:8.0.25

2.1.2 CDC 配置

Step1进入正在运行的mysql容器

docker exec -it mysql8.0.25 mysql -uroot -proot

Step2配置 CDC

-- 启用二进制日志

mysql> SET GLOBAL log_bin = ON;

-- 设置二进制日志格式为行级别

mysql> SET GLOBAL binlog_format = 'ROW';

Step3非必要如果配置没生效重启容器

docker restart mysql8.0.25

2.2 Postgresql

版本PostgreSQL 10.6 (Debian 10.6-1.pgdg90+1)

2.2.1 安装

Step1 拉取 PostgreSQL 10.6 版本的镜像

docker pull postgres:10.6

Step2创建并启动 PostgreSQL 容器在这里我们将把容器的端口 5432 映射到主机的端口 30028账号密码设置为postgres并将 pgoutput 插件加载到 PostgreSQL 实例中

docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'

Step3 查看容器是否创建成功

docker ps | grep postgres-10.6

2.2.2 CDC 配置

Step1docker进去Postgresql数据的容器

docker exec -it postgres-10.6  bash

Step2编辑postgresql.conf配置文件

vi /var/lib/postgresql/data/postgresql.conf

配置内容如下

# 更改wal日志方式为logical方式有minimal、replica 、logical 

wal_level = logical 

# 更改solts最大数量默认值为10flink-cdc默认一张表占用一个slots

max_replication_slots = 20

# 更改wal发送最大进程数默认值为10这个值和上面的solts设置一样

max_wal_senders = 20    

# 中断那些停止活动超过指定毫秒数的复制连接可以适当设置大一点默认60s0表示禁用

wal_sender_timeout = 180s          

Step3重启容器

docker restart postgres-10.6

连接数据库如果查询一下语句返回logical表示修改成功

SHOW wal_level;

Step4新建用户并赋权。使用创建容器时的账号密码postgres/postgres登录Postgresql数据库。

-- 创建数据库 test_db

CREATE DATABASE test_db;

-- 连接到新创建的数据库 test_db

\c test_db

-- 创建 t_user 表

CREATE TABLE "public"."t_user" (

    "id" int8 NOT NULL,

    "name" varchar(255),

    "age" int2,

    PRIMARY KEY ("id")

);

-- pg新建用户

CREATE USER test1 WITH PASSWORD 'test123';

-- 给用户复制流权限

ALTER ROLE test1 replication;

-- 给用户登录数据库权限

GRANT CONNECT ON DATABASE test_db to test1;

-- 把当前库public下所有表查询权限赋给用户

GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;

Step4发布表

-- 设置发布为true

update pg_publication set puballtables=true where pubname is not null;

-- 把所有表进行发布

CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查询哪些表已经发布

select * from pg_publication_tables;

-- 更改复制标识包含更新和删除之前值目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句那么 t_user 表的复制标识可能默认为 NOTHING这可能导致实时同步时丢失更新和删除的数据行信息从而影响同步的准确性

ALTER TABLE t_user REPLICA IDENTITY FULL;

-- 查看复制标识为f标识说明设置成功f表示 full否则为 n表示 nothing即复制标识未设置

select relreplident from pg_class where relname='t_user';

2.3 Oracle

版本Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production

2.3.1 安装

Step1拉取 oracle 11g 镜像有6g要等较长的时间

docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g

Step2执行以下命令以创建并运行 Oracle 11g 容器

docker run -d -p 30026:1521 -p 8081:8080 \

--name oracle_11g \

-e ORACLE_HOME=/home/oracle/app/oracle/product/11.2.0/dbhome_2 \

-e ORACLE_SID=helowin \

registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g

Step3查看容器是否启动

docker ps -a|grep oracle_11g

Step4进入容器

docker exec -it oracle_11g bash

**Step5**设置账号密码

# 1. 切换至root用户(默认是oracle用户)密码为helowin

su root

# 2. 创建软链接

ln -s $ORACLE_HOME/bin/sqlplus /usr/bin

# 3.切换回oracle用户

su oracle

# 4. 登录sql plus

sqlplus /nolog

conn /as sysdba

## 4.1 修改system用户密码为system

alter user system identified by system;

## 4.2 修改sys用户密码为system

alter user sys identified by system;

## 4.3 新增一个测试用户用户名test密码test123;

create user test identified by test123;

## 4.4 将dba权限给内部管理员账号和密码

grant connect,resource,dba to test;

## 4.5 修改密码策略规则为密码永不过期

ALTER PROFILE DEFAULT LIMIT PASSWORD_LIFE_TIME UNLIMITED;

## 4.6 修改数据库最大连接数

alter system set processes=1000 scope=spfile;

## 4.7 最后重启数据库

shutdown immediate;

startup;

# 5.退出

exit

2.3.2 CDC 配置

Step1进入容器

docker exec -it oracle_11g bash

Step2以DBA的权限登录数据库

sqlplus /nolog

CONNECT sys/system AS SYSDBA

Step3启用日志归档

-- 设置数据库恢复文件目标大小为10G

alter system set db_recovery_file_dest_size = 10G;

-- 设置数据库恢复文件目标路径

alter system set db_recovery_file_dest = '/home/oracle/app/oracle/product/11.2.0' scope=spfile;

-- 立即关闭数据库

shutdown immediate;

-- 以mount模式启动数据库

startup mount;

-- 启用数据库归档日志模式

alter database archivelog;

-- 打开数据库允许用户访问

alter database open;

Step4查看日志归档是否启用如果显示“Archive Mode”表示已经启用

archive log list;

Step5创建表空间

-- 以DBA的权限登录数据库

sqlplus /nolog

CONNECT sys/system AS SYSDBA

-- 创建一个名为"logminer_tbs"的表空间

-- 指定表空间的数据文件路径为"/home/oracle/app/oracle/product/11.2.0/logminer_tbs.dbf"其中"/home/oracle/app/oracle/product/11.2.0"是数据文件存储的目录"logminer_tbs.dbf"是数据文件的文件名

-- 设置表空间的初始大小为25MB

-- 如果数据文件已经存在且可重用将其重用否则创建一个新的数据文件

-- 启用表空间的自动扩展功能即当表空间空间不足时自动增加数据文件的大小

-- 设置表空间的最大允许大小为无限即表空间可以无限制地自动扩展

CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/app/oracle/product/11.2.0/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

Step6创建用户并赋权

-- 创建一个名为"flinkuser"的用户密码为"flinkpw"将其默认表空间设置为"LOGMINER_TBS"并在该表空间上设置无限配额。

CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

-- 允许"flinkuser"用户创建会话即允许该用户连接到数据库。

GRANT CREATE SESSION TO flinkuser;

-- 不支持Oracle 11g允许"flinkuser"用户在多租户数据库CDB中设置容器。

-- GRANT SET CONTAINER TO flinkuser;

-- 允许"flinkuser"用户查询V_$DATABASE视图该视图包含有关数据库实例的信息。

GRANT SELECT ON V_$DATABASE TO flinkuser;

-- 允许"flinkuser"用户执行任何表的闪回操作。

GRANT FLASHBACK ANY TABLE TO flinkuser;

-- 允许"flinkuser"用户查询任何表的数据。

GRANT SELECT ANY TABLE TO flinkuser;

-- 允许"flinkuser"用户拥有SELECT_CATALOG_ROLE角色该角色允许查询数据字典和元数据。

GRANT SELECT_CATALOG_ROLE TO flinkuser;

-- 允许"flinkuser"用户拥有EXECUTE_CATALOG_ROLE角色该角色允许执行一些数据字典中的过程和函数。

GRANT EXECUTE_CATALOG_ROLE TO flinkuser;

-- 允许"flinkuser"用户查询任何事务。

GRANT SELECT ANY TRANSACTION TO flinkuser;

-- 不支持Oracle 11g允许"flinkuser"用户进行数据变更追踪LogMiner。

-- GRANT LOGMINING TO flinkuser;

-- 允许"flinkuser"用户创建表。

GRANT CREATE TABLE TO flinkuser;

-- 允许"flinkuser"用户锁定任何表。

GRANT LOCK ANY TABLE TO flinkuser;

-- 允许"flinkuser"用户修改任何表。

GRANT ALTER ANY TABLE TO flinkuser;

-- 允许"flinkuser"用户创建序列。

GRANT CREATE SEQUENCE TO flinkuser;

-- 允许"flinkuser"用户执行DBMS_LOGMNR包中的过程。

GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;

-- 允许"flinkuser"用户执行DBMS_LOGMNR_D包中的过程。

GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

-- 允许"flinkuser"用户查询V_$LOG视图该视图包含有关数据库日志文件的信息。

GRANT SELECT ON V_$LOG TO flinkuser;

-- 允许"flinkuser"用户查询V_$LOG_HISTORY视图该视图包含有关数据库历史日志文件的信息。

GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;

-- 允许"flinkuser"用户查询V_$LOGMNR_LOGS视图该视图包含有关LogMiner日志文件的信息。

GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;

-- 允许"flinkuser"用户查询V_$LOGMNR_CONTENTS视图该视图包含LogMiner日志文件的内容。

GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;

-- 允许"flinkuser"用户查询V_$LOGMNR_PARAMETERS视图该视图包含有关LogMiner的参数信息。

GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;

-- 允许"flinkuser"用户查询V_$LOGFILE视图该视图包含有关数据库日志文件的信息。

GRANT SELECT ON V_$LOGFILE TO flinkuser;

-- 允许"flinkuser"用户查询V_$ARCHIVED_LOG视图该视图包含已归档的数据库日志文件的信息。

GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;

-- 允许"flinkuser"用户查询V_$ARCHIVE_DEST_STATUS视图该视图包含有关归档目标状态的信息。

GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

Step7数据库和表启用增量日志

-- 切换至flinkuser用户

sqlplus /nolog

CONNECT flinkuser/flinkpw

-- 创建customers表

CREATE TABLE customers (

    customer_id NUMBER PRIMARY KEY,

    customer_name VARCHAR2(50),

    email VARCHAR2(100),

    phone VARCHAR2(20)

) TABLESPACE LOGMINER_TBS;

-- 查看LOGMINER_TBS表空间下的所有表

select tablespace_name, table_name from user_tables

where tablespace_name = 'LOGMINER_TBS';

-- 以DBA的权限登录数据库

sqlplus /nolog

CONNECT sys/system AS SYSDBA

-- 为LOGMINER_TBS表空间下的customers表启用增强日志记录

ALTER TABLE FLINKUSER.CUSTOMERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

-- 为数据库启用增强日志记录

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

2.4 SQLServer

版本Microsoft SQL Server 2019 (RTM-CU21) (KB5025808) - 15.0.4316.3 (X64)

2.4.1 安装

Step1拉取SQL Server 2019 镜像

docker pull mcr.microsoft.com/mssql/server:2019-latest

Step2运行 SQL Server 容器密码必须是8个字符并包含字母、数字和特殊字符如abc@123456 下面映射主机端口为30027

docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=abc@123456' -p 30027:1433 --name sql_server_2019 -d mcr.microsoft.com/mssql/server:2019-latest

Step3验证 SQL Server 容器是否正在运行

docker ps -a|grep sql_server_2019

2.4.2 CDC 配置

Step1开启SQLServer代理

## 使用root用户登录容器

docker exec -it --user root sql_server_2019 bash

## 进入容器后执行命令启用Agent

/opt/mssql/bin/mssql-conf set sqlagent.enabled true

## 退出重启容器

exit

docker restart sql_server_2019

Step2创建’cdc_test’测试数据库并使用连接工具登录该数据库使用以下 SQL 命令启用 CDC 功能

-- 创建数据库

CREATE DATABASE cdc_test;

-- 启用CDC功能

EXEC sys.sp_cdc_enable_db;

-- 判断当前数据库是否启用了CDC如果返回1表示已启用

SELECT is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';

Step3选择要进行 CDC 跟踪的表这里使用orders表作为演示

-- 创建示例表orders

CREATE TABLE orders (

     id int,

     order_date date,

     purchaser int,

     quantity int,

     product_id int,

     PRIMARY KEY ([id])

);

--将下面四行sql代码执行使数据表开启CDC

-- schema_name 是表所属的模式schema的名称。

-- source_name 是要启用 CDC 跟踪的表的名称。

-- role_name 是 CDC 使用的角色的名称。

EXEC sys.sp_cdc_enable_table

  @source_schema = 'dbo',

  @source_name   = 'tablename',

  @role_name     = 'cdc_role';

-- 判断当前数据表是否启用了CDC如果返回1表示已启用

select is_tracked_by_cdc from sys.tables where name = 'tablename';

--关闭表CDC

EXEC sys.sp_cdc_disable_table
  @source_schema = 'dbo',
  @source_name   = 'tablename',
    @capture_instance = 'dbo_tablename';

3. 验证

如果要验证flink cdc的功能需要先下载flink的安装包然后下载相应的cdc jar包并依赖最后使用安装包里面的sql-client写相关的flink sql即可验证。

3.1 Flink版本与CDC版本的对应关系

下载Flink安装包以及jar包前必须确定Flink CDC与Flink版本关系

Flink CDC 版本     Flink 版本

1.0.0       1.11.*

1.1.0       1.11.*

1.2.0       1.12.*

1.3.0       1.12.*

1.4.0       1.13.*

2.0.* 1.13.*

2.1.* 1.13.*

2.2.* 1.13.*, 1.14.*

2.3.* 1.13.*, 1.14.*, 1.15.*, 1.16.0

2.4.* 1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.0

本文以 Flink1.13.6 + Flink CDC 2.2.0 版本为例子演示。

3.2 下载相关包

flink 安装包下载下载地址https://flink.apache.org/downloads/

下载cdc相关的jar根据自己的需求下载相关的cdc jarhttps://repo1.maven.org/maven2/com/ververica/

3.3 添加cdc jar 至lib目录

把需要验证的cdc jar放到flink安装包解压之后的lib目录<FLINK_HOME>/lib/:

3.4 验证

使用下面的命令启动 Flink 集群

./bin/start-cluster.sh

启动成功可以访问 http://localhost:8081 访问到 Flink Web UI

使用下面的命令启动 Flink SQL CLI

./bin/sql-client.sh

展示如下页面表示启动flink客户端成功

执行如下FlinkSQL

CREATE TABLE t_source_sqlserver (

   id INT,

    order_date DATE,

    purchaser INT,

    quantity INT,

    product_id INT,

    PRIMARY KEY (id) NOT ENFORCED

) WITH (

  'connector' = 'sqlserver-cdc',

  'hostname' = '10.194.183.120',

  'port' = '30027',

  'username' = 'sa',

  'password' = 'abc@123456',

  'database-name' = 'cdc_test',

  'schema-name' = 'dbo',

  'table-name' = 'orders'

);

可以看到执行成功了

执行select 语句以便实时查看该表的数据变动

select * from t_source_sqlserver;

从下图可以看出只要修改左边的数据会在控制台实时显示新增删除的数据。

同时也能在Flink web页面看到任务正在运行

最后可以通过如下命令关闭掉Flink启动的集群

./stop-cluster.sh

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6