HDFS

基础知识

1. HDFS读流程

image.png

  1. 客户端通过 DistributedFileSystem 向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址。
  2. 挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。
  3. DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位来做校验)。
  4. 客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。
2. HDFS写流程

image.png

  1. 客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。
  2. NameNode 返回是否可以上传。
  3. 客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。
  4. NameNode 返回 3 个 DataNode 节点,分别为 dn1、dn2、dn3。
  5. 客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用dn2,然后 dn2 调用 dn3,将这个通信管道建立完成。
  6. dn1、dn2、dn3 逐级应答客户端。
  7. 客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个packet会放入一个应答队列等待应答。
  8. 当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务器。(重复执行 3-7 步)
3. HDFS文件副本放置策略
  1. 第一个副本:在DataNode数据节点提交时,第一个块是本地节点,目的是节省了本地资源,占用了更少的带宽,如果是在集群外提交,则随机挑选一台磁盘不太慢,CPU不太忙的节点上;
  2. 第二个副本:放置在与第一个副本不同的机架的节点上;
  3. 第三个副本:与第二个副本相同机架的不同节点上;
  4. 若有更多副本,随机放在节点中。
4. HDFS的各进程名称与功能
  • NameNode:维护着文件的元数据,包括文件名、副本数、文件的 BlockId,以及 block 所的服务器,会接受来自 Client 端的读写请求,和DataNode 的 Block 信息上报。
  • Secondary NameNode:用来监控HDFS状态的辅助后台程序。Secondary NameNode不同NameNode,它不接受或者记录任何实时的数据变化,但是,它会与NameNode进行通信,以便定期地保存HDFS元数据快照。由于NameNode是单点的,通过Secondary NameNode的快照功能,可以将NameNode的宕机时间和数据损失降低到最小。同时,如果NameNode中的Fsimage到达了指定容量,则会和Secondary NameNode进行通信,进行checkoint操作。
  • DataNode:HDFS 的工作节点,他们根据客户端或者是 NameNode 的调度存储和检索数据,并且定期向 NameNode 发送他们所存储的块(block)的列表。
  • JournalNode:负责两个 NameNode 高可用时的数据同步保证数据一致,存放 NameNode 的 editlog 文件(元数据),部署在任意节点,奇数个。
  • JobTracker:JobTracker后台程序用来连接应用程序与Hadoop。用户代码提交到集群以后,由JobTracker决定哪个文件将被处理,并且为 不同的task分配节点。同时,它还监控所有的task,一旦某个task失败了,JobTracker就会自动重新开启这个task,在大多数情况下这 个task会被放在不用的节点上。每个Hadoop集群只有一个JobTracker,一般运行在集群的Master节点上。
  • TaskTracker:TaskTracker与负责存储数据的DataNode相结合,其处理结构上也遵循主/从架构。JobTracker位于主节点,统领 MapReduce工作;而TaskTrackers位于从节点,独立管理各自的task。每个TaskTracker负责独立执行具体的task,而 JobTracker负责分配task。

详情请参考:www.cnblogs.com/1130136248w…

5. HDFS 存储格式
  • 面向行:

同一行的数据存储在一起,即连续存储。SequenceFile,MapFile,Avro Datafile。采用这种方式,如果只需要访问行的一小部分数据,亦需要将整行读入内存,推迟序列化一定程度上可以缓解这个问题,但是从磁盘读取整行数据的开销却无法避免。面向行的存储适合于整行数据需要同时处理的情况。

  • 面向列:

