Hudi Spark Sql Procedures 回滚 Hudi 表数据

Rollbac

官方文档:https://hudi.apache.org/docs/rollbacks

回滚数据,可以回滚到至上一个commit(成功的),也可以回滚到指定 commit (savepoint)。本文暂不涉及savepoint,只总结 回滚到上一个 commit。

  • 自动回滚:默认写任务时会自动触发回滚:rollbackFailedWrites
    每次写数据前会先检测上一个commit是否完成,如果上一个commit失败,删除上一个commit,如果成功,则不作任务操作。
    如何检测commit失败:一个完整的commit 有 .commit.requested-> .inflight->.commit ,如果只有  .commit.requested 或 .inflight 则认为不完整,commit失败。
    在哪里触发自动回滚:在 startCommit 方法中 会先调用 CleanerUtils.rollbackFailedWrites 执行 rollback操作,相关源码分析见:Hudi源码|Insert源码分析总结(一)(整体流程)
    举例:
    001 commit 成功、002 commit 失败:先删除002 commit ,然后执行003 commit
    001 commit 成功、002 commit 成功:不回滚,只执行 003 commit
  • 手动回滚:rollback_to_instant 可以删除上一个成功的commit,回滚到上上个成功的commit。
    举例:
    001 commit 成功、002 commit 成功,可以回滚到001:删除002,最后一次commit 是001
    限制:只能一个一个回滚,也就是只能删除最后一个commit,回滚到倒数第二个commit。可以连续回滚,如果想回滚到某个时间点的coomit,只能用 rollback_to_savepoint 回滚到指定的 savepoint。

本文主要总结手动回滚:rollback_to_instant

SQL 语法

call show_commits(table => 'hudi.test_hudi_table', limit => 10);
call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => 'committime');
call show_rollbacks(table => 'hudi.test_hudi_table');
call show_rollback_detail(table => 'hudi.test_hudi_table', instant_time => 'committime');

示例

数据准备

create table hudi.test_hudi_table (
  id int,
  name string,
  price double,
  ts long,
  dt string
) using hudi
 partitioned by (dt)
 options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 ); 

insert into hudi.test_hudi_table values (1,'hudi',10,100,'2024-06-03'); 
insert into hudi.test_hudi_table values (2,'hudi',20,200,'2024-06-03'); 
insert into hudi.test_hudi_table values (3,'hudi',30,300,'2024-06-03');  

SQL 执行

call show_commits(table => 'hudi.test_hudi_table', limit => 10);
20240603145257060       commit  435474  0       1       1       3       0       0
20240603145214563       commit  435442  0       1       1       2       0       0
20240603145154120       commit  435293  1    
call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145257060');
call show_commits(table => 'hudi.test_hudi_table', limit => 10);
20240603145214563       commit  435442  0       1       1       2       0       0
20240603145154120       commit  435293  1       0       1       1       0       0
call show_rollbacks(table => 'hudi.test_hudi_table');
20240603145441704       20240603145257060       1       637     1
call show_rollback_detail(table => 'hudi.test_hudi_table', instant_time => '20240603145441704');
20240603145441704       [20240603145257060]     dt=2024-06-03   //cluster1/warehouse/tablespace/managed/hive/hudi.db/test_hudi_table/dt=2024-06-03/358c406e-12b6-4718-b102-f303eecec6bf-0_0-243-200_20240603145257060.parquet   true

继续 rollback

call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145214563');
call show_commits(table => 'hudi.test_hudi_table', limit => 10);
20240603145154120       commit  435293  1       0       1       1       0       0
call show_rollbacks(table => 'hudi.test_hudi_table');
20240603145441704       20240603145257060       1       637     1
20240603150458282       20240603145214563       1       628     1

结果验证

效果截图:

图片
图片
图片
图片

可以看到:第一次 rollback 删除了id=3的数据和commit,生成了 20240603145441704.rollback;第二次删除了id=2的数据和commit,生成了 20240603150458282.rollback

Flink 结合

验证 Spark SQL rollback Flink写的表是否有问题

Flink 造数

rollback 既可以回滚 cow 表也可以,也可以回滚 mor 表,上面验证的 cow 表,这次用 mor 表验证。

set execution.target=yarn-per-job;
set taskmanager.numberOfTaskSlots=1;
set execution.runtime-mode=batch;
set table.dml-sync = true;

CREATE TABLE test_flink_mor (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price int,
  ts int,
  dt string
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_flink_mor',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'hudi',
  'hive_sync.table' = 'test_flink_mor',
  'hive_sync.partition_fields' = 'dt',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor',
  'hoodie.datasource.hive_sync.create_managed_table' = 'true',
  'compaction.async.enabled'= 'false',
  'compaction.delta_commits' = '2',
  'hoodie.compact.inline' = 'true',
  'hoodie.compact.inline.max.delta.commits' = '2'
);

insert into test_flink_mor values (1,'hudi',10,100,'2024-06-03');
insert into test_flink_mor values (2,'hudi',20,200,'2024-06-03');
insert into test_flink_mor values (3,'hudi',30,300,'2024-06-03');

验证

