The read path generally goes through the planTasks method, which returns CombinedScanTask instances; a RowDataIterator is then built from each CombinedScanTask to access the data:
RowDataIterator iterator = new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive)
Its parent class, DataIterator, then calls
currentIterator = openTaskIterator(tasks.next());
to open the iterator that actually reads the data:
@Override
protected CloseableIterator<RowData> openTaskIterator(FileScanTask task) {
  Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());
  Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty() ? ImmutableMap.of() :
      PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
  FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema);
  CloseableIterable<RowData> iterable = deletes.filter(newIterable(task, deletes.requiredSchema(), idToConstant));
  return iterable.iterator();
}
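To make the control flow concrete, here is a minimal, generified sketch of that chaining pattern. The class name and type parameters are invented for illustration; this is not Iceberg's actual DataIterator, which also works with CloseableIterator and handles closing the underlying readers.

// Simplified sketch of the chaining described above (hypothetical class): whenever the
// current per-file iterator is exhausted, the next task is opened via openTaskIterator.
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

abstract class ChainedTaskIterator<TASK, ROW> implements Iterator<ROW> {
  private final Iterator<TASK> tasks;                                  // e.g. the tasks of a CombinedScanTask
  private Iterator<ROW> currentIterator = Collections.emptyIterator(); // iterator over the current data file

  ChainedTaskIterator(Iterator<TASK> tasks) {
    this.tasks = tasks;
  }

  // subclasses (RowDataIterator plays this role in Iceberg) decide how a single task is read
  protected abstract Iterator<ROW> openTaskIterator(TASK task);

  @Override
  public boolean hasNext() {
    // same idea as currentIterator = openTaskIterator(tasks.next()) quoted above
    while (!currentIterator.hasNext() && tasks.hasNext()) {
      currentIterator = openTaskIterator(tasks.next());
    }
    return currentIterator.hasNext();
  }

  @Override
  public ROW next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return currentIterator.next();
  }
}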
The line FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema); creates a FlinkDeleteFilter. The constructor of its parent class, DeleteFilter, looks like this:
protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
  this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD;
  this.dataFile = task.file();

  ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
  ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
  for (DeleteFile delete : task.deletes()) {
    switch (delete.content()) {
      case POSITION_DELETES:
        posDeleteBuilder.add(delete);
        break;
      case EQUALITY_DELETES:
        eqDeleteBuilder.add(delete);
        break;
      default:
        throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
    }
  }

  this.posDeletes = posDeleteBuilder.build();
  this.eqDeletes = eqDeleteBuilder.build();
  this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
  this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
}
fileProjection builds the schema that must be read from the data file. It first collects requiredIds: ROW_POSITION is added when there are posDeletes, and the corresponding equalityFieldIds are added when there are eqDeletes. The fields in requiredIds that the requested schema does not already project become missingIds and are appended to the projection; if ROW_POSITION is among them, it is always appended as the last field:
private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
                                     List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
  if (posDeletes.isEmpty() && eqDeletes.isEmpty()) {
    return requestedSchema;
  }

  Set<Integer> requiredIds = Sets.newLinkedHashSet();
  if (!posDeletes.isEmpty()) {
    requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
  }

  for (DeleteFile eqDelete : eqDeletes) {
    requiredIds.addAll(eqDelete.equalityFieldIds());
  }

  Set<Integer> missingIds = Sets.newLinkedHashSet(
      Sets.difference(requiredIds, TypeUtil.getProjectedIds(requestedSchema)));

  if (missingIds.isEmpty()) {
    return requestedSchema;
  }

  // TODO: support adding nested columns. this will currently fail when finding nested columns to add
  List<Types.NestedField> columns = Lists.newArrayList(requestedSchema.columns());
  for (int fieldId : missingIds) {
    if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
      continue; // add _pos at the end
    }

    Types.NestedField field = tableSchema.asStruct().field(fieldId);
    Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId);

    columns.add(field);
  }

  if (missingIds.contains(MetadataColumns.ROW_POSITION.fieldId())) {
    columns.add(MetadataColumns.ROW_POSITION);
  }

  return new Schema(columns);
}
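As a concrete illustration of these projection rules, consider the hypothetical schemas below; the field IDs, names, and delete configuration are all invented for the example, and since fileProjection itself is private, the resulting projection is only described in comments.

// Hypothetical example of what fileProjection would produce for a given scan task.
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class FileProjectionExample {
  public static void main(String[] args) {
    Schema tableSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()),
        Types.NestedField.optional(3, "ts", Types.TimestampType.withZone()));

    // The query only projects "name".
    Schema requestedSchema = tableSchema.select("name");
    System.out.println(requestedSchema.asStruct());

    // If the scan task carries an equality delete on field 1 ("id") plus a position delete,
    // then requiredIds = {ROW_POSITION, 1}. Both are missing from the requested schema, so
    // the schema actually read from the data file becomes:
    //   name (2), id (1), _pos (MetadataColumns.ROW_POSITION, appended last)
  }
}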
CloseableIterable<RowData> iterable = deletes.filter(newIterable(task, deletes.requiredSchema(), idToConstant));
builds the final iterator from the delete filter and the task.
newIterable(task, deletes.requiredSchema(), idToConstant)
first creates an iterator that reads the data file, and deletes.filter() is then applied to drop the deleted rows. Inside filter, applyPosDeletes runs first and applyEqDeletes runs second. applyPosDeletes was already analyzed for the Spark read path: the position-delete files are either loaded into a set, or fed into a heap that produces a merged iterator skipping the rows whose positions were deleted (a simplified sketch of that idea follows the filter() snippet below). The focus here is on the logic of applyEqDeletes.
public CloseableIterable<T> filter(CloseableIterable<T> records) {
  return applyEqDeletes(applyPosDeletes(records));
}
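As mentioned above, here is a deliberately simplified, non-Iceberg model of the position-delete step. The real applyPosDeletes goes through the Deletes utilities and can merge sorted positions instead of materializing a set, but the visible effect is the same: rows whose ordinal position in the data file appears in a delete file are dropped.

// Simplified illustration only: drop a row when its position within the data file is in
// the set of deleted positions. Class and method names are hypothetical.
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

final class PosDeleteModel {
  static <T> Iterator<T> filterDeletedPositions(Iterator<T> rows, Set<Long> deletedPositions) {
    return new Iterator<T>() {
      private long pos = -1;   // position of the last row pulled from the underlying iterator
      private T nextRow;
      private boolean ready;

      @Override
      public boolean hasNext() {
        while (!ready && rows.hasNext()) {
          T candidate = rows.next();
          pos++;
          if (!deletedPositions.contains(pos)) {  // keep only rows whose position was not deleted
            nextRow = candidate;
            ready = true;
          }
        }
        return ready;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        ready = false;
        return nextRow;
      }
    };
  }
}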
private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
  // Predicate to test whether a row should be visible to user after applying equality deletions.
  Predicate<T> remainingRows = applyEqDeletes().stream()
      .map(Predicate::negate)
      .reduce(Predicate::and)
      .orElse(t -> true);

  Filter<T> remainingRowsFilter = new Filter<T>() {
    @Override
    protected boolean shouldKeep(T item) {
      return remainingRows.test(item);
    }
  };

  return remainingRowsFilter.filter(records);
}
Here a Predicate, remainingRows, is first built as the filter, and its test method is then invoked on every row to decide whether the row is kept.
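The reduction over the predicates is easiest to see on a toy example that uses nothing but java.util.function (the numbers and predicates are made up): each per-group "is in delete set" predicate is negated and the negations are AND-ed, so a row survives only if no delete set contains it.

import java.util.List;
import java.util.function.Predicate;

public class PredicateCompositionExample {
  public static void main(String[] args) {
    // Two toy "isInDeleteSet" predicates standing in for the per-group equality delete sets.
    List<Predicate<Integer>> isInDeleteSets = List.of(
        x -> x == 1,   // "deleted" by the first group
        x -> x == 3);  // "deleted" by the second group

    // The same reduction used in applyEqDeletes: negate each predicate, then AND them.
    Predicate<Integer> remainingRows = isInDeleteSets.stream()
        .map(Predicate::negate)
        .reduce(Predicate::and)
        .orElse(t -> true);

    System.out.println(remainingRows.test(1)); // false -> filtered out
    System.out.println(remainingRows.test(2)); // true  -> kept
    System.out.println(remainingRows.test(3)); // false -> filtered out
  }
}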
Let's start with the no-argument applyEqDeletes() method:
private List<Predicate<T>> applyEqDeletes() {
  List<Predicate<T>> isInDeleteSets = Lists.newArrayList();
  if (eqDeletes.isEmpty()) {
    return isInDeleteSets;
  }

  Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
  for (DeleteFile delete : eqDeletes) {
    filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
  }

  for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
    Set<Integer> ids = entry.getKey();
    Iterable<DeleteFile> deletes = entry.getValue();

    Schema deleteSchema = TypeUtil.select(requiredSchema, ids);

    // a projection to select and reorder fields of the file schema to match the delete rows
    StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);

    Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
        delete -> openDeletes(delete, deleteSchema));

    StructLikeSet deleteSet = Deletes.toEqualitySet(
        // copy the delete records because they will be held in a set
        CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
        deleteSchema.asStruct());

    Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
    isInDeleteSets.add(isInDeleteSet);
  }

  return isInDeleteSets;
}
The delete files are first grouped into a multimap keyed by their equalityFieldIds. For each entry, Deletes.toEqualitySet is called to build a StructLikeSet, and a predicate backed by that set is returned.

This is most likely the main bottleneck for both reads and small-file compaction, because the equality delete files have to be read and materialized into a StructLikeSet. Reading the equality delete files from HDFS already costs a lot of time. Today a delete file stores the whole row even though only the equalityFieldIds columns are actually needed, which wastes IO time and storage space at write time; at read time, tests show that the longer the full record is, the more sharply the time to build the StructLikeSet grows. The delete files are stored as Parquet, a columnar format that should support column pruning and projection push-down, but the tests did not seem to show that benefit. The StructLikeSet itself does keep only the equalityFieldIds columns, yet it still consumes a large amount of memory, and lookup efficiency drops as the data volume grows, so this is another place worth optimizing.
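To wrap up, the equality-delete mechanism boils down to the following deliberately simplified, non-Iceberg model (the record type, key choice, and values are invented for illustration): the delete rows are reduced to their equalityFieldIds columns and held in an in-memory set, and each data row is projected to the same key and looked up in that set, which is exactly where the memory cost discussed above comes from.

// Simplified model of equality-delete filtering: project delete rows to their key columns,
// keep the keys in an in-memory set, and drop any data row whose key is in the set.
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class EqDeleteModelExample {
  record Row(long id, String name, long ts) {}

  public static void main(String[] args) {
    // Pretend equalityFieldIds selects just the "id" column.
    List<Row> deleteRows = List.of(new Row(1, "ignored", 0), new Row(3, "ignored", 0));
    Set<Long> deleteSet = deleteRows.stream().map(Row::id).collect(Collectors.toSet());

    List<Row> dataRows = List.of(
        new Row(1, "a", 100), new Row(2, "b", 101), new Row(3, "c", 102));

    // Equivalent in spirit to remainingRowsFilter: keep rows whose key is NOT in the delete set.
    List<Row> visible = dataRows.stream()
        .filter(row -> !deleteSet.contains(row.id()))
        .collect(Collectors.toList());

    System.out.println(visible); // only the row with id = 2 remains
  }
}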