整个文件被切割为若干列数据,每一列数据一起存储。Parquet , RCFile,ORCFile。面向列的格式使得读取数据时,可以跳过不需要的列,适合于只处于行的一小部分字段的情况。但是这种格式的读写需要更多的内存空间,因为需要缓存行在内存中(为了获取多行中的某一列)。同时不适合流式写入,因为一旦写入失败,当前文件无法恢复,而面向行的数据在写入失败时可以重新同步到最后一个同步点,所以Flume采用的是面向行的存储格式。

  • textfile

  • textfile为默认格式,存储方式为行式存储,在检索时磁盘开销大 数据解析开销大,而对压缩的text文件 hive无法进行合并和拆分。

  • SequenceFile

  • Hadoop提供的一个行存储结构,Hadoop适合处理大文件而不适合处理小文件,所以sequencefile是为小文件提供的一种容器,将小文件包装起来形成一个SequenceFile类, 它用一种<key,value>的形式序列化数据导文件中。

  • RCFile

  • 存储方式为数据按行分块,每块按照列存储的行列混合模式,具有压缩快,列存取快的特点。

  • 在使用时,读记录尽量涉及到的block最少,这样读取需要的列只需要读取每个row group 的头部定义,具有明显速度优势。

  • 读取全量数据的操作 性能可能比sequencefile没有明显的优势。

  • ORCfile

  • 是RCfile的升级版,将数据划分为默认大小为250MB的stripe(条带),每个stripe包含索引,数据和footer,ORCfile包换索引比RCfile更加高效

  • Parquet

  • parquet基于Google的dremel,擅长处理深度嵌套的数据(有点类似于嵌套多层的json格式),parquet会将嵌套结构整合为平面列存储。

6. HDFS文件压缩算法
  • Gzip压缩
    • 优点:压缩率比较高,压缩/解压速度也比较快,hadoop本身支持。
    • 缺点:不支持分片。
    • 应用场景:当每个文件压缩之后在1个block块大小内,可以考虑用gzip压缩格式。
  • lzo压缩
    • 优点:压缩/解压速度也比较快,合理的压缩率,支持分片,是Hadoop中最流行的压缩格式,支持Hadoop native库。
    • 缺点:压缩率比gzip要低一些,Hadoop本身不支持,需要安装,如果支持分片需要建立索引,还需要指定inputformat改为lzo格式。
    • 应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点越明显。
  • snappy压缩
    • 优点:支持Hadoop native库,高速压缩速度和合理的压缩率。
    • 缺点:不支持分片,压缩率比gzip要低,Hadoop本身不支持,需要安装。
    • 应用场景:当MapReduce作业的map输出的数据比较大的时候,作为map到reduce的中间数据的压缩格式。
  • bzip2压缩
    • 优点:支持分片,具有很高的压缩率,比gzip压缩率都高,Hadoop本身支持,但不支持native。
    • 缺点:压缩/解压速度慢,不支持Hadoop native库。
    • 应用场景:适合对速度要求不高,但需要较高的压缩率的时候,可以作为mapreduce作业的输出格式,输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况。

故障排查与调优

1. HDFS中小文件过多导致的问题与如何优化

1. 使用HAR(Hadoop Archives)

为了缓解大量小文件带给namenode内存的压力,Hadoop 0.18.0引入了Hadoop Archives(HAR files),其本质就是在HDFS之上构建一个分层文件系统。通过执行hadoop archive 命令就可以创建一个HAR文件。在命令行下,用户可使用一个以har://开头的URL就可以访问HAR文件中的小文件。使用HAR files可以减少HDFS中的文件数量。

下图为HAR文件的文件结构,可以看出来访问一个指定的小文件需要访问两层索引文件才能获取小文件在HAR文件中的存储位置,因此,访问一个HAR文件的效率可能会比直接访问HDFS文件要低。对于一个mapreduce任务来说,如果使用HAR文件作为其输入,仍旧是其中每个小文件对应一个map task,效率低下。所以,HAR files最好是用于文件归档
image.png

2. 使用sequencefile

SequenceFile核心是以文件名为key,文件内容为value组织小文件。10000个100KB的小文件,可以编写程序将这些文件放到一个SequenceFile文件,然后就以数据流的方式处理这些文件,也可以使用MapReduce进行处理。一个SequenceFile是可分割的,所以MapReduce可将文件切分成块,每一块独立操作。不像HAR,SequenceFile支持压缩。在大多数情况下,以block为单位进行压缩是最好的选择,因为一个block包含多条记录,压缩作用在block之上,比reduce压缩方式(一条一条记录进行压缩)的压缩比高。

