JDBC SQL Server Source Connector: 一览与实践-CSDN博客
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
在快速发展的数据驱动业务环境中确保数据在各个系统间高效、准确地同步至关重要。为了进一步的数据处理和分析经常需要将这些数据同步到其他数据处理系统。Apache SeaTunnel 提供了一个强大而灵活的数据集成框架使得从 SQL Server 到其他系统的数据同步变得简单且高效。
本文档将指导您如何配置 Apache SeaTunnel使用 JDBC SQL Server Source Connector 来实现数据的有效同步。
JDBC SQL Server Source Connector
支持 SQL Server 版本
-
服务器2008或更高版本仅供信息参考
支持以下引擎
Spark
Flink
Seatunnel Zeta
主要特点
支持查询 SQL 并能够实现投影效果。
描述
通过 JDBC 读取外部数据源数据。
支持的数据源信息
数据源 | 支持的版本 | 驱动 | URL | Maven |
---|---|---|---|---|
SQL Server | 支持版本 >= 2008 | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | 下载 |
数据库依赖
请下载与 'Maven' 对应的支持列表并将其复制到 '数据类型映射
SQL Server 数据类型 Seatunnel 数据类型 BIT BOOLEAN TINYINT
SMALLINTSHORT INTEGER INT BIGINT LONG DECIMAL
NUMERIC
MONEY
SMALLMONEYDECIMAL((指定列的指定列大小)+1,
(获取指定列的小数点右边的数字的数量。)))REAL FLOAT FLOAT DOUBLE CHAR
NCHAR
VARCHAR
NTEXT
NVARCHAR
TEXTSTRING DATE LOCAL_DATE TIME LOCAL_TIME DATETIME
DATETIME2
SMALLDATETIME
DATETIMEOFFSETLOCAL_DATE_TIME TIMESTAMP
BINARY
VARBINARY
IMAGE
UNKNOWN尚不支持 源选项
名称 类型 必需 默认值 描述 url 字符串 是 - JDBC 连接的 URL。例如jdbc:sqlserver://127.0.0.1:1434;database=TestDB driver 字符串 是 - 用于连接到远程数据源的 JDBC 类名如果使用 SQL Server则值为 com.microsoft.sqlserver.jdbc.SQLServerDriver
。user 字符串 否 - 连接实例的用户名 password 字符串 否 - 连接实例的密码 query 字符串 是 - 查询语句 connection_check_timeout_sec 整数 否 30 等待用于验证连接的数据库操作完成的秒数 partition_column 字符串 否 - 并行处理的分区列仅支持数值类型。 partition_lower_bound 长整数 否 - 用于扫描的 partition_column 最小值如果未设置SeaTunnel 将查询数据库获取最小值。 partition_upper_bound 长整数 否 - 用于扫描的 partition_column 最大值如果未设置SeaTunnel 将查询数据库获取最大值。 partition_num 整数 否 作业并行度 分区计数的数量仅支持正整数。默认值为作业并行度。 fetch_size 整数 否 0 对返回大量对象的查询您可以配置查询中使用的行抓取大小以减少满足选择条件所需的数据库命中次数从而提高性能。
零表示使用 JDBC 默认值。common-options 否 - 源插件的常见参数请参阅 源常用选项 以获取详细信息。 提示
如果未设置 partition_column则将以单一并发运行如果设置了 partition_column则将根据任务的并发度进行并行执行。
任务示例
简单
简单的单一任务以读取数据表
# 定义运行时环境
env {
# 您可以在此处设置 Flink 配置
execution.parallelism = 1
job.mode = "BATCH"
}
source{
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
query = "select * from full_types_jdbc"
}
}
transform {
# 如果您想要获取有关如何配置 seatunnel 和查看变换插件的完整列表的更多信息
# 请转到 [seatunnel.apache.org/docs/transform-v2/sql](https://seatunnel.apache.org/docs/transform-v2/sql)
}
sink {
Console {}
}并行
使用您配置的分片字段和分片数据并行读取您的查询表如果您希望读取整个表可以这样做
env {
# 您可以在此处设置 Flink 配置
execution.parallelism = 10
job.mode = "BATCH"
}
source {
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
# 根据需要定义查询逻辑
query = "select * from full_types_jdbc"
# 并行分片读取字段
partition_column = "id"
# 片段数量
partition_num = 10
}
}
transform {
# 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息
# 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
}并行
使用您配置的分片字段和分片数据并行读取您的查询表如果您希望读取整个表可以这样做
env {
# 您可以在此处设置 Flink 配置
execution.parallelism = 10
job.mode = "BATCH"
}
source {
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
# 根据需要定义查询逻辑
query = "select * from full_types_jdbc"
# 并行分片读取字段
partition_column = "id"
# 片段数量
partition_num = 10
}
}
transform {
# 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息
# 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
}并行
使用您配置的分片字段和分片数据并行读取您的查询表如果您希望读取整个表可以这样做
env {
# 您可以在此处设置 Flink 配置
execution.parallelism = 10
job.mode = "BATCH"
}
source {
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
# 根据需要定义查询逻辑
query = "select * from full_types_jdbc"
# 并行分片读取字段
partition_column = "id"
# 片段数量
partition_num = 10
}
}
transform {
# 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息
# 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
}并行
使用您配置的分片字段和分片数据并行读取您的查询表如果您希望读取整个表可以这样做
env {
# 您可以在此处设置 Flink 配置
execution.parallelism = 10
job.mode = "BATCH"
}
source {
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
# 根据需要定义查询逻辑
query = "select * from full_types_jdbc"
# 并行分片读取字段
partition_column = "id"
# 片段数量
partition_num = 10
}
}
transform {
# 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息
# 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
}分段并行读取示例
这是一个快速并行读取数据的分片示例
env {
# 您可以在此处设置引擎配置
execution.parallelism = 10
}
source {
# 这是一个示例源插件仅用于测试和展示源插件的功能
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
query = "select * from column_type_test.dbo.full_types_jdbc"
# 并行分片读取字段
partition_column = "id"
# 片段数量
partition_num = 10
}
# 如果您想要获取有关如何配置 Seatunnel 和查看源插件的完整列表的更多信息
# 请转到 https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
}
transform {
# 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息
# 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
# 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息
# 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |