flink读取iceberg核心流程

读取流程一般都是通过planTasks方法返回CombinedScanTask,然后根据CombinedScanTask生成RowDataIterator迭代器访问数据

RowDataIterator iterator = new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive)

在其父类DataIterator中调用

currentIterator = openTaskIterator(tasks.next());

生成读取数据的迭代器

  @Override
  protected CloseableIterator<RowData> openTaskIterator(FileScanTask task) {
    Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());

    Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty() ? ImmutableMap.of() :
        PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);

    FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema);
    CloseableIterable<RowData> iterable = deletes.filter(newIterable(task, deletes.requiredSchema(), idToConstant));

    return iterable.iterator();
  }
复制代码

其中 FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema);创建FlinkDeleteFilter,

FlinkDeleteFilter父类中DeleteFilter构造方法

protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
    this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD;
    this.dataFile = task.file();

    ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
    ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
    for (DeleteFile delete : task.deletes()) {
      switch (delete.content()) {
        case POSITION_DELETES:
          posDeleteBuilder.add(delete);
          break;
        case EQUALITY_DELETES:
          eqDeleteBuilder.add(delete);
          break;
        default:
          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
      }
    }

    this.posDeletes = posDeleteBuilder.build();
    this.eqDeletes = eqDeleteBuilder.build();
    this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
    this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
  }
复制代码

fileProjection中进行构建schema,首先构建requiredIds,如果存在posDeletes则在requiredIds添加ROW_POSITION,如果存在eqDeletes则在requiredIds添加对应的 equalityFieldIds,生成其他的字段集合missingIds,总之最终会添加一个ROW_POSITION字段在最后,

private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
                                       List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
    if (posDeletes.isEmpty() && eqDeletes.isEmpty()) {
      return requestedSchema;
    }

    Set<Integer> requiredIds = Sets.newLinkedHashSet();
    if (!posDeletes.isEmpty()) {
      requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
    }

    for (DeleteFile eqDelete : eqDeletes) {
      requiredIds.addAll(eqDelete.equalityFieldIds());
    }

    Set<Integer> missingIds = Sets.newLinkedHashSet(
        Sets.difference(requiredIds, TypeUtil.getProjectedIds(requestedSchema)));

    if (missingIds.isEmpty()) {
      return requestedSchema;
    }

    // TODO: support adding nested columns. this will currently fail when finding nested columns to add
    List<Types.NestedField> columns = Lists.newArrayList(requestedSchema.columns());
    for (int fieldId : missingIds) {
      if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
        continue; // add _pos at the end
      }

      Types.NestedField field = tableSchema.asStruct().field(fieldId);
      Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId);

      columns.add(field);
    }

    if (missingIds.contains(MetadataColumns.ROW_POSITION.fieldId())) {
      columns.add(MetadataColumns.ROW_POSITION);
    }

    return new Schema(columns);
  }
复制代码

CloseableIterable<RowData> iterable = deletes.filter(newIterable(task, deletes.requiredSchema(), idToConstant));
根据deletes和task生成迭代器
newIterable(task, deletes.requiredSchema(), idToConstant)先生成一个data文件数据读取的迭代器,然后在调用deletes.filter()过滤掉其他delete数据,filter中会先使用applyPosDeletes,再使用applyEqDeletes,applyPosDeletes之前在spark读取中分析过,会先把posDelete文件读取到set中,然后放入到一个堆中,生成一个迭代器,然后归并迭代过滤掉删除的pos的数据,下面看下主要看下applyEqDeletes的逻辑

  public CloseableIterable<T> filter(CloseableIterable<T> records) {
    return applyEqDeletes(applyPosDeletes(records));
  }
复制代码
  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
    // Predicate to test whether a row should be visible to user after applying equality deletions.
    Predicate<T> remainingRows = applyEqDeletes().stream()
        .map(Predicate::negate)
        .reduce(Predicate::and)
        .orElse(t -> true);

    Filter<T> remainingRowsFilter = new Filter<T>() {
      @Override
      protected boolean shouldKeep(T item) {
        return remainingRows.test(item);
      }
    };

    return remainingRowsFilter.filter(records);
  }
复制代码

这里会先创建一个Predicate对象–过滤器remainingRows,然后调用其test方法进行过滤。
首先调用applyEqDeletes()方法

  private List<Predicate<T>> applyEqDeletes() {
    List<Predicate<T>> isInDeleteSets = Lists.newArrayList();
    if (eqDeletes.isEmpty()) {
      return isInDeleteSets;
    }

    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
    for (DeleteFile delete : eqDeletes) {
      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
    }

    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
      Set<Integer> ids = entry.getKey();
      Iterable<DeleteFile> deletes = entry.getValue();

      Schema deleteSchema = TypeUtil.select(requiredSchema, ids);

      // a projection to select and reorder fields of the file schema to match the delete rows
      StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);

      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
          delete -> openDeletes(delete, deleteSchema));
      StructLikeSet deleteSet = Deletes.toEqualitySet(
          // copy the delete records because they will be held in a set
          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
          deleteSchema.asStruct());

      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
      isInDeleteSets.add(isInDeleteSet);
    }

    return isInDeleteSets;
  }
复制代码

这里先将delete文件按equalityFieldIds为key分类放入map中,在每个map的entry中调用Deletes.toEqualitySet生成一个StructLikeSet对象,然后构造过滤器返回,这里应该是读取和小文件并的主要瓶颈所在,构造需要读取EquilityDeleteFiles然后构造成StructLikeSet,首先从hdfs中读取EquilityDeleteFiles需要花费大量的时间,目前delete文件是写整条数据,但其实需要使用的只是equalityFieldIds,这里写的时候就浪费了io耗时和存储空间,读取的时候根据测试结果来看,如果整条记录较长的话构造StructLikeSet花费的时间也会急剧增加,而这里使用的是parquet列式存储文件进行存储,应该是有列裁剪 Column Pruning 和 映射下推 Project PushDown的优化功能的,但是更具测试感觉没有体现到,然后是构造StructLikeSet其中倒是只存储了equalityFieldIds列,但是依旧花费了大量的内存且数据量越大查询效率越低,这里也是可以考虑优化。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享