Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)-CSDN博客
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
世间真正温煦的春色都熨帖着大地潜伏在深谷
文章目录
1、输出算子Sink
Flink作为数据处理框架最终还是要把计算处理的结果写入外部储存为外部应用提供支持。
1.1 连接到外部系统
Flink的DataStream API专门提供了向外部提供写入数据的方法addSink。与addSource类似addSink方法对应着一个“Sink”算子主要就是用来实现与外部系统连接、并将数据提交写入的。Flink程序中所有对外的输出操作一般都是利用Sink算子完成的。
Flink1.12以前Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSink方法同样需要传入一个参数实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke()用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始同样重构了Sink架构
stream.sinkTo(…)
当然Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示列出了Flink官方目前支持的第三方系统连接器
我们可以看到像Kafka之类流式系统Flink提供了完美对接source/sink两端都能连接可读可写而对于Elasticsearch、JDBC等数据存储系统则只提供了输出写入的sink连接器。
除Flink官方之外Apache Bahir框架也实现了一些其他第三方系统与Flink的连接器。
除此以外就需要用户自定义实现sink连接器了。
1.2 输出到文件
Flink专门提供了一个流式文件系统的连接器FileSink为批处理和流处理提供了一个统一的Sink它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码Row-encoded和批量编码Bulk-encoded格式。这两种不同的方式都有各自的构建器builder可以直接调用FileSink的静态方法
- 行编码 FileSink.forRowFormatbasePathrowEncoder。
- 批量编码 FileSink.forBulkFormatbasePathbulkWriterFactory。
public class SinkFile {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每个目录中都有 并行度个数的 文件在写入
env.setParallelism(2);
// 必须开启checkpoint否则一直都是 .inprogress
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
new GeneratorFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "Number:" + value;
}
},
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(1000),
Types.STRING
);
DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");
// 输出到文件系统
FileSink<String> fieSink = FileSink
// 输出行式存储的文件指定路径、指定编码
.<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
// 输出文件的一些配置 文件名的前缀、后缀
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("atguigu-")
.withPartSuffix(".log")
.build()
)
// 按照目录分桶如下就是每个小时一个目录
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
// 文件滚动策略: 1分钟 或 1m
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(1))
.withMaxPartSize(new MemorySize(1024*1024))
.build()
)
.build();
dataGen.sinkTo(fieSink);
env.execute();
}
}
1.3 输出到Kafka
1添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据连接器相关依赖已经引入这里就不重复介绍了。
2启动Kafka集群
3编写输出到Kafka的示例代码
输出无key的record:
public class SinkKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 如果是精准一次必须开启checkpoint后续章节介绍
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
SingleOutputStreamOperator<String> sensorDS = env
.socketTextStream("hadoop102", 7777);
/**
* Kafka Sink:
* TODO 注意如果要使用 精准一次 写入Kafka需要满足以下条件缺一不可
* 1、开启checkpoint后续介绍
* 2、设置事务前缀
* 3、设置事务超时时间 checkpoint间隔 < 事务超时时间 < max的15分钟
*/
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
// 指定 kafka 的地址和端口
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
// 指定序列化器指定Topic名称、具体的序列化
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("ws")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// 写到kafka的一致性级别 精准一次、至少一次
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// 如果是精准一次必须设置 事务的前缀
.setTransactionalIdPrefix("atguigu-")
// 如果是精准一次必须设置 事务超时时间: 大于checkpoint间隔小于 max 15分钟
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
.build();
sensorDS.sinkTo(kafkaSink);
env.execute();
}
}
自定义序列化器实现带key的record:
public class SinkKafkaWithKey {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(RestartStrategies.noRestart());
SingleOutputStreamOperator<String> sensorDS = env
.socketTextStream("hadoop102", 7777);
/**
* 如果要指定写入kafka的key可以自定义序列化器
* 1、实现 一个接口重写 序列化 方法
* 2、指定key转成 字节数组
* 3、指定value转成 字节数组
* 4、返回一个 ProducerRecord对象把key、value放进去
*/
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
.setRecordSerializer(
new KafkaRecordSerializationSchema<String>() {
@Nullable
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
String[] datas = element.split(",");
byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
byte[] value = element.getBytes(StandardCharsets.UTF_8);
return new ProducerRecord<>("ws", key, value);
}
}
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("atguigu-")
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
.build();
sensorDS.sinkTo(kafkaSink);
env.execute();
}
}
运行代码在Linux主机启动一个消费者查看是否收到数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws
1.4 输出到MySQLJDBC
1添加依赖
<!--mysql驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.0-1.17</version>
</dependency>
2启动MySQL在test库下建表
CREATE TABLE `ws` (
`id` varchar(100) NOT NULL,
`ts` bigint(20) DEFAULT NULL,
`vc` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
3输出到MySQL的示例代码
public class SinkMySQL {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction());
/**
* TODO 写入mysql
* 1、只能用老的sink写法 addsink
* 2、JDBCSink的4个参数:
* 第一个参数 执行的sql一般就是 insert into
* 第二个参数 预编译sql 对占位符填充值
* 第三个参数 执行选项 ---》 攒批、重试
* 第四个参数 连接选项 ---》 url、用户名、密码
*/
SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
"insert into ws values(?,?,?)",
new JdbcStatementBuilder<WaterSensor>() {
@Override
public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
//每收到一条WaterSensor如何去填充占位符
preparedStatement.setString(1, waterSensor.getId());
preparedStatement.setLong(2, waterSensor.getTs());
preparedStatement.setInt(3, waterSensor.getVc());
}
},
JdbcExecutionOptions.builder()
.withMaxRetries(3) // 重试次数
.withBatchSize(100) // 批次的大小条数
.withBatchIntervalMs(3000) // 批次的时间
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
.withUsername("root")
.withPassword("000000")
.withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
.build()
);
sensorDS.addSink(jdbcSink);
env.execute();
}
}
4运行代码用客户端连接MySQL查看是否成功写入数据。
1.4 自定义Sink输出
如果我们想将数据存储到我们自己的存储设备中而Flink并没有提供可以直接使用的连接器就只能自定义Sink进行输出了。与Source类似Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类只要实现它通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction<String>());
在实现SinkFunction的时候需要重写的一个关键方法invoke()在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用对于任何外部存储系统都有效不过自定义Sink想要实现状态一致性并不容易所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现而且在不断地扩充因此自定义的场景并不常见。
您的支持是我创作的无限动力
希望我能为您的未来尽绵薄之力
如有错误谢谢指正若有收获谢谢赞美
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |