TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/iceberg2");
tableLoader.open();
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
    .rewriteDataFiles()
    .execute();
This is the entry point for Flink's small-file compaction.
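For reference, a runnable version of this snippet needs roughly the following imports (package paths as in the Iceberg Flink module; they may shift between versions):

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;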
public class Actions {

  public static final Configuration CONFIG = new Configuration()
      // disable classloader check as Avro may cache class/object in the serializers.
      .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);

  private StreamExecutionEnvironment env;
  private Table table;

  private Actions(StreamExecutionEnvironment env, Table table) {
    this.env = env;
    this.table = table;
  }

  public static Actions forTable(StreamExecutionEnvironment env, Table table) {
    return new Actions(env, table);
  }

  public static Actions forTable(Table table) {
    return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(CONFIG), table);
  }

  public RewriteDataFilesAction rewriteDataFiles() {
    return new RewriteDataFilesAction(env, table);
  }
}
This is the Actions class, which mirrors the corresponding structure in the Spark module. The constructor is private, so the static forTable method must be used to supply the table and instantiate Actions. Calling rewriteDataFiles then returns a RewriteDataFilesAction object, which carries the small-file compaction functionality; calling execute() kicks off the compaction. execute() lives in RewriteDataFilesAction's parent class, BaseRewriteDataFilesAction, and contains the main compaction logic.
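The Flink-specific part of RewriteDataFilesAction is mainly its rewriteDataForTasks override, which turns the combined scan tasks into a DataStream and rewrites them with RowDataRewriter. A paraphrased sketch (names recalled from the Iceberg Flink module; maxParallelism and the exact signatures vary by version):

// Paraphrased sketch, not the verbatim source: run one rewrite sub-task per CombinedScanTask.
@Override
protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
  int size = combinedScanTasks.size();
  int parallelism = Math.min(size, maxParallelism);
  DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
  RowDataRewriter rowDataRewriter =
      new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
  try {
    // each Flink sub-task reads its input files and writes them back as fewer, larger files
    return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
  } catch (Exception e) {
    throw new RuntimeException("Rewrite data file error.", e);
  }
}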
public RewriteDataFilesActionResult execute() {
  CloseableIterable<FileScanTask> fileScanTasks = null;
  try {
    fileScanTasks = table.newScan()
        .caseSensitive(caseSensitive)
        .ignoreResiduals()
        .filter(filter)
        .planFiles();
  } finally {
    try {
      if (fileScanTasks != null) {
        fileScanTasks.close();
      }
    } catch (IOException ioe) {
      LOG.warn("Failed to close task iterable", ioe);
    }
  }

  Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
  Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
      .filter(kv -> kv.getValue().size() > 1)
      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

  // Nothing to rewrite if there's only one DataFile in each partition.
  if (filteredGroupedTasks.isEmpty()) {
    return RewriteDataFilesActionResult.empty();
  }

  // Split and combine tasks under each partition
  List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
      .map(scanTasks -> {
        CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
            CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
        return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
      })
      .flatMap(Streams::stream)
      .filter(task -> task.files().size() > 1 || isPartialFileScan(task))
      .collect(Collectors.toList());

  if (combinedScanTasks.isEmpty()) {
    return RewriteDataFilesActionResult.empty();
  }

  List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
  List<DataFile> currentDataFiles = combinedScanTasks.stream()
      .flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
      .collect(Collectors.toList());
  replaceDataFiles(currentDataFiles, addedDataFiles);

  return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
}
The planFiles() method was analyzed in the earlier article on Iceberg reads: it returns an iterable of FileScanTask, where each FileScanTask holds one data file together with the delete files that must be applied to it.
The tasks are then grouped by partition.
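groupTasksByPartition boils down to a multimap keyed by the wrapped partition value; a simplified sketch (the real helper uses Guava's ListMultimap, spec is the table's partition spec, and the StructLikeWrapper API differs slightly across versions):

// Simplified sketch: bucket FileScanTasks by partition. StructLikeWrapper supplies
// hashCode/equals for partition structs so they can serve as map keys.
Map<StructLikeWrapper, Collection<FileScanTask>> grouped = Maps.newHashMap();
for (FileScanTask task : tasks) {
  StructLikeWrapper key = StructLikeWrapper.forType(spec.partitionType())
      .set(task.file().partition());
  grouped.computeIfAbsent(key, k -> Lists.newArrayList()).add(task);
}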
Next the combinedScanTasks list is generated: splitFiles splits files according to the target file size, and planTasks produces the actual file-combination tasks. The tasks are then filtered by a few conditions, such as task.files().size() > 1, i.e. a task is only kept if it would combine more than one file; the isPartialFileScan(task) branch additionally keeps tasks for format-v2 tables with deletes, where even a single data file may need rewriting to drop deleted rows.
In the example program above, the table contains 5 data files, so there are 5 FileScanTasks.
The generated combinedScanTasks contains two tasks, each holding 2 FileScanTasks, so 4 files get compacted; the remaining single file is filtered out by the more-than-one-file-per-task condition.
Finally, the replaceDataFiles method rewrites the metadata.
public void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
  try {
    RewriteFiles rewriteFiles = table.newRewrite();
    rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
    commit(rewriteFiles);
  } catch (Exception e) {
    Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
        .noRetry()
        .suppressFailureWhenFinished()
        .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
        .run(fileIO::deleteFile);
    throw e;
  }
}
The RewriteFiles object records four kinds of files:
1. data files to be deleted
2. delete files to be deleted
3. data files to be added
4. delete files to be added
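For a plain data-file compaction like this one, only sets 1 and 3 are non-empty: the two-argument rewriteFiles overload used above delegates to the four-argument one with empty delete-file sets, roughly:

// Paraphrased from BaseRewriteFiles: the 2-arg overload passes empty delete-file sets.
@Override
public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd) {
  return rewriteFiles(filesToDelete, ImmutableSet.of(), filesToAdd, ImmutableSet.of());
}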
RewriteFiles extends SnapshotUpdate and is one kind of snapshot-update operation. Its concrete implementation is BaseRewriteFiles, which extends the abstract class MergingSnapshotProducer, which in turn extends the abstract class SnapshotProducer; SnapshotProducer implements commit(), which submits the snapshot update:
@Override
public void commit() {
  // this is always set to the latest commit attempt's snapshot id.
  AtomicLong newSnapshotId = new AtomicLong(-1L);
  try {
    Tasks.foreach(ops)
        .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
        .exponentialBackoff(
            base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
            base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
            base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
            2.0 /* exponential */)
        .onlyRetryOn(CommitFailedException.class)
        .run(taskOps -> {
          Snapshot newSnapshot = apply();
          newSnapshotId.set(newSnapshot.snapshotId());
          TableMetadata updated;
          if (stageOnly) {
            updated = base.addStagedSnapshot(newSnapshot);
          } else {
            updated = base.replaceCurrentSnapshot(newSnapshot);
          }

          if (updated == base) {
            // do not commit if the metadata has not changed. for example, this may happen when setting the current
            // snapshot to an ID that is already current. note that this check uses identity.
            return;
          }

          // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
          // to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
          taskOps.commit(base, updated.withUUID());
        });
  } catch (CommitStateUnknownException commitStateUnknownException) {
    throw commitStateUnknownException;
  } catch (RuntimeException e) {
    Exceptions.suppressAndThrow(e, this::cleanAll);
  }

  LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());

  try {
    // at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
    // id in case another commit was added between this commit and the refresh.
    Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
    if (saved != null) {
      cleanUncommitted(Sets.newHashSet(saved.allManifests()));
      // also clean up unused manifest lists created by multiple attempts
      for (String manifestList : manifestLists) {
        if (!saved.manifestListLocation().equals(manifestList)) {
          deleteFile(manifestList);
        }
      }
    } else {
      // saved may not be present if the latest metadata couldn't be loaded due to eventual
      // consistency problems in refresh. in that case, don't clean up.
      LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
    }
  } catch (RuntimeException e) {
    LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", e);
  }

  notifyListeners();
}
Tasks here appears to be Iceberg's own task-running utility class; it supports retries, exponential backoff, and (via executeWith) parallel execution.
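A minimal standalone sketch of the Tasks builder (flakyCall is a hypothetical stand-in for the work being retried):

// Retry each item up to 3 times on CommitFailedException, backing off 100ms..2s
// (60s total budget), optionally in parallel, mirroring the pattern in commit() above.
Tasks.foreach("a", "b", "c")
    .retry(3)
    .exponentialBackoff(100 /* min ms */, 2000 /* max ms */, 60000 /* total ms */, 2.0)
    .onlyRetryOn(CommitFailedException.class)
    .executeWith(ThreadPools.getWorkerPool())  // optional: run items in parallel
    .run(item -> flakyCall(item));             // flakyCall: hypothetical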
apply() is called to produce a new Snapshot; apply() is implemented in the SnapshotProducer class:
public Snapshot apply() {
  this.base = refresh();
  Long parentSnapshotId = base.currentSnapshot() != null ?
      base.currentSnapshot().snapshotId() : null;
  long sequenceNumber = base.nextSequenceNumber();

  // run validations from the child operation
  validate(base);

  List<ManifestFile> manifests = apply(base);

  if (base.formatVersion() > 1 || base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
    OutputFile manifestList = manifestListPath();

    try (ManifestListWriter writer = ManifestLists.write(
        ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId, sequenceNumber)) {

      // keep track of the manifest lists created
      manifestLists.add(manifestList.location());

      ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
      Tasks.range(manifestFiles.length)
          .stopOnFailure().throwFailureWhenFinished()
          .executeWith(ThreadPools.getWorkerPool())
          .run(index ->
              manifestFiles[index] = manifestsWithMetadata.get(manifests.get(index)));

      writer.addAll(Arrays.asList(manifestFiles));
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to write manifest list file");
    }

    return new BaseSnapshot(ops.io(),
        sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
        manifestList.location());
  } else {
    return new BaseSnapshot(ops.io(),
        snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
        manifests);
  }
}
Inside, an overloaded apply(TableMetadata) is called, which performs the ManifestFile write-to-disk logic and returns a List<ManifestFile>; this apply() is implemented in the abstract class MergingSnapshotProducer:
public List<ManifestFile> apply(TableMetadata base) {
  Snapshot current = base.currentSnapshot();

  // filter any existing manifests
  List<ManifestFile> filtered = filterManager.filterManifests(
      base.schema(), current != null ? current.dataManifests() : null);
  long minDataSequenceNumber = filtered.stream()
      .map(ManifestFile::minSequenceNumber)
      .filter(seq -> seq > 0) // filter out unassigned sequence numbers in rewritten manifests
      .reduce(base.lastSequenceNumber(), Math::min);
  deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
  List<ManifestFile> filteredDeletes = deleteFilterManager.filterManifests(
      base.schema(), current != null ? current.deleteManifests() : null);

  // only keep manifests that have live data files or that were written by this commit
  Predicate<ManifestFile> shouldKeep = manifest ->
      manifest.hasAddedFiles() || manifest.hasExistingFiles() || manifest.snapshotId() == snapshotId();
  Iterable<ManifestFile> unmergedManifests = Iterables.filter(
      Iterables.concat(prepareNewManifests(), filtered), shouldKeep);
  Iterable<ManifestFile> unmergedDeleteManifests = Iterables.filter(
      Iterables.concat(prepareDeleteManifests(), filteredDeletes), shouldKeep);

  // update the snapshot summary
  summaryBuilder.clear();
  summaryBuilder.merge(addedFilesSummary);
  summaryBuilder.merge(appendedManifestsSummary);
  summaryBuilder.merge(filterManager.buildSummary(filtered));
  summaryBuilder.merge(deleteFilterManager.buildSummary(filteredDeletes));

  List<ManifestFile> manifests = Lists.newArrayList();
  Iterables.addAll(manifests, mergeManager.mergeManifests(unmergedManifests));
  Iterables.addAll(manifests, deleteMergeManager.mergeManifests(unmergedDeleteManifests));

  return manifests;
}
filterManager.filterManifests is called to filter the data ManifestFiles; the current data manifests are first obtained via current.dataManifests():
/**
 * Filter deleted files out of a list of manifests.
 *
 * @param tableSchema the current table schema
 * @param manifests a list of manifests to be filtered
 * @return an array of filtered manifests
 */
List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manifests) {
  if (manifests == null || manifests.isEmpty()) {
    validateRequiredDeletes();
    return ImmutableList.of();
  }

  // use a common metrics evaluator for all manifests because it is bound to the table schema
  StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(tableSchema, deleteExpression);

  ManifestFile[] filtered = new ManifestFile[manifests.size()];
  // open all of the manifest files in parallel, use index to avoid reordering
  Tasks.range(filtered.length)
      .stopOnFailure().throwFailureWhenFinished()
      .executeWith(ThreadPools.getWorkerPool())
      .run(index -> {
        ManifestFile manifest = filterManifest(metricsEvaluator, manifests.get(index));
        filtered[index] = manifest;
      });

  validateRequiredDeletes(filtered);

  return Arrays.asList(filtered);
}
filterManifest is then called on each manifest file:
/**
 * @return a ManifestReader that is a filtered version of the input manifest.
 */
private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest) {
  ManifestFile cached = filteredManifests.get(manifest);
  if (cached != null) {
    return cached;
  }

  boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
  if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
    filteredManifests.put(manifest, manifest);
    return manifest;
  }

  try (ManifestReader<F> reader = newManifestReader(manifest)) {
    // this assumes that the manifest doesn't have files to remove and streams through the
    // manifest without copying data. if a manifest does have a file to remove, this will break
    // out of the loop and move on to filtering the manifest.
    boolean hasDeletedFiles = manifestHasDeletedFiles(metricsEvaluator, reader);
    if (!hasDeletedFiles) {
      filteredManifests.put(manifest, manifest);
      return manifest;
    }

    return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader);
  } catch (IOException e) {
    throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
  }
}
It calls manifestHasDeletedFiles to determine whether the current manifest contains files that need to be removed:
private boolean manifestHasDeletedFiles(
    StrictMetricsEvaluator metricsEvaluator, ManifestReader<F> reader) {
  boolean isDelete = reader.isDeleteManifestReader();
  Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
  Evaluator strict = strictDeleteEvaluator(reader.spec());
  boolean hasDeletedFiles = false;
  for (ManifestEntry<F> entry : reader.entries()) {
    F file = entry.file();
    boolean fileDelete = deletePaths.contains(file.path()) ||
        dropPartitions.contains(file.specId(), file.partition()) ||
        (isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber);
    if (fileDelete || inclusive.eval(file.partition())) {
      ValidationException.check(
          fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
          "Cannot delete file where some, but not all, rows match filter %s: %s",
          this.deleteExpression, file.path());

      hasDeletedFiles = true;
      if (failAnyDelete) {
        throw new DeleteException(reader.spec().partitionToPath(file.partition()));
      }
      break; // as soon as a deleted file is detected, stop scanning
    }
  }
  return hasDeletedFiles;
}
deletePaths.contains checks whether the file has been marked for deletion. At the end of filterManifest, filterManifestWithDeletedFiles is called, which rewrites the manifest file.
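The gist of filterManifestWithDeletedFiles is to stream the old manifest into a brand-new one, carrying live entries forward and writing dropped entries as delete tombstones. A paraphrased sketch (shouldDelete is a hypothetical stand-in for the checks shown in manifestHasDeletedFiles above; the real method also records the deleted files for validation and summaries):

// Paraphrased sketch of rewriting a manifest without the filtered-out files.
ManifestWriter<F> writer = newManifestWriter(reader.spec());
try {
  for (ManifestEntry<F> entry : reader.entries()) {
    if (shouldDelete(entry)) {    // hypothetical: same checks as manifestHasDeletedFiles
      writer.delete(entry);       // tombstone: the file is removed as of this snapshot
    } else {
      writer.existing(entry);     // keep the live entry unchanged
    }
  }
} finally {
  writer.close();
}
return writer.toManifestFile();   // the rewritten manifest replaces the original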
The Snapshot implementation class is BaseSnapshot, which already carries the generated snapshot Avro (manifest list) file name along with the snapshot ID, sequence number, operation, counts of added and deleted files, and other summary information.
base.replaceCurrentSnapshot is then called to create a new TableMetadata object:
public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
  // there can be operations (viz. rollback, cherrypick) where an existing snapshot could be replacing current
  if (snapshotsById.containsKey(snapshot.snapshotId())) {
    return setCurrentSnapshotTo(snapshot);
  }

  ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
      "Cannot add snapshot with sequence number %s older than last sequence number %s",
      snapshot.sequenceNumber(), lastSequenceNumber);

  List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
      .addAll(snapshots)
      .add(snapshot)
      .build();
  List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
      .addAll(snapshotLog)
      .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
      .build();

  return new TableMetadata(null, formatVersion, uuid, location,
      snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
      currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId,
      defaultSortOrderId, sortOrders, rowKey,
      properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
The commit is submitted through TableOperations.commit; here it is the implementation in HadoopTableOperations:
public void commit(TableMetadata base, TableMetadata metadata) {
  Pair<Integer, TableMetadata> current = versionAndMetadata();
  if (base != current.second()) {
    throw new CommitFailedException("Cannot commit changes based on stale table metadata");
  }

  if (base == metadata) {
    LOG.info("Nothing to commit.");
    return;
  }

  Preconditions.checkArgument(base == null || base.location().equals(metadata.location()),
      "Hadoop path-based tables cannot be relocated");
  Preconditions.checkArgument(
      !metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
      "Hadoop path-based tables cannot relocate metadata");

  String codecName = metadata.property(
      TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
  TableMetadataParser.Codec codec = TableMetadataParser.Codec.fromName(codecName);
  String fileExtension = TableMetadataParser.getFileExtension(codec);
  Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
  TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));

  int nextVersion = (current.first() != null ? current.first() : 0) + 1;
  Path finalMetadataFile = metadataFilePath(nextVersion, codec);
  FileSystem fs = getFileSystem(tempMetadataFile, conf);

  try {
    if (fs.exists(finalMetadataFile)) {
      throw new CommitFailedException(
          "Version %d already exists: %s", nextVersion, finalMetadataFile);
    }
  } catch (IOException e) {
    throw new RuntimeIOException(e,
        "Failed to check if next version exists: %s", finalMetadataFile);
  }

  // this rename operation is the atomic commit operation
  renameToFinal(fs, tempMetadataFile, finalMetadataFile);

  // update the best-effort version pointer
  writeVersionHint(nextVersion);

  deleteRemovedMetadataFiles(base, metadata);

  this.shouldRefresh = true;
}
First, versionAndMetadata() is called to return the current version number and table metadata:
private synchronized Pair<Integer, TableMetadata> versionAndMetadata() {
  return Pair.of(version, currentMetadata);
}
currentMetadata is the TableMetadata object holding the contents of the current metadata.json, and version is the current version number.
Back in commit: a temporary metadata file, tempMetadataFile, is created first, and TableMetadataParser.write writes it to disk:
public static void write(TableMetadata metadata, OutputFile outputFile) {
  internalWrite(metadata, outputFile, false);
}

public static void internalWrite(
    TableMetadata metadata, OutputFile outputFile, boolean overwrite) {
  boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP;
  OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
  try (OutputStream ou = isGzip ? new GZIPOutputStream(stream) : stream;
       OutputStreamWriter writer = new OutputStreamWriter(ou, StandardCharsets.UTF_8)) {
    JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
    generator.useDefaultPrettyPrinter();
    toJson(metadata, generator);
    generator.flush();
  } catch (IOException e) {
    throw new RuntimeIOException(e, "Failed to write json to file: %s", outputFile);
  }
}
The actual metadata serialization happens in toJson.
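An abridged sketch of the fields toJson emits (field names as in the Iceberg table spec; the real method writes many more, and currentSnapshotId here is shorthand for the id it looks up):

// Abridged sketch of toJson, not the full implementation.
generator.writeStartObject();
generator.writeNumberField("format-version", metadata.formatVersion());
generator.writeStringField("table-uuid", metadata.uuid());
generator.writeStringField("location", metadata.location());
generator.writeNumberField("last-sequence-number", metadata.lastSequenceNumber());
generator.writeNumberField("current-snapshot-id", currentSnapshotId);
// ... schemas, partition-specs, sort-orders, snapshots, snapshot-log, metadata-log ...
generator.writeEndObject();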
A new version number and the final file name are then generated; renaming the temp file to the final name is the atomic commit, after which the best-effort version-hint file is written and metadata files that are no longer referenced are deleted.
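After a successful commit, the metadata directory of this Hadoop path-based table looks roughly like this (illustrative; actual file names depend on the table's history):

file:///tmp/iceberg2/metadata/
  v1.metadata.json      <- previous table metadata
  v2.metadata.json      <- newly committed metadata, renamed from the temp UUID file
  version-hint.text     <- best-effort version pointer, contains "2"
  snap-*.avro           <- manifest lists, one per snapshot
  *-m0.avro             <- manifest files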