Iceberg small-file compaction

TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/iceberg2");
tableLoader.open();
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
         .rewriteDataFiles()
         .execute();

The snippet above is the Flink code that triggers small-file compaction.

public class Actions {

  public static final Configuration CONFIG = new Configuration()
      // disable classloader check as Avro may cache class/object in the serializers.
      .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);

  private StreamExecutionEnvironment env;
  private Table table;

  private Actions(StreamExecutionEnvironment env, Table table) {
    this.env = env;
    this.table = table;
  }

  public static Actions forTable(StreamExecutionEnvironment env, Table table) {
    return new Actions(env, table);
  }

  public static Actions forTable(Table table) {
    return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(CONFIG), table);
  }

  public RewriteDataFilesAction rewriteDataFiles() {
    return new RewriteDataFilesAction(env, table);
  }

}

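This is the Actions class; its structure presumably mirrors the one in the Spark module. The constructor is private, so an instance has to be created through the static forTable method, which takes the table as a parameter. Calling rewriteDataFiles then returns a RewriteDataFilesAction object, which is what actually performs the compaction. Calling execute() starts it. execute() is defined in BaseRewriteDataFilesAction, the parent class of RewriteDataFilesAction, and contains the main compaction logic.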
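To make the shape of this API easier to see, here is a minimal self-contained sketch of the same pattern: a private constructor, a static forTable factory, and an action whose execute() lives in a base class while the engine-specific step is left to a subclass. All class names, the string table name, and the default target size are stand-ins invented for this sketch; they are not the Iceberg classes.

```java
class ActionsPatternSketch {
  /** Stand-in for BaseRewriteDataFilesAction: owns the options and the shared execute() flow. */
  abstract static class BaseRewriteAction {
    private final String tableName;
    private long targetSizeInBytes = 128L * 1024 * 1024; // arbitrary default, assumed for this sketch

    BaseRewriteAction(String tableName) {
      this.tableName = tableName;
    }

    /** Fluent option setter: returns this so calls can be chained. */
    BaseRewriteAction targetSizeInBytes(long size) {
      this.targetSizeInBytes = size;
      return this;
    }

    /** Engine-specific step that each subclass supplies. */
    protected abstract String rewrite(String tableName, long targetSize);

    /** Shared entry point defined once in the base class. */
    final String execute() {
      return rewrite(tableName, targetSizeInBytes);
    }
  }

  /** Stand-in for the Flink-specific action. */
  static final class FlinkRewriteAction extends BaseRewriteAction {
    FlinkRewriteAction(String tableName) {
      super(tableName);
    }

    @Override
    protected String rewrite(String tableName, long targetSize) {
      return "flink rewrite of " + tableName + " targeting " + targetSize + " bytes";
    }
  }

  /** Stand-in for Actions: private constructor plus static factory. */
  static final class Actions {
    private final String tableName;

    private Actions(String tableName) {
      this.tableName = tableName;
    }

    static Actions forTable(String tableName) {
      return new Actions(tableName);
    }

    BaseRewriteAction rewriteDataFiles() {
      return new FlinkRewriteAction(tableName);
    }
  }

  public static void main(String[] args) {
    String result = Actions.forTable("db.events")
        .rewriteDataFiles()
        .targetSizeInBytes(1024)
        .execute();
    System.out.println(result);
  }
}
```

The caller only ever sees forTable, rewriteDataFiles and execute, which is why the Flink and Spark entry points can look alike while the rewrite step differs.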

public RewriteDataFilesActionResult execute() {
    CloseableIterable<FileScanTask> fileScanTasks = null;
    try {
      fileScanTasks = table.newScan()
          .caseSensitive(caseSensitive)
          .ignoreResiduals()
          .filter(filter)
          .planFiles();
    } finally {
      try {
        if (fileScanTasks != null) {
          fileScanTasks.close();
        }
      } catch (IOException ioe) {
        LOG.warn("Failed to close task iterable", ioe);
      }
    }

    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
        .filter(kv -> kv.getValue().size() > 1)
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    // Nothing to rewrite if there's only one DataFile in each partition.
    if (filteredGroupedTasks.isEmpty()) {
      return RewriteDataFilesActionResult.empty();
    }
    // Split and combine tasks under each partition
    List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
        .map(scanTasks -> {
          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
        })
        .flatMap(Streams::stream)
        .filter(task -> task.files().size() > 1 || isPartialFileScan(task))
        .collect(Collectors.toList());

    if (combinedScanTasks.isEmpty()) {
      return RewriteDataFilesActionResult.empty();
    }

    List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
    List<DataFile> currentDataFiles = combinedScanTasks.stream()
        .flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
        .collect(Collectors.toList());
    replaceDataFiles(currentDataFiles, addedDataFiles);

    return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
  }

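planFiles() was analyzed in the earlier article on how Iceberg reads data. It returns an iterable of FileScanTasks, each of which holds one data file together with the delete files that have to be applied to it.
The tasks are then grouped by partition.
Next, the combinedScanTasks list is built: splitFiles splits files according to the target file size, and planTasks produces the tasks that merge files. The result is then filtered by a few conditions. For example, task.files().size() > 1 means a task is only kept when it merges more than one file. However, for v2 tables, which support deletes, ...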

In this example run the table has 5 data files, so planning yields 5 FileScanTasks.

The generated combinedScanTasks contains two tasks, each holding 2 FileScanTasks, so 4 files get merged. The one remaining file is dropped by the condition that a task must contain more than one file.
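The planning step described above (group by partition, pack files up to the target size, keep only tasks with more than one file) can be sketched in plain Java. This is a simplified model, not the Iceberg implementation: FileTask is a hypothetical stand-in for FileScanTask, the packing is a plain sequential fill rather than what TableScanUtil.planTasks does with lookback and open-file cost, and the file sizes are made up so that 5 files produce the 2 + 2 + 1 outcome seen in the example run.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CompactionPlanSketch {
  /** Hypothetical stand-in for a FileScanTask: one data file in one partition. */
  static final class FileTask {
    final String partition;
    final String path;
    final long sizeBytes;

    FileTask(String partition, String path, long sizeBytes) {
      this.partition = partition;
      this.path = path;
      this.sizeBytes = sizeBytes;
    }
  }

  /** Groups by partition, fills bins up to targetSize, and keeps bins with more than one file. */
  static List<List<FileTask>> plan(List<FileTask> tasks, long targetSize) {
    Map<String, List<FileTask>> byPartition = new LinkedHashMap<>();
    for (FileTask task : tasks) {
      byPartition.computeIfAbsent(task.partition, k -> new ArrayList<>()).add(task);
    }

    List<List<FileTask>> combined = new ArrayList<>();
    for (List<FileTask> group : byPartition.values()) {
      if (group.size() <= 1) {
        continue; // a partition with a single file has nothing to merge
      }
      List<FileTask> bin = new ArrayList<>();
      long binSize = 0;
      for (FileTask task : group) {
        // Close the current bin when the next file would push it past the target size.
        if (!bin.isEmpty() && binSize + task.sizeBytes > targetSize) {
          if (bin.size() > 1) {
            combined.add(bin);
          }
          bin = new ArrayList<>();
          binSize = 0;
        }
        bin.add(task);
        binSize += task.sizeBytes;
      }
      if (bin.size() > 1) {
        combined.add(bin); // a trailing single-file bin is dropped, like the leftover file above
      }
    }
    return combined;
  }

  public static void main(String[] args) {
    long[] sizes = {40, 50, 45, 50, 60}; // made-up sizes, target size 100
    List<FileTask> tasks = new ArrayList<>();
    for (int i = 0; i < sizes.length; i++) {
      tasks.add(new FileTask("dt=2021-01-01", "file-" + i + ".parquet", sizes[i]));
    }
    System.out.println(plan(tasks, 100).size() + " combined tasks");
  }
}
```

With these sizes the first two files fill one bin, the next two fill another, and the last file ends up alone and is filtered out.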
Finally, the replaceDataFiles method rewrites the metadata.

 public void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
    try {
      RewriteFiles rewriteFiles = table.newRewrite();
      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
      commit(rewriteFiles);
    } catch (Exception e) {
      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
          .noRetry()
          .suppressFailureWhenFinished()
          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
          .run(fileIO::deleteFile);
      throw e;
    }
  }

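A RewriteFiles object tracks four kinds of files:
1. data files to be deleted
2. delete files to be deleted
3. data files to be added
4. delete files to be added
RewriteFiles extends SnapshotUpdate and represents one kind of snapshot update operation. Its concrete implementation is BaseRewriteFiles, which extends the abstract class MergingSnapshotProducer, which in turn extends the abstract class SnapshotProducer. SnapshotProducer implements commit(), which commits the snapshot update.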

@Override
  public void commit() {
    // this is always set to the latest commit attempt's snapshot id.
    AtomicLong newSnapshotId = new AtomicLong(-1L);
    try {
      Tasks.foreach(ops)
          .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
          .exponentialBackoff(
              base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
              base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
              base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
              2.0 /* exponential */)
          .onlyRetryOn(CommitFailedException.class)
          .run(taskOps -> {
            Snapshot newSnapshot = apply();
            newSnapshotId.set(newSnapshot.snapshotId());
            TableMetadata updated;
            if (stageOnly) {
              updated = base.addStagedSnapshot(newSnapshot);
            } else {
              updated = base.replaceCurrentSnapshot(newSnapshot);
            }

            if (updated == base) {
              // do not commit if the metadata has not changed. for example, this may happen when setting the current
              // snapshot to an ID that is already current. note that this check uses identity.
              return;
            }

            // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
            // to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
            taskOps.commit(base, updated.withUUID());
          });

    } catch (CommitStateUnknownException commitStateUnknownException) {
      throw commitStateUnknownException;
    } catch (RuntimeException e) {
      Exceptions.suppressAndThrow(e, this::cleanAll);
    }

    LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());

    try {
      // at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
      // id in case another commit was added between this commit and the refresh.
      Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
      if (saved != null) {
        cleanUncommitted(Sets.newHashSet(saved.allManifests()));
        // also clean up unused manifest lists created by multiple attempts
        for (String manifestList : manifestLists) {
          if (!saved.manifestListLocation().equals(manifestList)) {
            deleteFile(manifestList);
          }
        }
      } else {
        // saved may not be present if the latest metadata couldn't be loaded due to eventual
        // consistency problems in refresh. in that case, don't clean up.
        LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
      }

    } catch (RuntimeException e) {
      LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", e);
    }

    notifyListeners();
  }
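The retry behaviour configured in commit() (retry only on CommitFailedException, with exponentially growing waits) can be illustrated with a small standalone sketch. It is not the Tasks utility: runWithRetry and the nested exception class are names invented here, and the total-time limit that commit() also passes in is left out.

```java
import java.util.function.Supplier;

class CommitRetrySketch {
  /** Hypothetical stand-in for Iceberg's CommitFailedException. */
  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
      super(message);
    }
  }

  /**
   * Runs the attempt, retrying only when it throws CommitFailedException.
   * The wait starts at minWaitMs and is multiplied by scaleFactor after each
   * failure, capped at maxWaitMs. Any other exception propagates immediately.
   */
  static <T> T runWithRetry(
      int numRetries, long minWaitMs, long maxWaitMs, double scaleFactor, Supplier<T> attempt) {
    long waitMs = minWaitMs;
    int failures = 0;
    while (true) {
      try {
        return attempt.get();
      } catch (CommitFailedException e) {
        failures++;
        if (failures > numRetries) {
          throw e; // retries exhausted, surface the last failure
        }
        sleep(waitMs);
        waitMs = Math.min(maxWaitMs, (long) (waitMs * scaleFactor));
      }
    }
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while waiting to retry", ie);
    }
  }

  public static void main(String[] args) {
    int[] attempts = {0};
    String result = runWithRetry(4, 1, 8, 2.0, () -> {
      if (++attempts[0] < 3) {
        throw new CommitFailedException("stale table metadata");
      }
      return "committed";
    });
    System.out.println(result + " after " + attempts[0] + " attempts");
  }
}
```

Each retry in the real commit() calls apply() again, so the new snapshot is rebuilt on top of freshly refreshed metadata instead of resubmitting a stale one.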

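Tasks appears to be Iceberg's own helper class for running tasks, here with retries and elsewhere on a thread pool.
It calls apply() to produce a new Snapshot. apply() is implemented in the SnapshotProducer class: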

  public Snapshot apply() {
    this.base = refresh();
    Long parentSnapshotId = base.currentSnapshot() != null ?
        base.currentSnapshot().snapshotId() : null;
    long sequenceNumber = base.nextSequenceNumber();

    // run validations from the child operation
    validate(base);

    List<ManifestFile> manifests = apply(base);

    if (base.formatVersion() > 1 || base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
      OutputFile manifestList = manifestListPath();

      try (ManifestListWriter writer = ManifestLists.write(
          ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId, sequenceNumber)) {

        // keep track of the manifest lists created
        manifestLists.add(manifestList.location());

        ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];

        Tasks.range(manifestFiles.length)
            .stopOnFailure().throwFailureWhenFinished()
            .executeWith(ThreadPools.getWorkerPool())
            .run(index ->
                manifestFiles[index] = manifestsWithMetadata.get(manifests.get(index)));

        writer.addAll(Arrays.asList(manifestFiles));

      } catch (IOException e) {
        throw new RuntimeIOException(e, "Failed to write manifest list file");
      }

      return new BaseSnapshot(ops.io(),
          sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
          manifestList.location());

    } else {
      return new BaseSnapshot(ops.io(),
          snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
          manifests);
    }
  }

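This in turn calls another apply() overload, apply(base), which handles writing the ManifestFiles to disk and returns a List<ManifestFile>. That overload is implemented in the abstract class MergingSnapshotProducer.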

public List<ManifestFile> apply(TableMetadata base) {
    Snapshot current = base.currentSnapshot();

    // filter any existing manifests
    List<ManifestFile> filtered = filterManager.filterManifests(
        base.schema(), current != null ? current.dataManifests() : null);
    long minDataSequenceNumber = filtered.stream()
        .map(ManifestFile::minSequenceNumber)
        .filter(seq -> seq > 0) // filter out unassigned sequence numbers in rewritten manifests
        .reduce(base.lastSequenceNumber(), Math::min);
    deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
    List<ManifestFile> filteredDeletes = deleteFilterManager.filterManifests(
        base.schema(), current != null ? current.deleteManifests() : null);

    // only keep manifests that have live data files or that were written by this commit
    Predicate<ManifestFile> shouldKeep = manifest ->
        manifest.hasAddedFiles() || manifest.hasExistingFiles() || manifest.snapshotId() == snapshotId();
    Iterable<ManifestFile> unmergedManifests = Iterables.filter(
        Iterables.concat(prepareNewManifests(), filtered), shouldKeep);
    Iterable<ManifestFile> unmergedDeleteManifests = Iterables.filter(
        Iterables.concat(prepareDeleteManifests(), filteredDeletes), shouldKeep);

    // update the snapshot summary
    summaryBuilder.clear();
    summaryBuilder.merge(addedFilesSummary);
    summaryBuilder.merge(appendedManifestsSummary);
    summaryBuilder.merge(filterManager.buildSummary(filtered));
    summaryBuilder.merge(deleteFilterManager.buildSummary(filteredDeletes));

    List<ManifestFile> manifests = Lists.newArrayList();
    Iterables.addAll(manifests, mergeManager.mergeManifests(unmergedManifests));
    Iterables.addAll(manifests, deleteMergeManager.mergeManifests(unmergedDeleteManifests));

    return manifests;
  }

The filterManifests method of filterManager is called to filter the data manifests. The current data manifests are first obtained through current.dataManifests().

/**
   * Filter deleted files out of a list of manifests.
   *
   * @param tableSchema the current table schema
   * @param manifests a list of manifests to be filtered
   * @return an array of filtered manifests
   */
  List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manifests) {
    if (manifests == null || manifests.isEmpty()) {
      validateRequiredDeletes();
      return ImmutableList.of();
    }

    // use a common metrics evaluator for all manifests because it is bound to the table schema
    StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(tableSchema, deleteExpression);

    ManifestFile[] filtered = new ManifestFile[manifests.size()];
    // open all of the manifest files in parallel, use index to avoid reordering
    Tasks.range(filtered.length)
        .stopOnFailure().throwFailureWhenFinished()
        .executeWith(ThreadPools.getWorkerPool())
        .run(index -> {
          ManifestFile manifest = filterManifest(metricsEvaluator, manifests.get(index));
          filtered[index] = manifest;
        });

    validateRequiredDeletes(filtered);

    return Arrays.asList(filtered);
  }

filterManifest is then called on each manifest file to filter it.

/**
   * @return a ManifestReader that is a filtered version of the input manifest.
   */
  private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest) {
    ManifestFile cached = filteredManifests.get(manifest);
    if (cached != null) {
      return cached;
    }

    boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
    if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
      filteredManifests.put(manifest, manifest);
      return manifest;
    }

    try (ManifestReader<F> reader = newManifestReader(manifest)) {
      // this assumes that the manifest doesn't have files to remove and streams through the
      // manifest without copying data. if a manifest does have a file to remove, this will break
      // out of the loop and move on to filtering the manifest.
      boolean hasDeletedFiles = manifestHasDeletedFiles(metricsEvaluator, reader);
      if (!hasDeletedFiles) {
        filteredManifests.put(manifest, manifest);
        return manifest;
      }

      return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader);

    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
    }
  }

Inside it, manifestHasDeletedFiles checks whether the current manifest contains any files that need to be deleted.

private boolean manifestHasDeletedFiles(
      StrictMetricsEvaluator metricsEvaluator, ManifestReader<F> reader) {
    boolean isDelete = reader.isDeleteManifestReader();
    Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
    Evaluator strict = strictDeleteEvaluator(reader.spec());
    boolean hasDeletedFiles = false;
    for (ManifestEntry<F> entry : reader.entries()) {
      F file = entry.file();
      boolean fileDelete = deletePaths.contains(file.path()) ||
          dropPartitions.contains(file.specId(), file.partition()) ||
          (isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber);
      if (fileDelete || inclusive.eval(file.partition())) {
        ValidationException.check(
            fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
            "Cannot delete file where some, but not all, rows match filter %s: %s",
            this.deleteExpression, file.path());

        hasDeletedFiles = true;
        if (failAnyDelete) {
          throw new DeleteException(reader.spec().partitionToPath(file.partition()));
        }
        break; // as soon as a deleted file is detected, stop scanning
      }
    }
    return hasDeletedFiles;
  }

deletePaths.contains checks whether a file is one of the files marked for deletion.
Then, at the end of filterManifest, filterManifestWithDeletedFiles is called, which rewrites the manifest file.
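The "scan first, rewrite only if needed" idea in filterManifest can be shown with a simplified sketch. Here a manifest is modelled as a plain list of file paths, which is an assumption of this sketch. The real code streams ManifestEntry objects through a ManifestReader and rewrites through a manifest writer, whereas this version just leaves the deleted paths out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ManifestFilterSketch {
  /** Cheap pass: stop at the first entry that is marked for deletion. */
  static boolean hasDeletedFiles(List<String> manifest, Set<String> deletePaths) {
    for (String path : manifest) {
      if (deletePaths.contains(path)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Returns the same list instance when nothing has to be removed (no rewrite),
   * otherwise a new list without the deleted paths.
   */
  static List<String> filterManifest(List<String> manifest, Set<String> deletePaths) {
    if (!hasDeletedFiles(manifest, deletePaths)) {
      return manifest; // unchanged manifests are reused as-is
    }
    List<String> rewritten = new ArrayList<>();
    for (String path : manifest) {
      if (!deletePaths.contains(path)) {
        rewritten.add(path);
      }
    }
    return rewritten;
  }

  public static void main(String[] args) {
    List<String> manifest = Arrays.asList("a.parquet", "b.parquet", "c.parquet");
    Set<String> deletePaths = new HashSet<>(Arrays.asList("b.parquet"));
    System.out.println(filterManifest(manifest, deletePaths));
  }
}
```

Returning the original instance for untouched manifests is what lets a compaction commit avoid rewriting every manifest in the table.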

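The Snapshot implementation class is BaseSnapshot. At this point it already holds the name of the generated snapshot Avro file, along with the snapshot ID, sequence number, operation, and summary information such as the number of files added and deleted.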

Then base.replaceCurrentSnapshot is called to create the new TableMetadata object.

public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
    // there can be operations (viz. rollback, cherrypick) where an existing snapshot could be replacing current
    if (snapshotsById.containsKey(snapshot.snapshotId())) {
      return setCurrentSnapshotTo(snapshot);
    }

    ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
        "Cannot add snapshot with sequence number %s older than last sequence number %s",
        snapshot.sequenceNumber(), lastSequenceNumber);

    List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
        .addAll(snapshots)
        .add(snapshot)
        .build();
    List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
        .addAll(snapshotLog)
        .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
        .build();

    return new TableMetadata(null, formatVersion, uuid, location,
        snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
        currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId,
        defaultSortOrderId, sortOrders, rowKey,
        properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
  }

The commit itself goes through the commit method of TableOperations. Here that is the implementation in HadoopTableOperations.

public void commit(TableMetadata base, TableMetadata metadata) {
    Pair<Integer, TableMetadata> current = versionAndMetadata();
    if (base != current.second()) {
      throw new CommitFailedException("Cannot commit changes based on stale table metadata");
    }

    if (base == metadata) {
      LOG.info("Nothing to commit.");
      return;
    }

    Preconditions.checkArgument(base == null || base.location().equals(metadata.location()),
        "Hadoop path-based tables cannot be relocated");
    Preconditions.checkArgument(
        !metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
        "Hadoop path-based tables cannot relocate metadata");

    String codecName = metadata.property(
        TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
    TableMetadataParser.Codec codec = TableMetadataParser.Codec.fromName(codecName);
    String fileExtension = TableMetadataParser.getFileExtension(codec);
    Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
    TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));

    int nextVersion = (current.first() != null ? current.first() : 0) + 1;
    Path finalMetadataFile = metadataFilePath(nextVersion, codec);
    FileSystem fs = getFileSystem(tempMetadataFile, conf);

    try {
      if (fs.exists(finalMetadataFile)) {
        throw new CommitFailedException(
            "Version %d already exists: %s", nextVersion, finalMetadataFile);
      }
    } catch (IOException e) {
      throw new RuntimeIOException(e,
          "Failed to check if next version exists: %s", finalMetadataFile);
    }

    // this rename operation is the atomic commit operation
    renameToFinal(fs, tempMetadataFile, finalMetadataFile);

    // update the best-effort version pointer
    writeVersionHint(nextVersion);

    deleteRemovedMetadataFiles(base, metadata);

    this.shouldRefresh = true;
  }

It first calls versionAndMetadata(), which returns the current version and metadata:

private synchronized Pair<Integer, TableMetadata> versionAndMetadata() {
    return Pair.of(version, currentMetadata);
  }

currentMetadata is a TableMetadata object that holds the current metadata.json file and related information, and version is the current version number.


Back in commit: it first creates a temporary metadata file, tempMetadataFile, and calls the write method of TableMetadataParser to write it to disk.

public static void write(TableMetadata metadata, OutputFile outputFile) {
    internalWrite(metadata, outputFile, false);
  }

  public static void internalWrite(
      TableMetadata metadata, OutputFile outputFile, boolean overwrite) {
    boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP;
    OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
    try (OutputStream ou = isGzip ? new GZIPOutputStream(stream) : stream;
         OutputStreamWriter writer = new OutputStreamWriter(ou, StandardCharsets.UTF_8)) {
      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
      generator.useDefaultPrettyPrinter();
      toJson(metadata, generator);
      generator.flush();
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to write json to file: %s", outputFile);
    }
  }

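The main metadata serialization code is in toJson.
After that, commit computes the next version number and the final file name, renames the temporary file to the final name (this rename is the atomic commit), writes the version hint file, and calls deleteRemovedMetadataFiles to clean up metadata files that are no longer needed.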
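The write-temp-then-rename commit can be sketched with java.nio on a local filesystem. This is only an illustration of the technique: RenameCommitSketch, the file-name patterns used here, and the decision to delete the temp file on a version conflict are assumptions of this sketch, and the real code goes through Hadoop's FileSystem instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

class RenameCommitSketch {
  /** Writes metadata to a temp file, then publishes it as the next version via an atomic rename. */
  static Path commit(Path metadataDir, int currentVersion, String metadataJson) {
    try {
      Files.createDirectories(metadataDir);
      Path temp = metadataDir.resolve(UUID.randomUUID() + ".metadata.json");
      Files.write(temp, metadataJson.getBytes(StandardCharsets.UTF_8));

      int nextVersion = currentVersion + 1;
      Path finalFile = metadataDir.resolve("v" + nextVersion + ".metadata.json");
      if (Files.exists(finalFile)) {
        Files.delete(temp); // cleanup added in this sketch
        throw new IllegalStateException("Version " + nextVersion + " already exists: " + finalFile);
      }

      // The rename is the commit point: readers see either the old version or the new one.
      Files.move(temp, finalFile, StandardCopyOption.ATOMIC_MOVE);

      // Best-effort pointer to the latest version, written after the commit.
      Files.write(
          metadataDir.resolve("version-hint.text"),
          Integer.toString(nextVersion).getBytes(StandardCharsets.UTF_8));
      return finalFile;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads the version recorded in the hint file. */
  static int readVersionHint(Path metadataDir) {
    try {
      byte[] bytes = Files.readAllBytes(metadataDir.resolve("version-hint.text"));
      return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8).trim());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Path dir = Paths.get(System.getProperty("java.io.tmpdir"), "rename-commit-" + System.nanoTime());
    Path committed = commit(dir, 0, "{}");
    System.out.println(committed.getFileName() + ", hint=" + readVersionHint(dir));
  }
}
```

A second writer that started from the same version finds the target file already present and fails, which corresponds to the CommitFailedException path in the original commit method.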
