Spark SQL概述与基本操作-CSDN博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

目录

一、Spark SQL概述

        1概念

        2特点

        3Spark SQL与Hive异同

        4Spark的数据抽象

二、Spark Session对象执行环境构建

          (1)Spark Session对象

        2代码演示

三、DataFrame创建

        1DataFrame组成

        2DataFrame创建方式转换

        3DataFrame创建方式标准API读取

四、DataFrame编程

        1DSL语法风格

        2SQL语法风格

五、Spark SQL——wordcount代码示例

        1pyspark.sql.functions包

        2代码示例


一、Spark SQL概述

        1概念

        Spark SQL是Apache Spark的一个模块它用于处理结构化和半结构化的数据。Spark SQL允许用户使用SQL查询和操作数据这种操作可以直接在Spark的DataFrame/Dataset API中进行。此外Spark SQL还支持多种语言包括Scala、Java、Python和R。

        2特点

        ①融合性SQL可以无缝集成在代码中随时用SQL处理数据。

        ②统一数据访问一套标准API可读写不同的数据源。

        ③Hive兼容可以使用Spark SQL直接计算生成Hive数据表。

        ④标准化连接支持标准化JDBC \ ODBC连接方便和各种数据库进行数据交互。

        3Spark SQL与Hive异同

        共同点Hive和Spark均是:“分布式SQL计算引擎”均是构建大规模结构化数据计算的绝佳利器同时SparkSQL拥有更好的性能。

        4Spark的数据抽象

        Spark SQL的数据抽象

        Data Frame与RDD

二、Spark Session对象执行环境构建

          (1)Spark Session对象

        在RDD阶段程序的执行入口对象是:SparkContext。在Spark 2.0后推出了SparkSession对象作为Spark编码的统一入口对象。

        Spark Session对象作用

        ①用于SparkSQL编程作为入口对象。

        ②用于SparkCore编程可以通过Spark Session对象中获取到Spark Context。

        2代码演示
# cording:utf8

# Spark Session对象的导包对象是来自于pyspark.sql包中
from pyspark.sql import SparkSession
if __name__ == '__main__':
    # 构建Spark Session执行环境入口对象
    spark = SparkSession.builder.\
            appName('test').\
            master('local[*]').\
            getOrCreate()
    # 通过Spark Session对象 获取SparkContext对象
    sc = spark.sparkContext

    # SparkSQL测试
    df = spark.read.csv('../input/stu_score.txt', sep=',', header=False)
    df2 = df.toDF('id', 'name', 'score')
    # 打印表结构
    # df2.printSchema()
    # 打印数据内容
    # df2.show()

    df2.createTempView('score')
    # SQL风格
    spark.sql("""SELECT * FROM score WHERE name='语文' LIMIT 5
    """).show()

    # DSL 风格
    df2.where("name='语文'").limit(5).show()

三、DataFrame创建

        1DataFrame组成

        DataFrame是一个二维表结构表格结构的组成

                ①行

                ②列

                ③表结构描述

        比如在MySQL中的一个表

                ①有许多列组成

                ②数据也被分为多个列

                ③表也有表结构信息列、列名、列类型、列约束等

        基于这个前提下DataFrame的组成如下

                在结构层面

                        ①StructType对象描述整个DataFrame的表结构

                        ②StructField对象描述一个列的信息

                在数据层面

                        ①Row对象记录一行数据

                        ②Column对象记录一列数据并包含列的信息

        2DataFrame创建方式转换

        ①基于RDD方式

