Rollbac
官方文档:https://hudi.apache.org/docs/rollbacks
回滚数据,可以回滚到至上一个commit(成功的),也可以回滚到指定 commit (savepoint)。本文暂不涉及savepoint,只总结 回滚到上一个 commit。
- 自动回滚:默认写任务时会自动触发回滚:
rollbackFailedWrites
。
每次写数据前会先检测上一个commit是否完成,如果上一个commit失败,删除上一个commit,如果成功,则不作任务操作。
如何检测commit失败:一个完整的commit 有 .commit.requested-> .inflight->.commit ,如果只有 .commit.requested 或 .inflight 则认为不完整,commit失败。
在哪里触发自动回滚:在 startCommit 方法中 会先调用 CleanerUtils.rollbackFailedWrites 执行 rollback操作,相关源码分析见:Hudi源码|Insert源码分析总结(一)(整体流程)
举例:
001 commit 成功、002 commit 失败:先删除002 commit ,然后执行003 commit
001 commit 成功、002 commit 成功:不回滚,只执行 003 commit - 手动回滚:
rollback_to_instant
可以删除上一个成功的commit,回滚到上上个成功的commit。
举例:
001 commit 成功、002 commit 成功,可以回滚到001:删除002,最后一次commit 是001
限制:只能一个一个回滚,也就是只能删除最后一个commit,回滚到倒数第二个commit。可以连续回滚,如果想回滚到某个时间点的coomit,只能用 rollback_to_savepoint 回滚到指定的 savepoint。
本文主要总结手动回滚:rollback_to_instant
SQL 语法
call show_commits(table => 'hudi.test_hudi_table', limit => 10);
call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => 'committime');
call show_rollbacks(table => 'hudi.test_hudi_table');
call show_rollback_detail(table => 'hudi.test_hudi_table', instant_time => 'committime');
示例
数据准备
create table hudi.test_hudi_table (
id int,
name string,
price double,
ts long,
dt string
) using hudi
partitioned by (dt)
options (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
);
insert into hudi.test_hudi_table values (1,'hudi',10,100,'2024-06-03');
insert into hudi.test_hudi_table values (2,'hudi',20,200,'2024-06-03');
insert into hudi.test_hudi_table values (3,'hudi',30,300,'2024-06-03');
SQL 执行
call show_commits(table => 'hudi.test_hudi_table', limit => 10);
20240603145257060 commit 435474 0 1 1 3 0 0
20240603145214563 commit 435442 0 1 1 2 0 0
20240603145154120 commit 435293 1
call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145257060');
call show_commits(table => 'hudi.test_hudi_table', limit => 10);
20240603145214563 commit 435442 0 1 1 2 0 0
20240603145154120 commit 435293 1 0 1 1 0 0
call show_rollbacks(table => 'hudi.test_hudi_table');
20240603145441704 20240603145257060 1 637 1
call show_rollback_detail(table => 'hudi.test_hudi_table', instant_time => '20240603145441704');
20240603145441704 [20240603145257060] dt=2024-06-03 //cluster1/warehouse/tablespace/managed/hive/hudi.db/test_hudi_table/dt=2024-06-03/358c406e-12b6-4718-b102-f303eecec6bf-0_0-243-200_20240603145257060.parquet true
继续 rollback
call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145214563');
call show_commits(table => 'hudi.test_hudi_table', limit => 10);
20240603145154120 commit 435293 1 0 1 1 0 0
call show_rollbacks(table => 'hudi.test_hudi_table');
20240603145441704 20240603145257060 1 637 1
20240603150458282 20240603145214563 1 628 1
结果验证
效果截图:
可以看到:第一次 rollback 删除了id=3的数据和commit,生成了 20240603145441704.rollback;第二次删除了id=2的数据和commit,生成了 20240603150458282.rollback
Flink 结合
验证 Spark SQL rollback Flink写的表是否有问题
Flink 造数
rollback 既可以回滚 cow 表也可以,也可以回滚 mor 表,上面验证的 cow 表,这次用 mor 表验证。
set execution.target=yarn-per-job;
set taskmanager.numberOfTaskSlots=1;
set execution.runtime-mode=batch;
set table.dml-sync = true;
CREATE TABLE test_flink_mor (
id int PRIMARY KEY NOT ENFORCED,
name string,
price int,
ts int,
dt string
)
PARTITIONED BY (dt)
WITH (
'connector' = 'hudi',
'path' = '/tmp/hudi/test_flink_mor',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',
'hive_sync.db' = 'hudi',
'hive_sync.table' = 'test_flink_mor',
'hive_sync.partition_fields' = 'dt',
'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor',
'hoodie.datasource.hive_sync.create_managed_table' = 'true',
'compaction.async.enabled'= 'false',
'compaction.delta_commits' = '2',
'hoodie.compact.inline' = 'true',
'hoodie.compact.inline.max.delta.commits' = '2'
);
insert into test_flink_mor values (1,'hudi',10,100,'2024-06-03');
insert into test_flink_mor values (2,'hudi',20,200,'2024-06-03');
insert into test_flink_mor values (3,'hudi',30,300,'2024-06-03');
验证
call show_commits(table => 'hudi.test_flink_mor_ro', limit => 10);
20240603152727949 deltacommit 903 0 1 1 1 1 0
20240603152637632 commit 435262 1 0 1 2 0 0
20240603152636150 deltacommit 903 0 1 1 1 1 0
20240603152552562 deltacommit 903 0 1 1 1 0 0
call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152727949');
call show_commits(table => 'hudi.test_flink_mor_ro', limit => 10);
20240603152637632 commit 435262 1 0 1 2 0 0
20240603152636150 deltacommit 903 0 1 1 1 1 0
20240603152552562 deltacommit 903 0 1 1 1 0 0
继续回滚:
call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152637632');
hudi 0.13.0 会抛异常
call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152637632');
24/06/03 15:49:40 ERROR SparkSQLDriver: Failed in [call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152637632')]
org.apache.hudi.exception.HoodieRollbackException: Failed to rollback hdfs://cluster1/tmp/hudi/test_flink_mor commits 20240603152637632
Caused by: org.apache.hudi.exception.HoodieMetadataException: The instant [20240603152637632__deltacommit__COMPLETED] required to sync rollback of 20240603152637632 has been archived
问题
这里实际删除了对应的commit 和数据 :20240603152637632.commit fc621eda-7f97-465d-b191-9eab0b4bb660_0-1-0_20240603152637632.parquet,但是.requested和 .inflight 没有删除成功就报错了。
也就是 Hudi 0.13.0 spark sql rollback spark 写的hudi 表没有问题,rollback flink写的表只有第一次成功,第二次会失败,导致之后无法继续进行rollback了。
问题原因:版本代码bug
问题解决
1、升级到目前最新发行版:0.14.1
2、如果因各种原因不能升级版本,可以合并相关PR到0.13.0 (暂不清楚涉及哪些PR)
升级验证
重新造数:test_flink_mor
可以看到连续两次rollback都是成功的!
测试 rollback 失败的 commit
rollback_to_instant
首先人为删除表 hudi.test_hudi_table 的 20240603145154120.commit,这样20240603145154120对应的commit就是不完整的、失败的。
hadoop fs -rm -r /warehouse/tablespace/managed/hive/hudi.db/test_hudi_table/.hoodie/20240603145154120.commit
先测试 hudi 0.13.0 rollback_to_instant
是否正常
call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145154120');
24/06/04 09:51:34 ERROR SparkSQLDriver: Failed in [call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145154120')]
org.apache.hudi.exception.HoodieException: Commit 20240603145154120 not found in Commits org.apache.hudi.common.table.timeline.HoodieDefaultTimeline:
0.13.0 抛出异常:Commit 20240603145154120 not found in Commits
然后测试 0.14.1 是否正常
自动回滚
测试写数据时是否会自动回滚。
先用flink insert一条数据,生成 .deltacommit 后,人为删除 .deltacommit,保留 .deltacommit.requested 和 .deltacommit.inflight
然后执行再用flink insert一条数据,看是否会触发rollback。
可以看到写数据时,会先rollback上一次失败的commit,再写数据生成新的commit
相关异常
记录遇到的相关异常信息:https://note.youdao.com/s/EtQEL1wC
小结
对于 rollback_to_instant
- 只能 rollback 最后一个commit
- 可以连续rollback,对于Spark写的表没有问题,而对于Flink写的表 hudi 0.13.0 有bug ,可以升级到0.14.1解决。
- 可以rollback 失败的commit,无论是Spark写的表还是Flink写的表,hudi 0.13.0 都有bug,均可以升级到0.14.1解决。
- rollback 会删除对应的 .commit 和 数据文件,生成 .rollback
- 可以回滚哪些?对于cow 表:.commit;对于 mor表:.deltacommit 和 .commit
原创文章,作者:guozi,如若转载,请注明出处:https://www.sudun.com/ask/89754.html