Hudi Spark Sql Procedures 回滚 Hudi 表数据

Rollbac

官方文档:https://hudi.apache.org/docs/rollbacks

回滚数据,可以回滚到至上一个commit(成功的),也可以回滚到指定 commit (savepoint)。本文暂不涉及savepoint,只总结 回滚到上一个 commit。

  • 自动回滚:默认写任务时会自动触发回滚:rollbackFailedWrites
    每次写数据前会先检测上一个commit是否完成,如果上一个commit失败,删除上一个commit,如果成功,则不作任务操作。
    如何检测commit失败:一个完整的commit 有 .commit.requested-> .inflight->.commit ,如果只有  .commit.requested 或 .inflight 则认为不完整,commit失败。
    在哪里触发自动回滚:在 startCommit 方法中 会先调用 CleanerUtils.rollbackFailedWrites 执行 rollback操作,相关源码分析见:Hudi源码|Insert源码分析总结(一)(整体流程)
    举例:
    001 commit 成功、002 commit 失败:先删除002 commit ,然后执行003 commit
    001 commit 成功、002 commit 成功:不回滚,只执行 003 commit
  • 手动回滚:rollback_to_instant 可以删除上一个成功的commit,回滚到上上个成功的commit。
    举例:
    001 commit 成功、002 commit 成功,可以回滚到001:删除002,最后一次commit 是001
    限制:只能一个一个回滚,也就是只能删除最后一个commit,回滚到倒数第二个commit。可以连续回滚,如果想回滚到某个时间点的coomit,只能用 rollback_to_savepoint 回滚到指定的 savepoint。

本文主要总结手动回滚:rollback_to_instant

SQL 语法

call show_commits(table => 'hudi.test_hudi_table', limit => 10);
call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => 'committime');
call show_rollbacks(table => 'hudi.test_hudi_table');
call show_rollback_detail(table => 'hudi.test_hudi_table', instant_time => 'committime');

示例

数据准备

create table hudi.test_hudi_table (
  id int,
  name string,
  price double,
  ts long,
  dt string
) using hudi
 partitioned by (dt)
 options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 ); 

insert into hudi.test_hudi_table values (1,'hudi',10,100,'2024-06-03'); 
insert into hudi.test_hudi_table values (2,'hudi',20,200,'2024-06-03'); 
insert into hudi.test_hudi_table values (3,'hudi',30,300,'2024-06-03');  

SQL 执行

call show_commits(table => 'hudi.test_hudi_table', limit => 10);
20240603145257060       commit  435474  0       1       1       3       0       0
20240603145214563       commit  435442  0       1       1       2       0       0
20240603145154120       commit  435293  1    
call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145257060');
call show_commits(table => 'hudi.test_hudi_table', limit => 10);
20240603145214563       commit  435442  0       1       1       2       0       0
20240603145154120       commit  435293  1       0       1       1       0       0
call show_rollbacks(table => 'hudi.test_hudi_table');
20240603145441704       20240603145257060       1       637     1
call show_rollback_detail(table => 'hudi.test_hudi_table', instant_time => '20240603145441704');
20240603145441704       [20240603145257060]     dt=2024-06-03   //cluster1/warehouse/tablespace/managed/hive/hudi.db/test_hudi_table/dt=2024-06-03/358c406e-12b6-4718-b102-f303eecec6bf-0_0-243-200_20240603145257060.parquet   true

继续 rollback

call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145214563');
call show_commits(table => 'hudi.test_hudi_table', limit => 10);
20240603145154120       commit  435293  1       0       1       1       0       0
call show_rollbacks(table => 'hudi.test_hudi_table');
20240603145441704       20240603145257060       1       637     1
20240603150458282       20240603145214563       1       628     1

结果验证

效果截图:

图片
图片
图片
图片

可以看到:第一次 rollback 删除了id=3的数据和commit,生成了 20240603145441704.rollback;第二次删除了id=2的数据和commit,生成了 20240603150458282.rollback

Flink 结合

验证 Spark SQL rollback Flink写的表是否有问题

Flink 造数

rollback 既可以回滚 cow 表也可以,也可以回滚 mor 表,上面验证的 cow 表,这次用 mor 表验证。

set execution.target=yarn-per-job;
set taskmanager.numberOfTaskSlots=1;
set execution.runtime-mode=batch;
set table.dml-sync = true;

CREATE TABLE test_flink_mor (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price int,
  ts int,
  dt string
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_flink_mor',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'hudi',
  'hive_sync.table' = 'test_flink_mor',
  'hive_sync.partition_fields' = 'dt',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor',
  'hoodie.datasource.hive_sync.create_managed_table' = 'true',
  'compaction.async.enabled'= 'false',
  'compaction.delta_commits' = '2',
  'hoodie.compact.inline' = 'true',
  'hoodie.compact.inline.max.delta.commits' = '2'
);

insert into test_flink_mor values (1,'hudi',10,100,'2024-06-03');
insert into test_flink_mor values (2,'hudi',20,200,'2024-06-03');
insert into test_flink_mor values (3,'hudi',30,300,'2024-06-03');

验证