# cording:utf8

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 构建执行环境对象Spark Session
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()

    # 构建SparkContext

    sc = spark.sparkContext

    # 基于RDD转换为DataFrame
    rdd = sc.textFile('../input/people.txt').\
        map(lambda x: x.split(',')).\
        map(lambda x: (x[0], int(x[1])))

    # 构建DataFrame对象
    # 参数1被转换的RDD
    # 参数2指定列名通过list的形式指定按照顺序依次提供字符串名称即可
    df = spark.createDataFrame(rdd,schema=['name', 'age'])

    # 打印Data Frame的表结构
    df.printSchema()

    # 打印df中的数据
    # 参数1表示 展示出多少条数据默认不传的话是20
    # 参数2表示是否对列进行截断如果列的数据长度超过20个字符串长度厚旬欸日不显示以....代替
    # 如果给False 表示不截断全部显示默认是True
    df.show(20,False)

    # 将DF对象转换成临时视图表可供sql语句查询
    df.createOrReplaceTempView('people')
    spark.sql('SELECT * FROM people WHERE age < 30').show()

        ②通过StructType对象来定义DataFrame的 ‘ 表结构 ’ 转换RDD

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
if __name__ == '__main__':
    # 构建执行环境对象Spark Session
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()

    # 构建SparkContext

    sc = spark.sparkContext

    # 基于RDD转换为DataFrame
    rdd = sc.textFile('../input/people.txt').\
        map(lambda x: x.split(',')).\
        map(lambda x: (x[0], int(x[1])))

    # 构建表结构的描述对象StructType 对象
    # 参数1列名
    # 参数2列数据类型
    # 参数3是否允许为空
    schema = StructType().add('name', StringType(), nullable=True).\
        add('age', IntegerType(), nullable=False)

    # 构建DataFrame对象
    # 参数1被转换的RDD
    # 参数2指定列名通过list的形式指定按照顺序依次提供字符串名称即可
    df = spark.createDataFrame(rdd, schema=schema)

    df.printSchema()
    df.show()

        ③通过RDD的toDF方法创建RDD

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
if __name__ == '__main__':
    # 构建执行环境对象Spark Session
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()

    # 构建SparkContext

    sc = spark.sparkContext

    # 基于RDD转换为DataFrame
    rdd = sc.textFile('../input/people.txt').\
        map(lambda x: x.split(',')).\
        map(lambda x: (x[0], int(x[1])))

    # toDF构建DataFrame
    # 第一种构建方式只能设置列名列类型靠RDD推断默认允许为空
    df1 = rdd.toDF(['name', 'name'])
    df1.printSchema()
    df1.show()
    # toDF方式2通过StructType来构造
    # 设置全面能设置列名、列数据类型、是否为空
    # 构建表结构的描述对象StructType 对象
    # 参数1列名
    # 参数2列数据类型
    # 参数3是否允许为空
    schema = StructType().add('name', StringType(), nullable=True).\
        add('age', IntegerType(), nullable=False)

    df2 = rdd.toDF(schema=schema)
    df2.printSchema()
    df2.show()




        ④基于Pandas的DataFrame创建DataFrame

# cording:utf8

from pyspark.sql import SparkSession
import pandas as pd

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # 基于pandas的DataFrame构建SparkSQL的DataFrame对象
    pdf = pd.DataFrame(
        {
            'id': [1, 2, 3],
            'name': ['张大仙', '王晓晓', '吕不韦'],
            'age': [1, 2, 3]
        }
    )

    df = spark.createDataFrame(pdf)

    df.printSchema()
    df.show()

        3DataFrame创建方式标准API读取

        统一API示例代码

        ①读取本地text文件

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # 构建StructTypetext数据源
    # text读取数据的特点是将一整行只作为一个列读取默认列名是value 类型是String
    schema = StructType().add('data', StringType(),nullable=True)
    df = spark.read.format('text').\
        schema(schema=schema).\
        load('../input/people.txt')

    df.printSchema()
    df.show()

        ②读取json文件

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # json文件类型自带Schema信息
    df = spark.read.format('json').load('../input/people.json')
    df.printSchema()
    df.show()

        ③读取csv文件

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # 读取csv文件
    df = spark.read.format('csv').\
        option('sep', ';').\
        option('header', True).\
        option('encoding', 'utf-8').\
        schema('name STRING, age INT, job STRING').\
        load('../input/people.csv')

    df.printSchema()
    df.show()

        ④读取parquet文件

        parquet文件是Spark中常用的一种列式存储文件格式和Hive中的ORC差不多他们都是列存储格式。

        parquet对比普通的文本文件的区别

                ①parquet内置schema列名、列类型、是否为空

                ②存储是以列作为存储格式

                ③存储是序列化存储在文件中的有压缩属性体积小

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # 读取parquet文件
    df = spark.read.format('parquet').load('../input/users.parquet')

    df.printSchema()
    df.show()

四、DataFrame编程

        1DSL语法风格
# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format('csv').\
        schema('id INT, subject STRING, score INT').\
        load('../input/stu_score.txt')

    # Column对象的获取
    id_column = df['id']
    subject_column = df['subject']

    # DLS风格
    df.select(['id', 'subject']).show()
    df.select('id', 'subject').show()
    df.select(id_column, subject_column).show()

    # filter API
    df.filter('score < 99').show()
    df.filter(df['score'] < 99).show()

    # where API
    df.where('score < 99').show()
    df.where(df['score'] < 99).show()

    # group By API
    # df.groupBy API的返回值为 GroupedData类型1
    # GroupedData对象不是DataFrame
    # 它是一个 有分组关系的数据结构有一些API供我们对分组做聚合
    # SQLgroup by 后接上聚合 sum avg count min max
    # GroupedData 类似于SQL分组后的数据结构同样由上述5中聚合方法
    # GroupedData 调用聚合方法后返回值依旧是DayaFrame
    # GroupedData 只是一个中转的对象最终还是会获得DataFrame的结果
    df.groupBy('subject').count().show()
    df.groupBy(df['subject']).count().show()
        2SQL语法风格

        DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表然后可以通过在程序中使用spark.sql()来执行SQL语句查询结果返回一个DataFrame。
        如果想使用SQL风格的语法需要将DataFrame注册成表采用如下的方式        

df.createTempView( "score")            #注册一个临时视图(表)
df.create0rReplaceTempView("score")    #注册一个临时表如果存在进行替换。
df.createGlobalTempView( "score")      #注册一个全局表

        全局表跨SparkSession对象使用在一个程序内的多个SparkSession中均可调用查询前带上前缀
        global_temp.
        临时表只在当前SparkSession中可用

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format('csv').\
        schema('id INT, subject STRING, score INT').\
        load('../input/stu_score.txt')

    # 注册成临时表
    df.createTempView('score')              # 注册临时视图表
    df.createOrReplaceTempView('score_2')   # 注册或者替换为临时视图
    df.createGlobalTempView('score_3')      # 注册全局临时视图 全局临时视图使用的时候 需要在前面带上global_temp. 前缀

    # 可以通过SparkSession对象的sql api来完成sql语句的执行
    spark.sql("SELECT subject, COUNT(*) AS cnt FROM score GROUP BY subject").show()
    spark.sql("SELECT subject, COUNT(*) AS cnt FROM score_2 GROUP BY subject").show()
    spark.sql("SELECT subject, COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject").show()

五、Spark SQL——wordcount代码示例

        1pyspark.sql.functions包

        这个包里面提供了一系列的计算函数供SparkSQL使用

        导包from pyspark.sql import functions as F

        这些函数返回值多数都是Column对象。

        2代码示例
# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder.appName('wordcount').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # TODO 1SQL风格进行处理
    rdd = sc.textFile('../input/words.txt').\
        flatMap(lambda x: x.split(' ')).\
        map(lambda x: [x])

    df = rdd.toDF(['word'])

    # 注册DF为表格
    df.createTempView('words')

    spark.sql('SELECT word,COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC').show()

    # TODO 2:DSL 风格处理
    df = spark.read.format('text').load('../input/words.txt')

    # withColumn 方法
    # 方法功能对已存在的列进行操作返回一个新的列如果名字和老列相同那么替换否则作为新列存在
    df2 = df.withColumn('value', F.explode(F.split(df['value'], ' ')))
    df2.groupBy('value').\
        count().\
        withColumnRenamed('value', 'word').\
        withColumnRenamed('count', 'cnt').\
        orderBy('cnt', ascending=False).show()

    # withColumnRenamed() 对列名进行重命名
    # orderBy() 排序

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6