hadoop 小文件优化、压缩

1:项目背景

hadoop的小文件管理是一个比较头疼的事情。项目最开始的时候大数据namenode与datanode混布,出现过一次namenode内存不够用,集群所有任务失败。为此单独起一个项目治理小文件,并且迁移datanode,namenode 单独部署。

2:HDFS简介

HDFS是Hadoop核心组成, 是分布式存储服务。由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。基本都是一个NameNode+多个DataNode组成。NameNode是集群的主节点, DataNode是集群的从节点。  Namenode 负责元数据管理,维护文件和目录树,响应Client请求;Datanode负责实际数据存储。

hdfs 架构.jpg

3:HDFS Block 简介

Block是文件块,HDFS中是以Block为单位进行文件的管理的。一个文件可能有多个块,每个块默认是3个副本,这些块分别存储在不同机器上。块与文件之前的映射关系会定时上报Namenode。HDFS中一个块的默认大小是64M,其大小由参数dfs.block.size控制。

4:小文件如何产生以及影响

1:动态分区插入数据,产生大量的小文件,从而导致 map 数量剧增。
2:reduce 数量越多,小文件也越多,reduce 的个数和输出文件个数一致。
3:数据源本身就是大量的小文件。

在运行时,HDFS中每个文件、目录和数据块的元数据信息(大约150字节)必须存储在NameNode的内存中。所以说小文件问题有两个主要原因:NameNode 内存管理和 MapReduce 性能。

5:治理小文件常用方法

1:合并文件并上传到hdfs
hadoop dfs -cat /user/hive/warehouse/ods.db/xxxxx/staticdate=2019-06-05/ | hadoop dfs -put - /user/hive/warehouse/ods.db/xxxxx/staticdate=2019-06-05/0000.merge
复制代码
2:hive表使用orc格式进行存储,可以通过concatenate命令。

1:通过beeline 命令行压缩

beeline  -u jdbc:hive2://namenode:10000  --showHeader=false --outputformat=tsv2 --silent=true -e "show partitions tableName" > tableName.txt
 
##注意执行前要设置参数,详细如下:
//每个Map最小输入大小,决定合并后的文件数
set mapreduce.input.fileinputformat.split.maxsize=100000000;

//每个Map最大输入大小,决定合并后的文件数
set mapreduce.input.fileinputformat.split.minsize=100000000;

//一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并
set mapreduce.input.fileinputformat.split.minsize.per.node=100000000;

//一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
set mapreduce.input.fileinputformat.split.minsize.per.rack=100000000;


#!/bin/bash
for line in `cat found_partitions.txt`; do
    echo "the next partition is $line"
    partition=` (echo  $line | sed -e 's/\//,/g' -e "s/=/='/g" -e "s/,/',/g" -e  "s/$/'/g" )`\'
    beeline -u jdbc:hive2://namenode:10000  -e "alter table database.table partition($partition) concatenate”
done
复制代码

2:通过定时job执行。跟上面类似,无非换成java  schedule。通过hivejdbc show partitions table。然后检测分区文件下的个数以及文件大小。确认是否需要合并,再执行合并代码。部分核心代码如下:

    public static final String TABLE = "${db.table}";

    public static final String PARTITION = "${partition}";

    public static final String CONCATENATE_SQL = "ALTER TABLE ${db.table} PARTITION (${partition}) CONCATENATE";

    public static String getConcatenateSql(String table, String partition) {
        return CONCATENATE_SQL.replace(TABLE, table).replace(PARTITION, partition);
    }
    @Resource(name = "hiveJdbcTemplate")
    private JdbcTemplate jdbcTemplate;

    public int concatenate(String table, String partition) {
        String concatenateSql = HiveSQLTemplate.getConcatenateSql(table, partition);
        beforePropertiesSet();
        return update(concatenateSql);
    }

    public List<String> showPartitions(String table) {
        return jdbcTemplate.queryForList("show partitions " + table, String.class);
    }

    private int update(String sql) {
        int updateStatus = jdbcTemplate.update(sql);
        return updateStatus;
    }

    public void beforePropertiesSet() {
        jdbcTemplate.batchUpdate("set mapreduce.job.queuename=root.default",
                "set mapreduce.input.fileinputformat.split.maxsize=100000000",
                "set mapreduce.input.fileinputformat.split.minsize=100000000",
                "set mapreduce.input.fileinputformat.split.minsize.per.node=100000000",
                "set mapreduce.input.fileinputformat.split.minsize.per.rack=100000000"
        );
    }

复制代码

3:SparkSQL 小文件管理 

insert overwrite table xx partition(dt)  select*from xx distribute by dt 
复制代码

使用 distribute by 把同一分区的记录哈希到同一个分区,由一个 SparkTask 进行 写入,这样每个分区下只会有一个小于 128M 的文件。当数据量过大的时候这种方式会 导致倾斜 key 的时间长,任务跑的慢,可以改为: distribute by dt cast(rand()*n as int) 每个分区下只产生 n 个文件,打散数据防止造成倾斜。 这种方式建议增加 reduce 端端内存,防止 oom。

6:业内其他方案

1:联邦HDFS

在Hadoop 2.x发行版中引入了联邦HDFS功能,期望可以解决NameNode的内存问题。联邦HDFS允许系统通过添加多个NameNode来实现扩展,其中每个NameNode管理文件系统命名空间中的一部分。但是系统管理员需要维护多个NameNode和负载均衡服务,这又增加了管理成本。所以HDFS的联邦方案并没有被生产环境所采用。

2:归档文件

Hadoop归档文件或HAR文件是将HDFS文件打包到归档中的工具。这是在HDFS中存储大量小文件的比较常用的选择。HAR文件的原理是将很多小文件打包到一起,形成一个HDFS文件(有点类似Linux的TAR文件),这样可以有效的减少HDFS管理的block数量,从而降低NameNode使用。

7:总结

在做大数据容量规划的时候,切记namenode一定不能与datanode混布。埋点体系的搭建也至关重要,尤其是给产品科普埋点建设,不然很容易出现做一个新功能就是一个埋点。如果再有一些其他操作,很多事后的处理有点亡羊补牢的意思。

ods 历史的数据比较多,可以考虑历史无用的埋点(dwd层已包含相关数据的)迁移到本地磁盘。从而降低集群的数据块,降低nn的压力。

如果你是cdh集群部署,可以通过api 动态监控线上数据块变化,http://namenode:9870/jmx  当达到一定的阈值,发出相关告警。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享