SparkStreaming 操作

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

SparkStreaming 操作

一、WordCount

  • 安装nc,用来向端口发送数据
yum install -y nc
  • 启动客户端发送数据
nc -lk 9999
object WordCount {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 按时间间隔将流数据进行划分
    // Seconds : 秒
    val data = new StreamingContext(sc, Seconds(5))

    val inputDStream = data.socketTextStream("hadoop102", 9999)

    val resultDStream = inputDStream
      .filter(StringUtils.isNotBlank(_))
      .flatMap(_.trim.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

     // 打印前十个元素
    resultDStream.print(10)

    // 启动并等待程序停止
    data.start()

    // 应用启动之后应该一直运行除非人为终止或程序异常
    data.awaitTermination()

    // 关闭流式应用
    data.stop(true, true)



  }

}

二、在一的基础上能够对历史数据进行累加

updateStateByKey

该操作可以保存历史状态同时利用新操作来更新数据

需要执行以下两个步骤

  1. 定义状态 - 状态可以是任意数据类型。
  2. 定义状态更新函数 - 使用函数指定如何使用 以前的状态和输入流中的新值。

在每个批处理中Spark 对所有的key进行更新无论他是否有变化

使用该操作需要是指checkpoint目录开启checkpoint方便异常恢复数据

// 历史状态需要有地方存储
// 设置checkpoint目录
ssc.checkpoint("data/ckp")

// 参数1Seq[Int]:当前批次的数据如果发送了两个sparkkey为sparkSeq[1,1]
// 参数2Option[Int]:上一次该key的历史值如果没有就为0有则取出来
// 返回值Option[Int]当前批次的值+历史值

val updateFunc = (currentValues: Seq[Int], historyValue: Option[Int]) => {

  // 当前批次的数据和历史数据进行合并得到结果
  if (currentValues.size > 0) {
    val newValue: Int = currentValues.sum + historyValue.getOrElse(0) // getOrElse为默认值
    Option(newValue)
  } else {
    historyValue
  }

}

val result = data
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(updateFunc)

mapWithState

该操作和updateStateByKey的区别是只关心变化的key

 //第一个参数是key第二参数是当前value,第三个参数之前的value
val mappingFunc = (word: String, current: Option[Int], state: State[Int]) => {

  val newCount = current.getOrElse(0) + state.getOption().getOrElse(0)
  val output = (word, newCount)
  state.update(newCount)
  output

}

val result2 = data
  .flatMap(_.split(" "))
  .map((_, 1))
  .mapWithState(StateSpec.function(mappingFunc))

代码实现

object UpdateStateByKey {

  def main(args: Array[String]): Unit = {

    // 对从socket接收的数据做wordcount并能和历史数据及逆行累加

    // updateStateByKey会返回没有变化的数据数据量太大需要checkpoint
    // mapWithState只关注变化

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UpdateStateByKey")
    val sc = new SparkContext(conf)

    // 数据流被划分为批的时间间隔
    val ssc = new StreamingContext(sc, Seconds(5))

    // 历史状态需要有地方存储
    // 设置checkpoint目录
    ssc.checkpoint("data/ckp")

    // 接收socket数据
    val data = ssc.socketTextStream("hadoop102", 9999)

    // wordcount
    // 参数1Seq[Int]:当前批次的数据如果发送了两个sparkkey为sparkSeq[1,1]
    // 参数2Option[Int]:上一次该key的历史值如果没有就为0有则取出来
    // 返回值Option[Int]当前批次的值+历史值

    val updateFunc = (currentValues: Seq[Int], historyValue: Option[Int]) => {

      // 当前批次的数据和历史数据进行合并得到结果
      if (currentValues.size > 0) {
        val newValue: Int = currentValues.sum + historyValue.getOrElse(0) // getOrElse为默认值
        Option(newValue)
      } else {
        historyValue
      }

    }

    val result = data
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(updateFunc)

    //=================================mapWithState========================
    val mappingFunc = (word: String, current: Option[Int], state: State[Int]) => {

      val newCount = current.getOrElse(0) + state.getOption().getOrElse(0)
      val output = (word, newCount)
      state.update(newCount)
      output

    }

    val result2 = data
      .flatMap(_.split(" "))
      .map((_, 1))
      .mapWithState(StateSpec.function(mappingFunc))

    result.print()
    result2.print()

    ssc.start()

    ssc.awaitTermination()

    ssc.stop(true, true)

  }

}

在三的基础上实现程序停止之后再启动接着上次的结果进行累加

// 从检查点数据重新创建流上下文或创建新的流上下文。
// 如果提供的检查点路径中存在检查点数据
// 则 StreamingContext 将是 从检查点数据重新创建。
// 如果数据不存在则提供的 setupFunc 将用于创建新上下文。
val ssc : StreamingContext = StreamingContext.getOrCreate(dir, createStreamingContextFunction _)

完整代码

// 程序停止之后还能接着上次的结果进行累加

object StateRecovery {

