SparkStreaming 操作
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
SparkStreaming 操作
一、WordCount
- 安装nc,用来向端口发送数据
yum install -y nc
- 启动客户端发送数据
nc -lk 9999
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
// 按时间间隔将流数据进行划分
// Seconds : 秒
val data = new StreamingContext(sc, Seconds(5))
val inputDStream = data.socketTextStream("hadoop102", 9999)
val resultDStream = inputDStream
.filter(StringUtils.isNotBlank(_))
.flatMap(_.trim.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
// 打印前十个元素
resultDStream.print(10)
// 启动并等待程序停止
data.start()
// 应用启动之后应该一直运行除非人为终止或程序异常
data.awaitTermination()
// 关闭流式应用
data.stop(true, true)
}
}
二、在一的基础上能够对历史数据进行累加
updateStateByKey
该操作可以保存历史状态同时利用新操作来更新数据
需要执行以下两个步骤
- 定义状态 - 状态可以是任意数据类型。
- 定义状态更新函数 - 使用函数指定如何使用 以前的状态和输入流中的新值。
在每个批处理中Spark 对所有的key进行更新无论他是否有变化
使用该操作需要是指checkpoint目录开启checkpoint方便异常恢复数据
// 历史状态需要有地方存储
// 设置checkpoint目录
ssc.checkpoint("data/ckp")
// 参数1Seq[Int]:当前批次的数据如果发送了两个sparkkey为sparkSeq[1,1]
// 参数2Option[Int]:上一次该key的历史值如果没有就为0有则取出来
// 返回值Option[Int]当前批次的值+历史值
val updateFunc = (currentValues: Seq[Int], historyValue: Option[Int]) => {
// 当前批次的数据和历史数据进行合并得到结果
if (currentValues.size > 0) {
val newValue: Int = currentValues.sum + historyValue.getOrElse(0) // getOrElse为默认值
Option(newValue)
} else {
historyValue
}
}
val result = data
.flatMap(_.split(" "))
.map((_, 1))
.updateStateByKey(updateFunc)
mapWithState
该操作和updateStateByKey的区别是只关心变化的key
//第一个参数是key第二参数是当前value,第三个参数之前的value
val mappingFunc = (word: String, current: Option[Int], state: State[Int]) => {
val newCount = current.getOrElse(0) + state.getOption().getOrElse(0)
val output = (word, newCount)
state.update(newCount)
output
}
val result2 = data
.flatMap(_.split(" "))
.map((_, 1))
.mapWithState(StateSpec.function(mappingFunc))
代码实现
object UpdateStateByKey {
def main(args: Array[String]): Unit = {
// 对从socket接收的数据做wordcount并能和历史数据及逆行累加
// updateStateByKey会返回没有变化的数据数据量太大需要checkpoint
// mapWithState只关注变化
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UpdateStateByKey")
val sc = new SparkContext(conf)
// 数据流被划分为批的时间间隔
val ssc = new StreamingContext(sc, Seconds(5))
// 历史状态需要有地方存储
// 设置checkpoint目录
ssc.checkpoint("data/ckp")
// 接收socket数据
val data = ssc.socketTextStream("hadoop102", 9999)
// wordcount
// 参数1Seq[Int]:当前批次的数据如果发送了两个sparkkey为sparkSeq[1,1]
// 参数2Option[Int]:上一次该key的历史值如果没有就为0有则取出来
// 返回值Option[Int]当前批次的值+历史值
val updateFunc = (currentValues: Seq[Int], historyValue: Option[Int]) => {
// 当前批次的数据和历史数据进行合并得到结果
if (currentValues.size > 0) {
val newValue: Int = currentValues.sum + historyValue.getOrElse(0) // getOrElse为默认值
Option(newValue)
} else {
historyValue
}
}
val result = data
.flatMap(_.split(" "))
.map((_, 1))
.updateStateByKey(updateFunc)
//=================================mapWithState========================
val mappingFunc = (word: String, current: Option[Int], state: State[Int]) => {
val newCount = current.getOrElse(0) + state.getOption().getOrElse(0)
val output = (word, newCount)
state.update(newCount)
output
}
val result2 = data
.flatMap(_.split(" "))
.map((_, 1))
.mapWithState(StateSpec.function(mappingFunc))
result.print()
result2.print()
ssc.start()
ssc.awaitTermination()
ssc.stop(true, true)
}
}
在三的基础上实现程序停止之后再启动接着上次的结果进行累加
// 从检查点数据重新创建流上下文或创建新的流上下文。
// 如果提供的检查点路径中存在检查点数据
// 则 StreamingContext 将是 从检查点数据重新创建。
// 如果数据不存在则提供的 setupFunc 将用于创建新上下文。
val ssc : StreamingContext = StreamingContext.getOrCreate(dir, createStreamingContextFunction _)
完整代码
// 程序停止之后还能接着上次的结果进行累加
object StateRecovery {
def createStreamingContextFunction : StreamingContext = {
// 对从socket接收的数据做wordcount并能和历史数据及逆行累加
// updateStateByKey会返回没有变化的数据数据量太大需要checkpoint
// mapWithState只关注变化
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UpdateStateByKey")
val sc = new SparkContext(conf)
// 数据流被划分为批的时间间隔
val ssc = new StreamingContext(sc, Seconds(5))
// 历史状态需要有地方存储
// 设置checkpoint目录
ssc.checkpoint("data/c")
// 接收socket数据
val data = ssc.socketTextStream("hadoop102", 9999)
// wordcount
// 参数1Seq[Int]:当前批次的数据如果发送了两个sparkkey为sparkSeq[1,1]
// 参数2Option[Int]:上一次该key的历史值如果没有就为0有则取出来
// 返回值Option[Int]当前批次的值+历史值
val updateFunc = (currentValues: Seq[Int], historyValue: Option[Int]) => {
// 当前批次的数据和历史数据进行合并得到结果
if (currentValues.size > 0) {
val newValue: Int = currentValues.sum + historyValue.getOrElse(0) // getOrElse为默认值
Option(newValue)
} else {
historyValue
}
}
val result = data
.flatMap(_.split(" "))
.map((_, 1))
.updateStateByKey(updateFunc)
result.print()
ssc
}
def main(args: Array[String]): Unit = {
val dir = "data/c"
val ssc : StreamingContext = StreamingContext.getOrCreate(dir, createStreamingContextFunction _)
val sc = ssc.sparkContext
ssc.start()
ssc.awaitTermination()
ssc.stop(true, true)
}
}
窗口函数
要求
每隔5s计算近10s的数据
- 窗口长度 10 s > 滑动间隔 5 s : 每隔5s计算就最近10s的数据
- 窗口长度 10 s = 滑动间隔 10 s : 每隔10s计算就最近10s的数据
- 窗口长度 5 s > 滑动间隔 10 s : 每隔10s计算就最近5s的数据 – 会丢失数据
代码实现
object SparkWindow {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkWindow")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("data/ckp")
val data = ssc.socketTextStream("hadoop102", 9999)
val result = data
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(10), Seconds(5))
result.print()
ssc.start()
ssc.awaitTermination()
ssc.stop(true, true)
}
}
transform
需求
- 使用窗口函数计算热搜排行榜、
- 每隔10s计算最近20s的热搜排行榜
DStream没有排序的方法需要调用tranform方法对底层的RDD进行操作调用RDD的排序方法
代码实现
object SparkTopN {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkTopN")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("data/ckp")
val data = ssc.socketTextStream("hadoop102", 9999)
val count = data
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKeyAndWindow((v1 : Int, v2 : Int) => v1 + v2, Seconds(20), Seconds(10))
// 对RDD进行排序
val result = count.transform(rdd => {
val sortedRDD = rdd.sortBy(_._2, false)
val top = sortedRDD.take(3)
top.foreach(println)
sortedRDD
})
result.print()
ssc.start()
ssc.awaitTermination()
ssc.stop(true, true)
}
}
foreachRDD
需求
对以上操作产生的数据输出到其他组件上比如mysqlHDFS
foreachRDD将DStream中结果RDD进行输出
针对每批RDD数据操作无返回值
DStream.print底层调用的也是foreachRDD
object spark_foreachRDD {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("saprk_foreahcRDD")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("data/ckp")
// 接收数据
val value = ssc.socketTextStream("hadoop102", 9999)
//wc
val count = value
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKeyAndWindow((v1 : Int, v2 : Int) => v1 + v2, Seconds(20), Seconds(10))
val result = count.transform(rdd => {
val sortedRDD = rdd.sortBy(_._2, false)
val top = sortedRDD.take(3)
top.foreach(println)
sortedRDD
})
result.print()
result.foreachRDD((rdd, time) => {
// SimpleDateFormat来做Date到String的类型转换
// 解决JDK里自带的SimpleDateFormat存在线程不安全问题。
val df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
val times : String = df.format(time.milliseconds)
if (!rdd.isEmpty()) {
// 输出到控制台
rdd.foreach(println)
// 输出到HDFS
// 将数据变成一个分区
rdd.coalesce(1).saveAsTextFile(s"hdfs://hadoop102:8020/wc/optput-${time.milliseconds}")
// 输出到mysql
rdd.foreachPartition(iter => {
val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306, root, root")
val sql = "replace into `t_hotwords` (`time`, `word`, `count`) value(time, word, count);"
val ps = conn.prepareStatement(sql) // 获取预编译对象
iter.foreach(t => {
val word = t._1
val count = t._2
ps.setTimestamp(1, new Timestamp(time.milliseconds))
ps.setString(2, word)
ps.setInt(3, count)
ps.addBatch()
})
ps.executeBatch()
ps.close()
conn.close()
})
}
})
ssc.start()
ssc.awaitTermination()
ssc.stop(true, true)
}
}