第三阶段第一章——PySpark实战-CSDN博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

学习了这么多python的知识是时候来搞点真玩意儿了~~

春风得意马蹄疾一日看尽长安花

o(*︶*)o


 1.前言介绍

1什么是spark

        Apache Spark是一个开源的分布式计算框架用于处理大规模数据集的计算任务。它提供了一种高性能、通用、易用的计算引擎支持数据并行处理、内存计算、迭代计算等多种计算模式并提供了丰富的API比如Spark SQL、Spark Streaming、Mlib和Graphx等。Spark的基本单元是弹性分布式数据集RDD它是一种可分区、可并行计算的数据结构可以在多个节点上进行操作。Spark可以运行在多种集群管理器上包括Hadoop YARN、Apache Mesos和Standalone等。Spark的特点是速度快、易用、灵活、可扩展已经成为了数据处理和数据科学领域的一个重要工具。

2什么是pyspark

        PySpark是指Spark的Python API它是Spark的一部分可以通过Python语言进行Spark编程。PySpark提供了Python与Spark之间的交互使Python开发人员能够使用Python语言对Spark进行编程和操作。PySpark可以使用Python进行数据预处理和分析同时扩展了Python语言的功能能够同时使用Spark的分布式计算能力加快了数据处理和分析的速度。与其他编程语言相比Python在数据处理和科学计算方面有广泛的应用和支持因此PySpark特别适合处理Python与大规模数据集交互的数据科学项目。

 


2.基础准备

1先下载  pyspark软件包

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple/ pyspark

2SparkContext入口对象 

初体验代码

"""
    演示
"""
# 导包
from pyspark import SparkConf, SparkContext

# 创建sparkConf对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行停止PySpark程序
sc.stop()

3PySpark编程模型

详细过程 ==>>


3.数据输入

1RDD对象

数据输入完成之后都会得到一个RDD对象

2数据容器转RDD对象

"""
    演示数据输入
"""
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 通过parallelize方法将Python对象加载到Spark内成为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
# 查看rdd里面的内容需要使用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

sc.stop()

3读取文件转化为RDD对象

代码示例


# 用过textFile方法读取文件数据加载到Spark内成为RDD对象
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

rdd_file = sc.textFile("D:\\IOText\\b.txt")
print(rdd_file.collect())

sc.stop()


4.数据计算

pyspark是通过RDD对象内丰富的成员方法算子

1map方法

功能将RDD的数据一条条处理处理的逻辑 基于map算子中接收的处理函数返回新的RDD

语法

代码示例

"""
    演示map方法
"""
from pyspark import SparkConf, SparkContext
import os
#配置环境变量
os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("map_test")
sc = SparkContext(conf=conf)

# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])


# 通过map方法将全部数据都乘以10
# (T) -> U
def func(data):
    return data * 10


rdd2 = rdd.map(func)
print(rdd2.collect())

# 链式调用,快速解决
rdd3 = rdd.map(lambda e: e * 100).map(lambda e: e + 6)
print(rdd3.collect())

2flatMap方法

和map差不多多了一个消除嵌套的效果

功能:对rdd执行map操作然后进行解除嵌套

代码示例

"""
    演示 flatmap 方法
"""
from pyspark import SparkConf, SparkContext
import os

# 配置环境变量
os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("flatmap_test")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize(["a b c", "e f g", "uuu ggg"])
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())

3reduceByKey方法

功能自动分组组内聚合

代码示例

"""
    演示 reduceByKey 方法
"""
from pyspark import SparkConf, SparkContext
import os

# 配置环境变量
os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("reduceByKey_test")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 99), ('女', 66)])
# 求男生和呃女生两个组的成绩之和
rdd1 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd1.collect())


4练习案例1

目标统计文件内出现单词的数量.

代码示例

"""
    读取文件统计文件内单词出现的数量
"""

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_1")
sc = SparkContext(conf=conf)

# 读取文件
file_rdd = sc.textFile("D:\\IOText\\b.txt")

# 取出所有的单词
words = file_rdd.flatMap(lambda line: line.split(' '))
print(words.collect())
# 将其转化为map类型value设置为1,方便在之后分组统计
word_one = words.map(lambda w: (w, 1))
# 分组求和
res = word_one.reduceByKey(lambda a, b: a + b)

print(res.collect())

我的文件数据是

word1
word2 word2
word3 word3 word3
word4 word4 word4 word4

5filter方法

功能过滤想要的数据进行保留

filter返回值为TRUE则保留不然会被过滤

语法

代码示例

"""
    filter方法演示
"""
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# filter返回值为TRUE则保留不然会被过滤
rdd2 = rdd.filter(lambda e: e % 2 == 0)
print(rdd2.collect())

6distinct方法

功能对RDD数据进行去重返回新的RDD

语法

rdd.distinct( )   # 这里无需传参

代码示例

"""
    distinct方法演示
"""
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 1, 2, 2, 2, 3, 4, 5])
# filter返回值为TRUE则保留不然会被过滤
rdd2 = rdd.distinct()
print(rdd2.collect())

7sortBy方法

功能对RDD数据进行排序基于你指定的排序依据

语法

"""
    sortBy方法演示
"""
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 读取文件
file_rdd = sc.textFile("D:\\IOText\\b.txt")

# 取出所有的单词
words = file_rdd.flatMap(lambda line: line.split(' '))
# print(words.collect())
# 将其转化为map类型value设置为1,方便在之后分组统计
word_one = words.map(lambda w: (w, 1))
# 分组求和
res = word_one.reduceByKey(lambda a, b: a + b)
# print(res.collect())
# 按照出现次数从小到大排序
final_rdd = res.sortBy(lambda x: x[1], ascending=True, numPartitions=1)
print(final_rdd.collect())