call show_commits(table => 'hudi.test_flink_mor_ro', limit => 10);
20240603152727949       deltacommit     903     0       1       1       1       1       0
20240603152637632       commit  435262  1       0       1       2       0       0
20240603152636150       deltacommit     903     0       1       1       1       1       0
20240603152552562       deltacommit     903     0       1       1       1       0       0
call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152727949');
call show_commits(table => 'hudi.test_flink_mor_ro', limit => 10);
20240603152637632       commit  435262  1       0       1       2       0       0
20240603152636150       deltacommit     903     0       1       1       1       1       0
20240603152552562       deltacommit     903     0       1       1       1       0       0

继续回滚:

call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152637632');

hudi 0.13.0 会抛异常

call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152637632');
24/06/03 15:49:40 ERROR SparkSQLDriver: Failed in [call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152637632')]
org.apache.hudi.exception.HoodieRollbackException: Failed to rollback hdfs://cluster1/tmp/hudi/test_flink_mor commits 20240603152637632
Caused by: org.apache.hudi.exception.HoodieMetadataException: The instant [20240603152637632__deltacommit__COMPLETED] required to sync rollback of 20240603152637632 has been archived
图片
图片
图片

问题

这里实际删除了对应的commit 和数据 :20240603152637632.commit fc621eda-7f97-465d-b191-9eab0b4bb660_0-1-0_20240603152637632.parquet,但是.requested和 .inflight 没有删除成功就报错了。

也就是 Hudi 0.13.0 spark sql rollback spark 写的hudi 表没有问题,rollback flink写的表只有第一次成功,第二次会失败,导致之后无法继续进行rollback了。

问题原因:版本代码bug

问题解决

1、升级到目前最新发行版:0.14.1
2、如果因各种原因不能升级版本,可以合并相关PR到0.13.0 (暂不清楚涉及哪些PR)

升级验证

重新造数:test_flink_mor

图片
图片
图片
图片

可以看到连续两次rollback都是成功的!

测试 rollback 失败的 commit

rollback_to_instant

首先人为删除表 hudi.test_hudi_table 的 20240603145154120.commit,这样20240603145154120对应的commit就是不完整的、失败的。

hadoop fs -rm -r /warehouse/tablespace/managed/hive/hudi.db/test_hudi_table/.hoodie/20240603145154120.commit
图片

先测试 hudi 0.13.0 rollback_to_instant 是否正常

call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145154120');
24/06/04 09:51:34 ERROR SparkSQLDriver: Failed in [call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145154120')]
org.apache.hudi.exception.HoodieException: Commit 20240603145154120 not found in Commits org.apache.hudi.common.table.timeline.HoodieDefaultTimeline:

0.13.0 抛出异常:Commit 20240603145154120 not found in Commits

图片

然后测试 0.14.1 是否正常

自动回滚

测试写数据时是否会自动回滚。
先用flink insert一条数据,生成 .deltacommit 后,人为删除 .deltacommit,保留 .deltacommit.requested 和 .deltacommit.inflight
然后执行再用flink insert一条数据,看是否会触发rollback。

图片

可以看到写数据时,会先rollback上一次失败的commit,再写数据生成新的commit

相关异常

记录遇到的相关异常信息:https://note.youdao.com/s/EtQEL1wC

小结

对于 rollback_to_instant

  • 只能 rollback 最后一个commit
  • 可以连续rollback,对于Spark写的表没有问题,而对于Flink写的表 hudi 0.13.0 有bug ,可以升级到0.14.1解决。
  • 可以rollback 失败的commit,无论是Spark写的表还是Flink写的表,hudi 0.13.0 都有bug,均可以升级到0.14.1解决。
  • rollback 会删除对应的 .commit 和 数据文件,生成 .rollback
  • 可以回滚哪些?对于cow 表:.commit;对于 mor表:.deltacommit 和 .commit

原创文章,作者:guozi,如若转载,请注明出处:https://www.sudun.com/ask/89754.html

Like (0)
guozi的头像guozi
Previous 2024年6月5日 上午10:30
Next 2024年6月5日 上午10:32

相关推荐

  • 如何选择合适的我的世界服务器租用价格?

    想要在我的世界中畅游,拥有一个顺畅的游戏体验,选择合适的服务器租用价格是非常重要的。但是,随着云服务器行业的发展,如何选择合适的我的世界服务器租用价格成为了一个让人头疼的问题。什么…

    行业资讯 2024年4月17日
    0
  • 百度诚信

    云服务器,这个词在近几年来已经变得家喻户晓。它代表着互联网时代的发展和进步,也是企业数字化转型的必备工具。而作为国内最大的互联网公司之一,百度推出了自己的云服务器产品——百度诚信。…

    行业资讯 2024年4月19日
    0
  • ping命令中ttl的作用是什么?(ttl值越低越好还是越高?)

    今天我们要聊的话题是关于网络互联网服务器行业的,标题是“ping命令中ttl的作用是什么?(ttl值越低越好还是越高?)”。这个问题相信大家都不陌生,但是你是否真正了解它的作用和使…

    行业资讯 2024年4月12日
    0
  • ip是否被墙,检测ip被墙

    Linode是著名的云服务器提供商,允许用户通过购买Linode服务器来构建自己的网站和应用程序。然而,近期不少用户反映,其linode服务器IP被屏蔽,无法正常访问网站和应用程序…

    行业资讯 2024年5月6日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注