把已有的数据转存为SequenceFile比较慢。比起先写小文件,再将小文件写入SequenceFile,一个更好的选择是直接将数据写入一个SequenceFile文件,省去小文件作为中间媒介。

3. MapReduce过程中使用CombineFileInputFormat

CombineFileInputFormat是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置。

2. MapReduce跑得慢的原因

Mapreduce 程序效率的瓶颈在于两点:

1. 计算机性能

CPU、内存、磁盘健康、网络
复制代码

2. I/O 操作优化

数据倾斜
map和reduce数设置不合理
reduce等待过久
小文件过多
大量的不可分块的超大文件
spill次数过多
merge次数过多等。
复制代码

mapreduce优化方案

  1. 数据输入:

    合并小文件:在执行mr任务前将小文件进行合并,大量的小文件会产生大量的map任务,增大map任务装载次数,而任务的装载比较耗时,从而导致 mr 运行较慢。
    采用ConbinFileInputFormat来作为输入,解决输入端大量小文件场景。

  2. map阶段

    减少spill次数:通过调整io.sort.mb及sort.spill.percent参数值,增大触发spill的内存上限,减少spill次数,从而减少磁盘 IO。
    减少merge次数:通过调整io.sort.factor参数,增大merge的文件数目,减少merge的次数,从而缩短mr处理时间。
    在 map 之后先进行combine处理,减少 I/O。

  3. reduce阶段

    合理设置map和reduce数:两个都不能设置太少,也不能设置太多。太少,会导致task等待,延长处理时间;太多,会导致 map、reduce任务间竞争资源,造成处理超时等错误。
    设置map、reduce共存:调整slowstart.completedmaps参数,使map运行到一定程度后,reduce也开始运行,减少reduce的等待时间。
    规避使用reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。
    合理设置reduc端的buffer,默认情况下,数据达到一个阈值的时候,buffer中的数据就会写入磁盘,然后reduce会从磁盘中获得所有的数据。也就是说,buffer和reduce是没有直接关联的,中间多个一个写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得buffer中的一部分数据可以直接输送到reduce,从而减少IO开销:mapred.job.reduce.input.buffer.percent,默认为0.0。当值大于0的时候,会保留指定比例的内存读buffer中的数据直接拿给reduce使用。这样一来,设置buffer需要内存,读取数据需要内存,reduce计算也要内存,所以要根据作业的运行情况进行调整。

  4. IO传输

    采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZOP压缩编码器。
    使用SequenceFile二进制文件

数据倾斜问题

  1. 数据倾斜现象

    数据频率倾斜——某一个区域的数据量要远远大于其他区域。
    数据大小倾斜——部分记录的大小远远大于平均值。

  2. 如何收集倾斜数据

在reduce方法中加入记录map输出键的详细情况的功能。

public static final String MAX_VALUES = "skew.maxvalues";
private int maxValueThreshold;
 
@Override
public void configure(JobConf job) {
     maxValueThreshold = job.getInt(MAX_VALUES, 100);
}

@Override
public void reduce(Text key, Iterator<Text> values,
                     OutputCollector<Text, Text> output,
                     Reporter reporter) throws IOException {
     int i = 0;
     while (values.hasNext()) {
         values.next();
         i++;
     }
     if (++i > maxValueThreshold) {
         log.info("Received " + i + " values for key " + key);
     }
}

复制代码
  1. 减少数据倾斜的方法

    a:抽样和范围分区

可以通过对原始数据进行抽样得到的结果集来预设分区边界值。

b:自定义分区
复制代码

另一个抽样和范围分区的替代方案是基于输出键的背景知识进行自定义分区。例如,如果map输出键的单词来源于一本书。其中大部分必然是省略词(stopword)。那么就可以将自定义分区将这部分省略词发送给固定的一部分reduce实例。而将其他的都发送给剩余的reduce实例。

c:Combine
复制代码

使用Combine可以大量地减小数据频率倾斜和数据大小倾斜。在可能的情况下,combine的目的就是聚合并精简数据。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享