1:项目背景
hadoop的小文件管理是一个比较头疼的事情。项目最开始的时候大数据namenode与datanode混布,出现过一次namenode内存不够用,集群所有任务失败。为此单独起一个项目治理小文件,并且迁移datanode,namenode 单独部署。
2:HDFS简介
HDFS是Hadoop核心组成, 是分布式存储服务。由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。基本都是一个NameNode+多个DataNode组成。NameNode是集群的主节点, DataNode是集群的从节点。 Namenode 负责元数据管理,维护文件和目录树,响应Client请求;Datanode负责实际数据存储。
3:HDFS Block 简介
Block是文件块,HDFS中是以Block为单位进行文件的管理的。一个文件可能有多个块,每个块默认是3个副本,这些块分别存储在不同机器上。块与文件之前的映射关系会定时上报Namenode。HDFS中一个块的默认大小是64M,其大小由参数dfs.block.size控制。
4:小文件如何产生以及影响
1:动态分区插入数据,产生大量的小文件,从而导致 map 数量剧增。
2:reduce 数量越多,小文件也越多,reduce 的个数和输出文件个数一致。
3:数据源本身就是大量的小文件。
在运行时,HDFS中每个文件、目录和数据块的元数据信息(大约150字节)必须存储在NameNode的内存中。所以说小文件问题有两个主要原因:NameNode 内存管理和 MapReduce 性能。
5:治理小文件常用方法
1:合并文件并上传到hdfs
hadoop dfs -cat /user/hive/warehouse/ods.db/xxxxx/staticdate=2019-06-05/ | hadoop dfs -put - /user/hive/warehouse/ods.db/xxxxx/staticdate=2019-06-05/0000.merge
复制代码
2:hive表使用orc格式进行存储,可以通过concatenate命令。
1:通过beeline 命令行压缩
beeline -u jdbc:hive2://namenode:10000 --showHeader=false --outputformat=tsv2 --silent=true -e "show partitions tableName" > tableName.txt
##注意执行前要设置参数,详细如下:
//每个Map最小输入大小,决定合并后的文件数
set mapreduce.input.fileinputformat.split.maxsize=100000000;
//每个Map最大输入大小,决定合并后的文件数
set mapreduce.input.fileinputformat.split.minsize=100000000;
//一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并
set mapreduce.input.fileinputformat.split.minsize.per.node=100000000;
//一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
set mapreduce.input.fileinputformat.split.minsize.per.rack=100000000;
#!/bin/bash
for line in `cat found_partitions.txt`; do
echo "the next partition is $line"
partition=` (echo $line | sed -e 's/\//,/g' -e "s/=/='/g" -e "s/,/',/g" -e "s/$/'/g" )`\'
beeline -u jdbc:hive2://namenode:10000 -e "alter table database.table partition($partition) concatenate”
done
复制代码
2:通过定时job执行。跟上面类似,无非换成java schedule。通过hivejdbc show partitions table。然后检测分区文件下的个数以及文件大小。确认是否需要合并,再执行合并代码。部分核心代码如下:
public static final String TABLE = "${db.table}";
public static final String PARTITION = "${partition}";
public static final String CONCATENATE_SQL = "ALTER TABLE ${db.table} PARTITION (${partition}) CONCATENATE";
public static String getConcatenateSql(String table, String partition) {
return CONCATENATE_SQL.replace(TABLE, table).replace(PARTITION, partition);
}
@Resource(name = "hiveJdbcTemplate")
private JdbcTemplate jdbcTemplate;
public int concatenate(String table, String partition) {
String concatenateSql = HiveSQLTemplate.getConcatenateSql(table, partition);
beforePropertiesSet();
return update(concatenateSql);
}
public List<String> showPartitions(String table) {
return jdbcTemplate.queryForList("show partitions " + table, String.class);
}
private int update(String sql) {
int updateStatus = jdbcTemplate.update(sql);
return updateStatus;
}
public void beforePropertiesSet() {
jdbcTemplate.batchUpdate("set mapreduce.job.queuename=root.default",
"set mapreduce.input.fileinputformat.split.maxsize=100000000",
"set mapreduce.input.fileinputformat.split.minsize=100000000",
"set mapreduce.input.fileinputformat.split.minsize.per.node=100000000",
"set mapreduce.input.fileinputformat.split.minsize.per.rack=100000000"
);
}
复制代码
3:SparkSQL 小文件管理
insert overwrite table xx partition(dt) select*from xx distribute by dt
复制代码
使用 distribute by 把同一分区的记录哈希到同一个分区,由一个 SparkTask 进行 写入,这样每个分区下只会有一个小于 128M 的文件。当数据量过大的时候这种方式会 导致倾斜 key 的时间长,任务跑的慢,可以改为: distribute by dt cast(rand()*n as int) 每个分区下只产生 n 个文件,打散数据防止造成倾斜。 这种方式建议增加 reduce 端端内存,防止 oom。
6:业内其他方案
1:联邦HDFS
在Hadoop 2.x发行版中引入了联邦HDFS功能,期望可以解决NameNode的内存问题。联邦HDFS允许系统通过添加多个NameNode来实现扩展,其中每个NameNode管理文件系统命名空间中的一部分。但是系统管理员需要维护多个NameNode和负载均衡服务,这又增加了管理成本。所以HDFS的联邦方案并没有被生产环境所采用。
2:归档文件
Hadoop归档文件或HAR文件是将HDFS文件打包到归档中的工具。这是在HDFS中存储大量小文件的比较常用的选择。HAR文件的原理是将很多小文件打包到一起,形成一个HDFS文件(有点类似Linux的TAR文件),这样可以有效的减少HDFS管理的block数量,从而降低NameNode使用。
7:总结
在做大数据容量规划的时候,切记namenode一定不能与datanode混布。埋点体系的搭建也至关重要,尤其是给产品科普埋点建设,不然很容易出现做一个新功能就是一个埋点。如果再有一些其他操作,很多事后的处理有点亡羊补牢的意思。
ods 历史的数据比较多,可以考虑历史无用的埋点(dwd层已包含相关数据的)迁移到本地磁盘。从而降低集群的数据块,降低nn的压力。
如果你是cdh集群部署,可以通过api 动态监控线上数据块变化,http://namenode:9870/jmx 当达到一定的阈值,发出相关告警。