Spark SQL增量查询Hudi表

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

前些天发现了一个巨牛的人工智能学习网站通俗易懂风趣幽默忍不住分享一下给大家。点击跳转到网站https://www.captainai.net/dongkelun

前言

由于项目上主要用Hive查询Hudi所以之前总结过一篇:Hive增量查询Hudi表。最近可能会有Spark SQL增量查询Hudi表的需求并且我发现目前用纯Spark SQL的形式还不能直接增量查询Hudi表于是进行学习总结一下。

编程方式(DF+SQL

先看一下官方文档上Spark SQL增量查询的方式地址https://hudi.apache.org/cn/docs/quick-start-guide#incremental-queryhttps://hudi.apache.org/cn/docs/querying_data#incremental-query

它是先通过spark.read中添加增量参数的形式读Hudi表为DF然后将DF注册成临时表最后通过Spark SQL查询临时表的形式实现增量查询的。

参数

  • hoodie.datasource.query.type=incremental 查询类型值为incremental时代表增量查询默认值snapshot增量查询时该参数必填
  • hoodie.datasource.read.begin.instanttime 增量查询开始时间必填 例如20221126170009762
  • hoodie.datasource.read.end.instanttime 增量查询结束时间非必填 例如20221126170023240
  • hoodie.datasource.read.incr.path.glob 增量查询指定分区路径非必填 例如 /dt=2022-11*/*
    查询范围 (BEGIN_INSTANTTIME,END_INSTANTTIME]也就是大于开始时间(不包含小于等于结束时间(包含如果没有指定结束时间那么查询大于BEGIN_INSTANTTIME到现在为止最新的数据如果指定INCR_PATH_GLOB那么只在指定分区路径下面查询对应的数据。

代码示例

完整代码地址https://github.com/dongkelun/hudi-demo/blob/master/hudi0.12_spark3.1/src/main/scala/com/dkl/hudi/spark3_1/IncrementalQuery.scala

import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME, END_INSTANTTIME, INCR_PATH_GLOB, QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

val tableName = "test_hudi_incremental"

spark.sql(
  s"""
     |create table $tableName (
     |  id int,
     |  name string,
     |  price double,
     |  ts long,
     |  dt string
     |) using hudi
     | partitioned by (dt)
     | options (
     |  primaryKey = 'id',
     |  preCombineField = 'ts',
     |  type = 'cow'
     | )
     |""".stripMargin)

spark.sql(s"insert into $tableName values (1,'hudi',10,100,'2022-11-25')")
spark.sql(s"insert into $tableName values (2,'hudi',10,100,'2022-11-25')")
spark.sql(s"insert into $tableName values (3,'hudi',10,100,'2022-11-26')")
spark.sql(s"insert into $tableName values (4,'hudi',10,100,'2022-12-26')")
spark.sql(s"insert into $tableName values (5,'hudi',10,100,'2022-12-27')")

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
val basePath = table.storage.properties("path")

// incrementally query data
val incrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME.key, beginTime).
  option(END_INSTANTTIME.key, endTime).
  option(INCR_PATH_GLOB.key, "/dt=2022-11*/*").
        load(basePath)
//  table(tableName)

incrementalDF.createOrReplaceTempView(s"temp_$tableName")

spark.sql(s"select * from  temp_$tableName").show()
spark.stop()

结果

+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|price| ts|        dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|  20221126165954300|20221126165954300...|              id:1|         dt=2022-11-25|de99b299-b9de-423...|  1|hudi| 10.0|100|2022-11-25|
|  20221126170009762|20221126170009762...|              id:2|         dt=2022-11-25|de99b299-b9de-423...|  2|hudi| 10.0|100|2022-11-25|
|  20221126170030470|20221126170030470...|              id:5|         dt=2022-12-27|75f8a760-9dc3-452...|  5|hudi| 10.0|100|2022-12-27|
|  20221126170023240|20221126170023240...|              id:4|         dt=2022-12-26|4751225d-4848-4dd...|  4|hudi| 10.0|100|2022-12-26|
|  20221126170017119|20221126170017119...|              id:3|         dt=2022-11-26|2272e513-5516-43f...|  3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+

+-----------------+
|      commit_time|
+-----------------+
|20221126170030470|
|20221126170023240|
|20221126170017119|
|20221126170009762|
|20221126165954300|
+-----------------+

20221126170009762
20221126170023240
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|price| ts|        dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|  20221126170017119|20221126170017119...|              id:3|         dt=2022-11-26|2272e513-5516-43f...|  3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+

注释掉INCR_PATH_GLOB结果

+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|price| ts|        dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|  20221127155346067|20221127155346067...|              id:4|         dt=2022-12-26|33e7a2ed-ea28-428...|  4|hudi| 10.0|100|2022-12-26|
|  20221127155339981|20221127155339981...|              id:3|         dt=2022-11-26|a5652ae0-942a-425...|  3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+

继续注释掉END_INSTANTTIME结果

20221127161253433
20221127161311831
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|price| ts|        dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|  20221127161320347|20221127161320347...|              id:5|         dt=2022-12-27|7b389e57-ca44-4aa...|  5|hudi| 10.0|100|2022-12-27|
|  20221127161311831|20221127161311831...|              id:4|         dt=2022-12-26|2707ce02-548a-422...|  4|hudi| 10.0|100|2022-12-26|
|  20221127161304742|20221127161304742...|              id:3|         dt=2022-11-26|264bc4a9-930d-4ec...|  3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+

可以看到不包含起始时间包含结束时间。

纯SQL方式

一般项目上都采用纯SQL方式进行增量查询这样比较方便纯SQL的方式参数和上面讲的参数是一样的接下来看一下怎么用纯SQL方式实现

建表造数

create table hudi.test_hudi_incremental (
  id int,
  name string,
  price double,
  ts long,
  dt string
) using hudi
 partitioned by (dt)
 options (
  primaryKey = 'id',
  preCombineField = 'ts',
  type = 'cow'
);

insert into hudi.test_hudi_incremental values (1,'a1', 10, 1000, '2022-11-25');
insert into hudi.test_hudi_incremental values (2,'a2', 20, 2000, '2022-11-25');
insert into hudi.test_hudi_incremental values (3,'a3', 30, 3000, '2022-11-26');
insert into hudi.test_hudi_incremental values (4,'a4', 40, 4000, '2022-12-26');
insert into hudi.test_hudi_incremental values (5,'a5', 50, 5000, '2022-12-27');

看一下有哪些commit_time

select distinct(_hoodie_commit_time) from test_hudi_incremental order by _hoodie_commit_time
+----------------------+
| _hoodie_commit_time  |
+----------------------+
| 20221130163618650    |
| 20221130163703640    |
| 20221130163720795    |
| 20221130163726780    |
| 20221130163823274    |
+----------------------+

纯SQL方式(一

使用Call Procedurescopy_to_temp_viewcopy_to_table目前这两个命令已经合到master,由scxwhite 苏乘祥贡献这俩参数差不多建议使用copy_to_temp_view因为copy_to_table会先将数据落盘而copy_to_temp_view是创建的临时表效率会高一点且数据落盘无意义后面还要将落盘的表删掉。

支持的参数

  • table
  • query_type
  • view_name
  • begin_instance_time
  • end_instance_time
  • as_of_instant
  • replace
  • global

测试SQL

call copy_to_temp_view(table => 'test_hudi_incremental', query_type => 'incremental', 
view_name => 'temp_incremental', begin_instance_time=> '20221130163703640', end_instance_time => '20221130163726780');

select _hoodie_commit_time, id, name, price, ts, dt from temp_incremental;

结果

+----------------------+-----+-------+--------+-------+-------------+
| _hoodie_commit_time  | id  | name  | price  |  ts   |     dt      |
+----------------------+-----+-------+--------+-------+-------------+
| 20221130163726780    | 4   | a4    | 40.0   | 4000  | 2022-12-26  |
| 20221130163720795    | 3   | a3    | 30.0   | 3000  | 2022-11-26  |
+----------------------+-----+-------+--------+-------+-------------+

可以看到这种方式是可以实现增量查询的但是需要注意如果需要修改增量查询的起始时间,那么就需要重复执行copy_to_temp_view,但是因为临时表temp_incremental已经存在要么新起个表名要么先删掉再创建新的我建议先删掉通过下面的命令删除

drop view if exists temp_incremental;

纯SQL方式(二

PR地址https://github.com/apache/hudi/pull/7182这个PR同样由scxwhite贡献目前只支持Spark3.2以上的版本(目前社区未合并
增量查询SQL

select id, name, price, ts, dt from tableName
[
'hoodie.datasource.query.type'=>'incremental',
'hoodie.datasource.read.begin.instanttime'=>'$instant1',
'hoodie.datasource.read.end.instanttime'=>'$instant2'
]

这种方式是支持了一种新的语法在查询SQL后通过在[]添加参数的形式感兴趣的话可以拉一下代码自己打包试一下。

纯SQL方式(三

使用 Spark SQL Hint实现具体实现方式请查看KnightChess的这篇文章如何使用 Spark SQL Hint 对 Hudi 进行增量查询、时间旅行
最终的效果如下

select
  /*+
    hoodie_prop(
      'default.h1',
      map('hoodie.datasource.read.begin.instanttime', '20221127083503537', 'hoodie.datasource.read.end.instanttime', '20221127083506081')
    ),
    hoodie_prop(
      'default.h2',
      map('hoodie.datasource.read.begin.instanttime', '20221127083508715', 'hoodie.datasource.read.end.instanttime', '20221127083511803')
    )
  */
  id, name, price, ts
from (
  select id, name, price, ts
  from default.h1
  union all
  select id, name, price, ts
  from default.h2
)

是在hint中添加增量查询相关的参数先指定表名再写参数但是文章好像未给出完整的代码地址大家有时间可以自己试一下。

纯SQL方式(四

这种方式是我按照Hive增量查询Hudi的方式修改的源码通过set的方式实现增量查询。
PR地址https://github.com/apache/hudi/pull/7339

关于为啥目前不能通过set参数进行增量查询这里说明一下根据文章Hudi Spark SQL源码学习总结-select(查询可知Hudi的DefaultSource.createRelation中的optParams参数为readDataSourceTable中的options = table.storage.properties ++ pathOption也就是表本身属性中的配置参数+path之后在createRelation并没有接收其他参数所以不能通过set参数的形式进行查询

和Hive增量查询一样指定具体表名的增量查询参数

set hoodie.test_hudi_incremental.datasource.query.type=incremental
set hoodie.test_hudi_incremental.datasource.read.begin.instanttime=20221130163703640;
select _hoodie_commit_time, id, name, price, ts, dt from test_hudi_incremental;
+----------------------+-----+-------+--------+-------+-------------+
| _hoodie_commit_time  | id  | name  | price  |  ts   |     dt      |
+----------------------+-----+-------+--------+-------+-------------+
| 20221130163823274    | 5   | a5    | 50.0   | 5000  | 2022-12-27  |
| 20221130163726780    | 4   | a4    | 40.0   | 4000  | 2022-12-26  |
| 20221130163720795    | 3   | a3    | 30.0   | 3000  | 2022-11-26  |
+----------------------+-----+-------+--------+-------+-------------+

如果不同的库下面有相同的表名则可以通过库名.表名的形式

## 需要先开启使用数据库名称限定表名的配置,开启后上面不加库名的配置就失效了
set hoodie.query.use.database = true;
set hoodie.hudi.test_hudi_incremental.datasource.query.type=incremental;
set hoodie.hudi.test_hudi_incremental.datasource.read.begin.instanttime=20221130163703640;
set hoodie.hudi.test_hudi_incremental.datasource.read.end.instanttime=20221130163726780;
set hoodie.hudi.test_hudi_incremental.datasource.read.incr.path.glob=/dt=2022-11*/*;
refresh table test_hudi_incremental;
select _hoodie_commit_time, id, name, price, ts, dt from test_hudi_incremental;
+----------------------+-----+-------+--------+-------+-------------+
| _hoodie_commit_time  | id  | name  | price  |  ts   |     dt      |
+----------------------+-----+-------+--------+-------+-------------+
| 20221130163720795    | 3   | a3    | 30.0   | 3000  | 2022-11-26  |
+----------------------+-----+-------+--------+-------+-------------+

大家可以自己试一下不同的库表关联的情形

这里需要注意一点更新参数后需要先refresh table再查询否则查询时修改的参数不生效因为会使用缓存中的参数。
这种方式只是简单地修改了一下源码使set的参数对查询生效。

为了避免有些读者嫌打包麻烦这里给大家提供了hudi-spark3.1-bundle_2.12-0.13.0-SNAPSHOT.jar的下载地址https://download.csdn.net/download/dkl12/87221476

总结

本文总结了Spark SQL增量查询Hudi表的一些参数设置并给出了示例介绍了使用纯Spark SQL实现增量查询Hudi表的几种方式不确定未来社区会采用哪种方式大家目前如果有这种需求的话可以先选择一种自己喜欢的方式等未来社区版本支持后再升级版本。本文没有涉及增量查询的原理暂未验证增量查询的效率是否可以起到文件过滤的效果以后如果有时间会单独整理一篇。

相关阅读

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6