实训笔记——Spark SQL编程-CSDN博客

Spark SQL编程

Spark SQL属于Spark计算框架的一部分是专门负责结构化数据的处理计算框架Spark SQL提供了两种数据抽象DataFrame、Dataset,都是基于RDD之上的一种高级数据抽象在RDD基础之上增加了一个schema表结构。

一、准备Spark SQL的编程环境

1.1 创建Spark SQL的编程项目scala语言支持的

1.2 引入编程依赖

spark_core_2.12

hadoop_hdfs

spark_sql_2.12

hadoop的有一个依赖jackson版本和scala2.12版本冲突了Spark依赖中也有这个依赖但是默认使用的是pom.xml先引入的那个依赖把hadoop中jackson依赖排除了即可。

<dependency>      
    <groupId>org.apache.hadoop</groupId>      
    <artifactId>hadoop-hdfs</artifactId>      
    <version>3.1.4</version>      
    <exclusions>        
    	<exclusion>          
        <groupId>com.fasterxml.jackson.module</groupId>          
        <artifactId>*</artifactId>        
        </exclusion>        
        <exclusion>          
            <groupId>com.fasterxml.jackson.core</groupId>        
            <artifactId>*</artifactId>        
        </exclusion>      
    </exclusions>    
</dependency>

二、Spark SQL程序编程的入口

2.1 SQLContext

SQLContext只能做SQL编程无法操作Hive以及使用HQL操作

2.2 HiveContext

HiveContext专门提供用来操作和Hive相关的编程

2.3 SparkSession

SparkSession全新的Spark SQL程序执行入口把SQLContext和HiveContext功能全部整合了SparkSession底层封装了一个SparkContext而且SparkSession可以开启Hive的支持

三、DataFrame的创建

DataFrame是以前旧版本的数据抽象untyped类型的数据抽象Dataset是新版本的数据抽象typed有类型的数据抽象新版本当中DataFrame底层就是Dataset[Row]

3.1 使用隐式转换函数

使用隐式转换函数从RDD、Scala集合创建DataFrame toDF() toDF(columnName*)

机制

如果集合或者RDD的类型不是Bean而且再toDF没有传入任何的列名那么Spark会默认按照列的个数给生成随机的列名但是如果类型是一个Bean类型那么toDF产生的随机列名就是bean的属性名

3.2 通过SparkSession

通过SparkSession自带的createDataFrame函数从集合或者RDD中创建DataFrame

3.3 从Spark SQL

Spark SQL支持的数据源创建DataFrameHDFS、Hive、JSON文件、CSV文件等等)

3.3.1 HDFS、本地文件系统创建
  1. 普通的文本文档
  2. CSV文件
  3. JSON文件
  4. ORC文件
  5. Parquet文件

ss.read.option(xxx,xxx).csv/json/text/orc/parquet(path)

3.3.2 JDBC支持的数据库数据源创建

ss.read.jdbc(url,table,properties)

3.3.3 Spark SQL On Hive创建

使用Hive做数据存储使用Spark SQL读取Hive的数据进行处理

有个提前的准备

  1. 开启SparkSession的Hive支持
  2. 引入spark-hive的编程依赖
  3. 还需要将Hive的配置文件hive-site.xml放到指定的位置

ss.sql(“HQL语句”)

3.3.1 外部存储HDFS中读取数据成为DataFrame

ss.read.format("jsonxx").load("path") 不太好用

ss.read.option(key,value).option(....).csv/json(path)

3.3.2 从jdbc支持的数据库创建DataFrame

ss.read.jdbc(url,table,properties)

3.3.3 读取Hive数据成为DataFrame
  1. 通过SparkSession开启Hive的支持
  2. 引入spark-hive的编程依赖
  3. 通过ss.sql()

3.4 从其他的DataFrame转换的来

四、DataFrame的编程风格

通过代码来操作计算DataFrame中数据

4.1 SQL编程风格

Dataset提供的一系列转换算子来进行操作

4.1.1 将创建的DataFrame或者Dataset转换成为一张临时表格
4.1.2 然后通过ss.sql(sql语句)进行数据的查询

4.2 DSL编程风格

DataFrame和Dataset提供了一系列的API操作API说白了就是Spark SQL中算子操作可以通过算子操作来获取DataFrame或者Dataset中的数据

4.2.1 转换算子

RDD具备的算子DataFrame基本上都可以使用

DataFrame还增加了一些和SQL操作有关的算子 selectExpr、where/filter、groupBy、orderBy/sort、limit、join

4.2.2 行动算子

RDD具备的行动算子DataFrame和Dataset也都具备一些

函数名说明
collect/collectAsList不建议使用尤其是数据量特别庞大的情况下
foreach/foreachPartition获取结果集的一部分数据
first/take(n)/head(n)/takeAsList(n)/tail(n)获取的返回值类型就是Dataset存储的数据类型
printSchema获取DataFrame或者Dataset的表结构的
show()
show(num,truncate:boolean)
show(num,truncate:Int)
show(num,truncate:Int,ver:boolean)

保存输出的算子

  1. 文件系统

    1. df/ds.write.mode(SaveMode).csv/json/parquet/orc/text(path--目录)
    2. text纯文本文档要求DataFrame和Dataset的结果集只有一列 而且列必须是String类型
  2. JDBC支持的数据库

  3. Hive

五、DataSet的创建和使用

Dataset有类型DataFrame无类型的。

5.1 创建

5.1.1 隐式转换toDS()
5.1.2 通过SparkSession的createDataset函数创建
5.1.3 通过DataFrame转换得到Dataset df.as[类型-Bean对象必须有getter、setter方法] 也是需要隐式转换的

六、Spark SQL的函数操作

Spark SQL基本上常见的MySQL、Hive中函数都是支持的

6.1 Spark SQL特点

6.1.1 易整合
6.1.2 统一的数据访问方式
6.1.3 兼容Hive
6.1.4 标准的数据库连接

6.2 自定义函数

ss.udf.register(name,函数)

  • 阿里云国际版折扣https://www.yundadi.com

  • 阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6