  def createStreamingContextFunction : StreamingContext = {

    // 对从socket接收的数据做wordcount并能和历史数据及逆行累加

    // updateStateByKey会返回没有变化的数据数据量太大需要checkpoint
    // mapWithState只关注变化

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UpdateStateByKey")
    val sc = new SparkContext(conf)

    // 数据流被划分为批的时间间隔
    val ssc = new StreamingContext(sc, Seconds(5))

    // 历史状态需要有地方存储
    // 设置checkpoint目录
    ssc.checkpoint("data/c")

    // 接收socket数据
    val data = ssc.socketTextStream("hadoop102", 9999)

    // wordcount
    // 参数1Seq[Int]:当前批次的数据如果发送了两个sparkkey为sparkSeq[1,1]
    // 参数2Option[Int]:上一次该key的历史值如果没有就为0有则取出来
    // 返回值Option[Int]当前批次的值+历史值

    val updateFunc = (currentValues: Seq[Int], historyValue: Option[Int]) => {

      // 当前批次的数据和历史数据进行合并得到结果
      if (currentValues.size > 0) {
        val newValue: Int = currentValues.sum + historyValue.getOrElse(0) // getOrElse为默认值
        Option(newValue)
      } else {
        historyValue
      }

    }

    val result = data
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(updateFunc)

    result.print()

    ssc

  }

  def main(args: Array[String]): Unit = {

    val dir = "data/c"

    val ssc : StreamingContext = StreamingContext.getOrCreate(dir, createStreamingContextFunction _)
    val sc = ssc.sparkContext

    ssc.start()
    ssc.awaitTermination()
    ssc.stop(true, true)

  }

}

窗口函数

要求

每隔5s计算近10s的数据

  • 窗口长度 10 s > 滑动间隔 5 s : 每隔5s计算就最近10s的数据
  • 窗口长度 10 s = 滑动间隔 10 s : 每隔10s计算就最近10s的数据
  • 窗口长度 5 s > 滑动间隔 10 s : 每隔10s计算就最近5s的数据 – 会丢失数据

代码实现

object SparkWindow {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkWindow")
    val sc = new SparkContext(conf)

    val ssc = new StreamingContext(sc, Seconds(5))

    ssc.checkpoint("data/ckp")

    val data = ssc.socketTextStream("hadoop102", 9999)

    val result = data
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(10), Seconds(5))

    result.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop(true, true)


  }
}

transform

需求

  • 使用窗口函数计算热搜排行榜、
  • 每隔10s计算最近20s的热搜排行榜

DStream没有排序的方法需要调用tranform方法对底层的RDD进行操作调用RDD的排序方法

代码实现

object SparkTopN {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkTopN")
    val sc = new SparkContext(conf)

    val ssc = new StreamingContext(sc, Seconds(5))

    ssc.checkpoint("data/ckp")

    val data = ssc.socketTextStream("hadoop102", 9999)

    val count = data
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKeyAndWindow((v1 : Int, v2 : Int) => v1 + v2, Seconds(20), Seconds(10))
      
	// 对RDD进行排序
    val result = count.transform(rdd => {

      val sortedRDD = rdd.sortBy(_._2, false)
      val top = sortedRDD.take(3)
      top.foreach(println)
      sortedRDD

    })

    result.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop(true, true)

  }

}

foreachRDD

需求

对以上操作产生的数据输出到其他组件上比如mysqlHDFS

foreachRDD将DStream中结果RDD进行输出

针对每批RDD数据操作无返回值

DStream.print底层调用的也是foreachRDD

object spark_foreachRDD {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("saprk_foreahcRDD")
    val sc = new SparkContext(conf)

    val ssc = new StreamingContext(sc, Seconds(5))

    ssc.checkpoint("data/ckp")

    // 接收数据
    val value = ssc.socketTextStream("hadoop102", 9999)

    //wc
    val count = value
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKeyAndWindow((v1 : Int, v2 : Int) => v1 + v2, Seconds(20), Seconds(10))


    val result = count.transform(rdd => {

      val sortedRDD = rdd.sortBy(_._2, false)
      val top = sortedRDD.take(3)

      top.foreach(println)
      sortedRDD

    })

    result.print()

    result.foreachRDD((rdd, time) => {

      // SimpleDateFormat来做Date到String的类型转换
      // 解决JDK里自带的SimpleDateFormat存在线程不安全问题。
      val df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
      val times : String = df.format(time.milliseconds)

      if (!rdd.isEmpty()) {

        // 输出到控制台
        rdd.foreach(println)
        // 输出到HDFS
        // 将数据变成一个分区
        rdd.coalesce(1).saveAsTextFile(s"hdfs://hadoop102:8020/wc/optput-${time.milliseconds}")
        // 输出到mysql
        rdd.foreachPartition(iter => {

          val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306, root, root")
          val sql = "replace into `t_hotwords` (`time`, `word`, `count`) value(time, word, count);"
          val ps = conn.prepareStatement(sql) // 获取预编译对象
          iter.foreach(t => {

            val word = t._1
            val count = t._2
            ps.setTimestamp(1, new Timestamp(time.milliseconds))
            ps.setString(2, word)
            ps.setInt(3, count)
            ps.addBatch()

          })

          ps.executeBatch()
          ps.close()
          conn.close()

        })

      }

    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop(true, true)

  }

}
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6