8练习案例2

需要的数据已经上传到博客

三个需求

# TODO 需求一城市销售额排名
# TODO 需求2全部城市有哪些商品类别在售卖
# TODO 需求3北京市有哪些商品类别在售卖
"""
    sortBy方法演示
"""
from pyspark import SparkConf, SparkContext
import json
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# TODO 需求一城市销售额排名
# 1.1 读取文件
file_rdd = sc.textFile("D:\\IOText\\orders.txt")
# 1.2 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
# 1.3 取出城市和销售额数据
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# print(dict_rdd.collect())
# 1.4 取出城市和销售额数据
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
# 1.6 按销售额聚合结果进行排序
result_rdd_1 = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print("需求1的结果", result_rdd_1.collect())

# TODO 需求2全部城市有哪些商品类别在售卖
# 同时记得去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果", category_rdd.collect())

# TODO 需求3北京市有哪些商品类别在售卖
# 3.1 只要北京市的数据
rdd_beijing = dict_rdd.filter(lambda x: x["areaName"] == "北京")
# 3.2 取出全部商品类别并且去重
result_rdd_3 = rdd_beijing.map(lambda x: x['category']).distinct()
print("需求3的结果", result_rdd_3.collect())


5.数据输出

1输出为Python对象collect算子

功能将RDD各个分区内的数据统一手机到Driver中形成一个List对象

用法

rdd.collect( )   # 返回值是一个list

2输出为Python对象reduce算子

功能对RDD数据集按照你传入的逻辑进行聚合

语法

3输出为Python对象take算子

功能取RDD的前N个元素组合成list返回给你

语法

4输出为Python对象count算子

功能计算RDD有多少条数据返回值是一个数字

语法

5上述1-4算子代码示例

"""
    collect算子
"""

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])

# collect算子输出RDD为list对象
rdd_list: list = rdd.collect()
print(type(rdd_list))
print(rdd_list)

# reduce算子对RDD进行两两聚合
num = rdd.reduce(lambda a, b: a + b)
print(num)

# take算子取出RDD前N个元素组成list返回
take_list_3 = rdd.take(3)
print(take_list_3)

# count算子统计rdd内有多少条数据返回值为数字
num_count = rdd.count()
print(f"有{num_count}个元素")

 

6输出到文件中saveAsTextFile算子

功能将RDD的数据写入文本文件中

支持  本地写出hdfs等文件系统

语法

注意事项

 代码示例

"""
    saveAsTextFile
"""

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"
os.environ['HADOOP_HOME'] = "D:\\CodeSoft\\Hadoop\\hadoop-3.0.0"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)

# 准备RDD1
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
# 准备RDD2
rdd2 = sc.parallelize([("hello", 3), ("spark", 5), ("java", 7)])
# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]])

# 输出到文件之中
rdd1.saveAsTextFile("D:\\IOText\\saveAsTextFile测试1")
rdd2.saveAsTextFile("D:\\IOText\\saveAsTextFile测试2")
rdd3.saveAsTextFile("D:\\IOText\\saveAsTextFile测试3")

上述方法会使文件内有对应你CPU核心数量的分区

修改RDD分区为1个

方式一SparkConf对象设置属性为全局并行度为1

 

方式二创建RDD的时候设置parallelize方法传入numSlices的参数为1


6.综合案例

搜索引擎日志分析

实现代码
 

"""
    综合案例
"""

from pyspark import SparkConf, SparkContext
import os
import json

os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"
os.environ['HADOOP_HOME'] = "D:\\CodeSoft\\Hadoop\\hadoop-3.0.0"

conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
conf.set("spark.default.parallelism", "1")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)

# 读取文件转换成RDD
file_rdd = sc.textFile("D:\\IOText\\search_log.txt")

# TODO 需求一热门搜索时间段Top3小时精度
# 1.1 取出全部的时间并且转换为小时
# 1.2 转换为小时1的二元组
# 1.3 key分组聚合value
# 1.4 排序降序
# 1.5 取前三
res1 = (file_rdd
        .map(lambda x: (x.split("\t")[0][:2], 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
        .take(3)
        )
print(f"需求一的结果{res1}")

# TODO 需求二热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 词1二元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 top3
res2 = (file_rdd
        .map(lambda x: (x.split("\t")[2], 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
        .take(3)
        )
print(f"需求二的结果{res2}")

# TODO 需求三统计黑马程序员关键字在什么时间段被搜索的最多
# 3.1 过滤内容只保留黑马程序员关键词
# 3.2 转换为小时1的二元组
# 3.3 分组聚合
# 3.4 排序
# 3.5 取前一
res3 = (file_rdd
        .map(lambda x: x.split("\t"))
        .filter(lambda x: x[2] == "黑马程序员")
        .map(lambda x: (x[0][:2], 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
        .take(1)
        )
print(f"需求三的结果{res3}")

# TODO 需求四将数据转化为JSON格式写出到文件之中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
(file_rdd
 .map(lambda x: x.split("\t"))
 .map(lambda x: {
    "time": x[0],
    "user_id": x[1],
    "key_word": x[2],
    "rank1": x[3],
    "rank2": x[4],
    "url": x[5]}
      )
 .saveAsTextFile("D:\\IOText\\综合案例")
 )

结语

有啥好说的。呃呃呃呃呃呃呃没啥好说的

下班

┏(0)┛

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6