Spark WordCount 案例

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

文章目录

Spark WordCount 案例

1、程序连接 Spark

首先这个Scala spark程序和spark的链接跟sql编程类似。首先new 一个新的val context = SparkContext()对象然后还要用到
val conf = SparkConf.setMaster("local").setAppName("WordCount")这个是配置信息比如这个是本地连接所以里面是local然后后面那个是程序的名字这个写完之后吧这个conf对象放在SparkContext(conf) 这里面。然后在程序的最后用完了要关闭连接context.stop()使用stop方法关闭

2、WordCount 案例示例

先在D盘把要测试的文件数据准备好
在这里插入图片描述

思路首先连接之后第一步是读取文件,使用 textFile()方法里面的参数是要读取的文件的路径然后把文件一行一行的读取出来。第二步是使用flatMap(_.split(" "))方法进行map映射和扁平化把单词按照空格分割开。第三步是groupBy(word => word)按照单词进行分组一样的单词分到一组。第四步map()映射进行模式匹配取去key和他的集合的size也就是单词出现的次数。然后使用collect()方法将结果采集打印最后使用foreach(println)进行遍历。

package com.atguigu.bigdata.spark.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

class spark01_WordCount {

}
object spark01_WordCount{
  def main(args: Array[String]): Unit = {
    // Application 我们自己写的应用程序
    // Spark 框架
    //用我们的应用程序去连接spark 就跟那个sql 编程一样
    //TODD建立和Spark 框架的连接
     //1、Java里面是Conntection 进行连接
     //2、Scala 里有个类似的SparkContext()
      //2.1 SparkConf()配置不然不晓得连的哪个. setMaster() 里面是本地连接,setAppName() 里面是app的名称
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val context = new SparkContext(sparkConf)
    println(context)

    //TODD 执行业务操作
    //1、读取文件,获取一行一行的数据 这一步是扁平化
    //hello word
    val value = context.textFile("D:\\wc.txt") //textFile 可以吧文件一行一行的读出来


    //2、将数据进行拆分形成一个一个的单词
    //扁平化将整体拆分为个体的操作
    //"hello word" => helloword
    val danci: RDD[String] = value.flatMap(a => a.split(" ")) //根据空格进行拆分


    //3、将数据根据单词进行分组便于统计
    //(hello,hello,hello,hello,hello),(word,word,word) 这个样子的
    //按照单词进行分组
    val wordGroup = danci.groupBy(word => word) //按照单词进行分组

    //4、对分组数的数据进行转换
    //(hello,hello,hello,hello,hello),(word,word,word)
    //(hello,5),(word,3)
    val wordToCount =  wordGroup.map{ //模式匹配
      case (word,list) => {
        (word,list.size) //匹配第一个是单词。第二个是长度这个长度就是单词出现的次数
      }
    }

    //5、将转换结果采集到控制台打印出来
    val tuples = wordToCount.collect() //collect()方法将结果采集打印
    tuples.foreach(println)



    //TODD 关闭连接
    context.stop() //这样就关闭连接了


  }
}

3、复杂版 WordCount

因为之前那个是用size方法得到次数但是这样就不像是一个聚合操作所以使用map映射然后使用reduce 进行聚合操作这样来得到单词出现的次数。

package com.atguigu.bigdata.spark.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//复杂版wordcount
class spark01_fuzaWrodCount {

}
object spark01_fuzaWrodCount{
  def main(args: Array[String]): Unit = {
    //之前是使用size 方法得出单词出现的次数但是那样实现不像是个聚合功能所以我们改善一下
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val context = new SparkContext(sparkConf)
    println(context)

    //TODD 执行业务操作
    //1、读取文件,获取一行一行的数据 这一步是扁平化
    //hello word
    val value = context.textFile("D:\\wc.txt") //textFile 可以吧文件一行一行的读出来


    //2、将数据进行拆分形成一个一个的单词
    //扁平化将整体拆分为个体的操作
    //"hello word" => helloword
    val danci: RDD[String] = value.flatMap(a => a.split(" ")) //根据空格进行拆分

    val wordToOne: RDD[(String, Int)] = danci.map(word => (word, 1)) //直接在这一步统计单词出现的次数
    val wordGroup: RDD[(String, Iterable[(String, Int)])] = wordToOne.groupBy(t => t._1) //然后按照方式取第一个元素为分组的依据
    val wordToCount = wordGroup.map{ //这一步不是用size了
      case (word,list) => {
        list.reduce(
          (t1,t2) => {
            (t1._1,t1._2 + t2._2)
          }
        )
      }
    }
    //这里不是直接size而是进行reduce聚合操作将key给加起来
    //val wordCount2 = wordGroup.map{case (word,list)=>{ list.reduce((t1,t2)=>{(t1._1,t1._2+t2._2)})}}
    val array: Array[(String, Int)] = wordToCount.collect() //采集结果打印输出
    array.foreach(println) //foreach()方法进行遍历
    

    //TODD 关闭连接
    context.stop() //这样就关闭连接了


  }
}

4、Spark 框架Wordcount

Spark框架里面有个方法分组和聚合可以一个方法完成reduceByKey(_ + _)这样大大减少了代码量从读取文件进来到输出结果四五行就能完成这个案例。

package com.atguigu.bigdata.spark.core.wc

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

//使用saprk框架进行统计
class spark02_sparkCount {

}
object spark02_sparkCount{
  def main(args: Array[String]): Unit = {
    //之前是使用size 方法得出单词出现的次数但是那样实现不像是个聚合功能所以我们改善一下
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val context = new SparkContext(sparkConf)
    println(context)

    //TODD 执行业务操作
    //1、读取文件,获取一行一行的数据 这一步是扁平化
    //hello word
    val value = context.textFile("D:\\wc.txt") //textFile 可以吧文件一行一行的读出来


    //2、将数据进行拆分形成一个一个的单词
    //扁平化将整体拆分为个体的操作
    //"hello word" => helloword
    val danci: RDD[String] = value.flatMap(a => a.split(" ")) //根据空格进行拆分

    val wordToOne: RDD[(String, Int)] = danci.map(word => (word, 1)) //直接在这一步统计单词出现的次数

    //Spark 框架提供了更多的功能可以将分组和聚合使用一个功能实现
    //reduceByKey():相同的key的数据可以对value进行reduce聚合 这是spark提供的功能
    val wordCount = wordToOne.reduceByKey((x,y) => x+y) //相当于同一个key 进行累加_ + _ 可以简化成这样


    val array: Array[(String, Int)] = wordCount.collect() //采集结果打印输出
    array.foreach(println) //foreach()方法进行遍历


    //TODD 关闭连接
    context.stop() //这样就关闭连接了


  }
}

简化下来就是这几步
在这里插入图片描述

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6