call show_commits(table => 'hudi.test_flink_mor_ro', limit => 10);
20240603152727949       deltacommit     903     0       1       1       1       1       0
20240603152637632       commit  435262  1       0       1       2       0       0
20240603152636150       deltacommit     903     0       1       1       1       1       0
20240603152552562       deltacommit     903     0       1       1       1       0       0
call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152727949');
call show_commits(table => 'hudi.test_flink_mor_ro', limit => 10);
20240603152637632       commit  435262  1       0       1       2       0       0
20240603152636150       deltacommit     903     0       1       1       1       1       0
20240603152552562       deltacommit     903     0       1       1       1       0       0

继续回滚:

call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152637632');

hudi 0.13.0 会抛异常

call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152637632');
24/06/03 15:49:40 ERROR SparkSQLDriver: Failed in [call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152637632')]
org.apache.hudi.exception.HoodieRollbackException: Failed to rollback hdfs://cluster1/tmp/hudi/test_flink_mor commits 20240603152637632
Caused by: org.apache.hudi.exception.HoodieMetadataException: The instant [20240603152637632__deltacommit__COMPLETED] required to sync rollback of 20240603152637632 has been archived
图片
图片
图片

问题

这里实际删除了对应的commit 和数据 :20240603152637632.commit fc621eda-7f97-465d-b191-9eab0b4bb660_0-1-0_20240603152637632.parquet,但是.requested和 .inflight 没有删除成功就报错了。

也就是 Hudi 0.13.0 spark sql rollback spark 写的hudi 表没有问题,rollback flink写的表只有第一次成功,第二次会失败,导致之后无法继续进行rollback了。

问题原因:版本代码bug

问题解决

1、升级到目前最新发行版:0.14.1
2、如果因各种原因不能升级版本,可以合并相关PR到0.13.0 (暂不清楚涉及哪些PR)

升级验证

重新造数:test_flink_mor

图片
图片
图片
图片

可以看到连续两次rollback都是成功的!

测试 rollback 失败的 commit

rollback_to_instant

首先人为删除表 hudi.test_hudi_table 的 20240603145154120.commit,这样20240603145154120对应的commit就是不完整的、失败的。

hadoop fs -rm -r /warehouse/tablespace/managed/hive/hudi.db/test_hudi_table/.hoodie/20240603145154120.commit
图片

先测试 hudi 0.13.0 rollback_to_instant 是否正常

call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145154120');
24/06/04 09:51:34 ERROR SparkSQLDriver: Failed in [call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145154120')]
org.apache.hudi.exception.HoodieException: Commit 20240603145154120 not found in Commits org.apache.hudi.common.table.timeline.HoodieDefaultTimeline:

0.13.0 抛出异常:Commit 20240603145154120 not found in Commits

图片

然后测试 0.14.1 是否正常

自动回滚

测试写数据时是否会自动回滚。
先用flink insert一条数据,生成 .deltacommit 后,人为删除 .deltacommit,保留 .deltacommit.requested 和 .deltacommit.inflight
然后执行再用flink insert一条数据,看是否会触发rollback。

图片

可以看到写数据时,会先rollback上一次失败的commit,再写数据生成新的commit

相关异常

记录遇到的相关异常信息:https://note.youdao.com/s/EtQEL1wC

小结

对于 rollback_to_instant

  • 只能 rollback 最后一个commit
  • 可以连续rollback,对于Spark写的表没有问题,而对于Flink写的表 hudi 0.13.0 有bug ,可以升级到0.14.1解决。
  • 可以rollback 失败的commit,无论是Spark写的表还是Flink写的表,hudi 0.13.0 都有bug,均可以升级到0.14.1解决。
  • rollback 会删除对应的 .commit 和 数据文件,生成 .rollback
  • 可以回滚哪些?对于cow 表:.commit;对于 mor表:.deltacommit 和 .commit

原创文章,作者:guozi,如若转载,请注明出处:https://www.sudun.com/ask/89754.html

(0)
guozi的头像guozi
上一篇 2024年6月5日 上午10:30
下一篇 2024年6月5日 上午10:32

相关推荐

  • 创宇云

    云服务器行业的领军企业——创宇云,近年来备受关注。它究竟有何特点?又有着怎样的优势?产品介绍如何?价格标准与套餐选择又是如何?更重要的是,它在实际应用中是否能够带来令人满意的效果?…

    行业资讯 2024年4月7日
    0
  • 如何满足服务器租用的要求?

    云服务器行业近年来发展迅速,越来越多的企业开始关注如何满足服务器租用的要求。但是什么是服务器租用?它有哪些优势?适用于哪些场景?如何选择合适的服务商?又有哪些价格标准及相关费用呢?…

    行业资讯 2024年4月19日
    0
  • gpu的性能与价格有什么关系?

    在当今云服务器行业,GPU的性能与价格一直是备受关注的话题。众所周知,GPU作为一种专门用于图形处理的处理器,其强大的计算能力备受瞩目。那么,你是否好奇GPU的性能与价格之间存在着…

    行业资讯 2024年4月21日
    0
  • 高防服务器有用吗,国内高防服务器租用

    随着游戏行业的蓬勃发展,对游戏服务器的需求也不断增加。由此带来的网络安全问题也日益突出。高防服务器的出现就是为了解决这个问题。那么,什么是高防服务器?它有哪些技术优势呢?今天我们就…

    行业资讯 2024年3月24日
    0

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注