第一章 Flink简介_flink
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
Flink 系列教程传送门
第四章 Flink 窗口和水位线
第五章 Flink Table API&SQL
第六章 新闻热搜实时分析系统
前言
流计算产品实时性有两个非常重要的实时性设计因素一个是待计算的数据一个是计算的时钟。低延时要求流计算框架尽可能早的输出计算结果但是由于存在数据延时和现实业务数据更新的客观情况就会导致你前一秒计算的结果因为下一秒来了一个对上一秒已经参与计算的那条数据的更新进而导致在下一秒时候上一秒的计算结果就是无效的了那么流计算产品低延时需求导致流计算产品不可能无限制的等待延时数据的到来这就一定会造成数据计算结果不精准的问题。如果流计算产品想让自己的计算结果更准确那就需要忍受对延时数据进行更长时间的等待那就意味着流计算产品的低延时无法达成所以在流计算产品中鱼和熊掌兼得是不那么容易的。
一、Flink概述
在德语中Flink 一词表示快速和灵巧项目采用一只松鼠的彩色图案作为 logo。
Apache Flink是Apache软件基金会的一个顶级项目是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架并且可以同时支持实时计算和批量计算。
Flink起源于Stratosphere 项目该项目是在2010年到2014年间由柏林工业大学、柏林洪堡大学和哈索普拉特纳研究所联合开展的开始是做批处理后面转向了流处理。
- 2014年4月Stratosphere代码被贡献给Apache软件基金会并改名为Flink成为Apache软件基金会孵化器项目并开始在开源大数据行业内崭露头角。
- 2014年8月团队的大部分创始成员离开大学共同创办了一家名为Data Artisans的公司。
- 2015年4月Flink发布了里程碑式的重要版本0.9.0。
- 2019年1月长期对Flink投入研发的阿里巴巴以9000万欧元的价格收购了Data Artiscans公司。
- 2019年8月阿里巴巴将内部版本Blink开源合并入Flink1.9.0版本。
目前最新版本Flink为1.16.0版本本系列课程我们采用Flink1.14.5进行讲解。
二、Flink编程模型
在自然环境中数据的产生原本就是流式的。无论是来自 Web 服务器的事件数据证券交易所的交易数据还是来自工厂车间机器上的传感器数据其数据都是流式的。但是当你分析数据时可以围绕 有界流bounded或 无界流unbounded两种模型来组织处理数据当然选择不同的模型程序的执行和处理方式也都会不同。
- 批处理是有界数据流处理的范例。在这种模式下你可以选择在计算结果输出之前输入整个数据集这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。
- 流处理正相反其涉及无界数据流。至少理论上来说它的数据输入永远不会结束因此程序必须持续不断地对到达的数据进行处理。
三、程序结构
在Hadoop中实现一个MapReduce应用程序需要编写Map和Reduce两部分实现一个Flink应用程序也需要同样的逻辑。一个Flink应用程序由3部分构成或者说将Flink的操作算子可以分成3部分分别为Source、Transformation和Sink如图
- 数据源Flink 在流处理和批处理上的数据源大概有4类基于本地集合的数据源(
fromCollection
、fromElements
)、基于文件的数据源(readTextFile
)、基于网络套接字的数据源(socketTextStream
)、自定义的数据源(KafkaSource
)。常见的自定义数据源包括Kafka、RabbitMQ、NiFi等。 - 数据转换数据转换的各种操作包括map、 flatMap、filter、keyBy、reduce、aggregation、window、union、select等可以将原始数据转换成满足要求的数据。
- 数据输出数据输出是指Flink将转换计算后的数据发送的目的地。常见的数据输出包括写入文件、打印到屏幕、写入Socket 、自定义Sink等 。
在 Flink 中应用程序由用户自定义算子转换而来的流式 dataflows 所组成。这些流式 dataflows 形成了有向图以一个或多个源source开始并以一个或多个汇sink结束。
四、总图概览
Flink 应用程序可以消费来自消息队列或分布式日志这类流式数据源例如 Apache Kafka 或 Kinesis的实时数据也可以从各种的数据源中消费有界的历史数据。同样Flink 应用程序生成的结果流也可以发送到各种数据汇中。
在 Flink 中应用程序由用户自定义算子转换而来的流式 dataflows 所组成。这些流式 dataflows 形成了有向图以一个或多个源source开始并以一个或多个汇sink结束。
从代码到逻辑视图。逻辑视图中圆圈表示算子箭头表示数据流可以在Flink Web UI中查看一个作业的逻辑视图大数据框架的算子对计算做了抽象方便用户进行并行计算、横向扩展和故障恢复。
通常程序代码中的 transformation 和 dataflow 中的算子operator之间是一一对应的。但有时也会出现一个 transformation 包含多个算子的情况
五、入门案例
1、安装Maven整合IDEA开发工具
Maven 是一款基于 Java 平台的项目管理和整合工具它将项目的开发和管理过程抽象成一个项目对象模型POM。开发人员只需要做一些简单的配置Maven 就可以自动完成项目的编译、测试、打包、发布以及部署等工作。
约定优于配置Convention Over Configuration是 Maven 最核心的涉及理念之一 Maven对项目的目录结构、测试用例命名方式等内容都做了规定凡是使用 Maven 管理的项目都必须遵守这些规则。
Maven 项目构建过程中会自动创建默认项目结构开发人员仅需要在相应目录结构下放置相应的文件即可。
官方下载地址下载完成后解压到合适的位置即可建议放在D:/devtools
目录下。
2、修改Maven的下载源地址和本地仓库地址
修改Maven安装目录下conf/settings.xml文件具体修改项如下
<localRepository>D:/devtools/apache-maven-3.6.1/localRepository</localRepository>
<mirrors>
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>*</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
<mirror>
<id>nexus-osc</id>
<mirrorOf>*</mirrorOf>
<name>Nexus osc</name>
<url>http://mirrors.163.com/maven/repository/maven-central/</url>
</mirror>
</mirrors>
<profiles>
<profile>
<id>jdk-1.8</id>
<activation>
<!--这个字段表示默认激活-->
<activeByDefault>true</activeByDefault>
<jdk>1.8</jdk>
</activation>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
</properties>
</profile>
</profiles>
3、IDEA整合Maven
在IDEA的设置中搜索maven做如下修改选择本地安装的Maven相关选项。
4、使用Flink实现批计算
使用Flink Scala完成批处理的词频统计案例具体处理流程如下
在pom.xml中添加flink所需依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.14.5</version>
</dependency>
<!--No ExecutorFactory found to execute the application. 从 flink1.11.0 版本开始需要多引入一个 flink-client 包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.5</version>
</dependency>
详细代码示例如下
import org.apache.flink.api.scala._
object WordCountBatchTest {
def main(args: Array[String]): Unit = {
// 创建Flink的执行环境(批处理的)
val env = ExecutionEnvironment.getExecutionEnvironment
// Source 读取数据源
val data = env.readTextFile("datasource/word.txt")
// Transformation 转换 计算
val result = data
.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.groupBy(0)
.sum(1)
// Sink 把转换的结果输出
result.print()
}
}
从 flink1.11.0 版本开始需要多引入一个 flink-client 包
5、使用Flink实现流计算
Flink流计算会借助NetCat工具进行流式数据进行数据录入具体安装使用如下
Netcat官网下载地址下载netcat-win32-1.12.zip
压缩包解压到安装目录并配置PATH
环境变量。
- 输入
nc -l -p 9000 -v
监控9000端口接收数据 - 输入
nc localhost 9000
进行连接并发送数据
在cmd中输入命令nc -l -p 666
监控666端口并输入测试数据
在pom.xml中添加flink流处理所需依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.14.5</version>
<!--<scope>provided</scope>-->
</dependency>
使用Flink Scala编写流式数据处理程序
import org.apache.flink.streaming.api.scala._
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 获取Flink流处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 数据源-监控Netcat数据端口666
val data = env.socketTextStream("localhost", 666)
// 数据转换
val result = data
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_,1))
.keyBy(_._1)
.sum(1)
// Sink 数据输出到控制台
result.print()
// 流处理环境执行
env.execute()
}
}