8.spark自适应查询-AQE之自适应调整Shuffle分区数量-CSDN博客
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
目录
概述
自适应查询执行AQE是 Spark SQL
中的一种优化技术它利用运行时统计信息来选择最高效的查询执行计划自Apache Spark 3.2.0
以来默认启用该计划。从Spark 3.0
开始AQE有三个主要功如下
- 自适应查询AQE(Adaptive Query Execution)
- 自适应调整Shuffle分区数量
- 原理
- 默认环境配置
- 修改配置
- 动态调整Join策略
- 动态优化倾斜的 Join
- 自适应调整Shuffle分区数量
主要功能
自适应调整Shuffle分区数量
当spark.sql.adaptive.enabled
和spark.sql.adaptive.coalescePartitions.enabled
配置均为true
时自适应调整Shuffle
分区数量功能就启动了
属性名称 | 默认值 | 功能 | 版本 |
---|---|---|---|
spark.sql.adaptive.enabled | true | 必备条件之一 | 3.0.0 |
spark.sql.adaptive.coalescePartitions.enabled | true | 必备条件之二 | 3.0.0 |
spark.sql.adaptive.advisoryPartitionSizeInBytes | 64 MB | 自适应优化期间shuffle 分区的建议大小以字节为单位。当Spark合并小 的shuffle分区或拆分倾斜 的shuffler分区时它就会生效。 | 3.0.0 |
spark.sql.adaptive.coalescePartitions.parallelismFirst | true | 当为true时Spark在合并连续的shuffle分区时会忽略Spark.sql.adaptive.advisoryPartitionSizeInBytes 默认64MB指定的目标大小并且只遵循Spark.sql.adaptive.salecePartitions.minPartitionSize 默认1MB指定的最小分区大小以最大限度地提高并行性 。这是为了在启用自适应查询执行时避免性能回归 。建议将此配置设置为false 并遵守spark.sql.adaptive.advisoryPartitionSizeInBytes 指定的目标大小。 | 3.2.0 |
原理
Spark在处理海量数据的时候其中的Shuffle过程是比较消耗资源的也比较影响性能因为它需要在网络中传输数据。
shuffle 中的一个关键属性是分区的数量。
分区的最佳数量取决于数据自身大小但是数据大小可能在不同的阶段、不同的查询之间有很大的差异这使得这个数字很难精准调优。
如果分区数量太多每个分区的数据就很小读取小的数据块会导致IO效率降低并且也会产生过多的task, 这样会给Spark任务带来更多负担。
如果分区数量太少那么每个分区处理的数据可能非常大处理这些大分区的数据可能需要将数据溢写到磁盘例如排序或聚合操作这样也会降低计算效率。
Spark初始会设置一个较大的Shuffle分区个数这个数值默认是200
后续在运行时会根据动态统计到的数据信息将小的分区合并也就是慢慢减少分区数量。
测试时将以SELECT workorder,unitid,partid,partname,routeid,lineid from ods.xx where dt ='2023-06-24' group by workorder,unitid,partid ,partname ,routeid,lineid
语句进行测试为了看出 Shuffle 的效果group
字段多了一些
将初始的 Shuffle
分区数量设置为 5
所以在 Shuffle 过程中数据会产生5 个分区。如果没有开启自适应调整Shuffle分区数量这个策略Spark会启动5个Recuce任务来完成最后的聚合。但是这里面有3个非常小的分区为每个分区分别启动一个单独的任务会浪费资源并且也无法提高执行效率。如下图
开启自适应调整 Shuffle
分区数量之后Spark 会将这3个数据量比较小的分区合并为 1
个分区让1个reduce任务处理
默认环境配置
测试案例:
案例环境使用的是
spark 3.2.4
kyuubi 1.7.1
版本使用一张20
亿的表做优化测试的也可以准备一个json
文件加载后转成DataFrame
SELECT workorder,unitid,partid,partname,routeid,lineid from ods.xx where dt ='2023-06-24' group by workorder,unitid,partid ,partname ,routeid,lineid
由上两个图可以看出21
任务每个任务只是 3~4
M 这样原因是因
spark.sql.adaptive.coalescePartitions.parallelismFirst = true
修改配置
spark.sql.adaptive.coalescePartitions.parallelismFirst=false
可以看出两三千万的数据shuffle
处理上还是有倾斜的但海量数据下基本上是接近64m
的。
结束
至此自适应调整Shuffle
分区数量就结束了。
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |