Hadoop/Hive/Spark小文件处理

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

什么是小文件

小文件指的是文件size比HDFS的block size小很多的文件。Hadoop适合处理少量的大文件而不是大量的小文件。

hadoop小文件常规的处理方式

1、小文件导致的问题

首先在HDFS中任何block文件或者目录在内存中均以对象的形式存储每个对象约占150byte如果有1000 0000个小文件每个文件占用一个block则namenode大约需要2G空间。如果存储1亿个文件则namenode需要20G空间。这样namenode内存容量严重制约了集群的扩展。
其次访问大量小文件速度远远小于访问几个大文件。HDFS最初是为流式访问大文件开发的如果访问大量小文件需要不断的从一个datanode跳到另一个datanode严重影响性能。
最后处理大量小文件速度远远小于处理同等大小的大文件的速度。每一个小文件要占用一个slot而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上。

2、Hadoop自带的解决方案

对于小文件问题Hadoop本身也提供了几个解决方案分别为 Hadoop Archive Sequence
file 和 CombineFileInputFormat 。

  • Hadoop Archive
    Hadoop Archive或者HAR是一个高效地将小文件放入HDFS块中的文件存档工具它能够将多
    个小文件打包成一个HAR文件这样在减少namenode内存使用的同时仍然允许对文件进行
    透明的访问。
  • Sequence file
    sequence file由一系列的二进制key/value组成如果为key小文件名value为文件内容则可
    以将大批小文件合并成一个大文件。
  • CombineFileInputFormat
    它是一种新的inputformat用于将多个文件合并成一个单独的split另外它会考虑数据的
    存储位置。

Hadoop Archive

Hadoop Archive或者HAR是一个高效地将小文件放入HDFS块中的文件存档工具它能够将多个小文件打包成一个HAR文件这样在减少namenode内存使用的同时仍然允许对文件进行透明的访问。对某个目录/foo/bar下的所有小文件存档成/outputdir/ zoo.har

hadoop archive -archiveName zoo.har -p /foo/bar /outputdir

当然也可以指定HAR的大小(使用-Dhar.block.size)。
HAR是在Hadoop file system之上的一个文件系统因此所有fs shell命令对HAR文件均可用只不过是文件路径格式不一样HAR的访问路径可以是以下两种格式

har://scheme-hostname:port/archivepath/fileinarchive
har:///archivepath/fileinarchive(本节点)

可以这样查看HAR文件存档中的文件

hadoop dfs -ls har:///user/zoo/foo.har

输出

har:///user/zoo/foo.har/hadoop/dir1
har:///user/zoo/foo.har/hadoop/dir2

使用HAR时需要两点
第一对小文件进行存档后原文件并不会自动被删除需要用户自己删除
第二创建HAR文件的过程实际上是在运行一个mapreduce作业因而需要有一个hadoop集群运行此命令。

此外HAR还有一些缺陷
第一一旦创建Archives便不可改变。要增加或移除里面的文件必须重新创建归档文件。
第二要归档的文件名中不能有空格否则会抛出异常可以将空格用其他符号替换
(使用-Dhar.space.replacement.enable=true 和-Dhar.space.replacement参数)。
第三存档文件不支持压缩。

一个归档后的文件其存储结构如下图
在这里插入图片描述

Sequence file

Sequence file由一系列的二进制key/value组成如果为key小文件名value为文件内容则可以将大批小文件合并成一个大文件。
Hadoop-0.21.0中提供了SequenceFile包括WriterReader和SequenceFileSorter类进行写读和排序操作。
在这里插入图片描述
创建sequence file的过程可以使用mapreduce工作方式完成对于index需要改进查找算法
优缺点对小文件的存取都比较自由也不限制用户和文件的多少但是该方法不能使用append方
法所以适合一次性写入大量小文件的场景

CombineFileInputFormat

CombineFileInputFormat是一种新的inputformat用于将多个文件合并成一个单独的split另外它会考虑数据的存储位置。

Hive小文件问题怎么解决

首先我们要弄明白两个问题
1哪里会产生小文件
源数据本身有很多小文件
动态分区会产生大量小文件
reduce个数越多, 小文件越多
按分区插入数据的时候会产生大量的小文件, 文件个数 = maptask个数 * 分区数

2小文件太多造成的影响
从Hive的角度看小文件会开很多map一个map开一个JVM去执行所以这些任务的初始化启
动执行会浪费大量的资源严重影响性能。
HDFS存储太多小文件, 会导致namenode元数据特别大, 占用太多内存, 制约了集群的扩展

小文件解决方案

方法一通过调整参数进行合并

1在Map输入的时候, 把小文件合并

-- 每个Map最大输入大小决定合并后的文件数
setmapred.max.split.size=256000000;
-- 一个节点上split的至少的大小决定了多个datanode上的文件是否需要合并
setmapred.min.split.size.per.node=100000000;
-- 一个交换机下split的至少的大小决定了多个交换机上的文件是否需要合并
setmapred.min.split.size.per.rack=100000000;
-- 执行Map前进行小文件合并
sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

2在Reduce输出的时候, 把小文件合并

-- 在map-onlyjob后合并文件默认true
sethive.merge.mapfiles=true;
-- 在map-reducejob后合并文件默认false
sethive.merge.mapredfiles=true;
-- 合并后每个文件的大小默认256000000
sethive.merge.size.per.task=256000000;
-- 平均文件大小是决定是否执行合并操作的阈值默认16000000 sethive.merge.smallfiles.avgsize=100000000;

方法二针对按分区插入数据的时候产生大量的小文件的问题可以使用DISTRIBUTE BY rand() 将数据

随机分配给Reduce这样可以使得每个Reduce处理的数据大体一致。

-- 设置每个reducer处理的大小为5个G
sethive.exec.reducers.bytes.per.reducer=5120000000;
-- 使用distributebyrand()将数据随机分配给reduce,避免出现有的文件特别大,有的文件特别小insertoverwritetabletestpartition(dt)
select * from iteblog_tmp
DISTRIBUTEBYrand();

方法三使用Sequencefile作为表存储格式不要用textfile在一定程度上可以减少小文件

方法四使用hadoop的archive归档

-- 用来控制归档是否可用
set hive.archive.enabled=true;
-- 通知Hive在创建归档时是否可以设置父目录
set hive.archive.har.parentdir.settable=true;-- 控制需要归档文件的大小
set har.partfile.size=1099511627776;
-- 使用以下命令进行归档
ALTER TABLE srcpart ARCHIVE PARTITION(ds='2008-04-08',hr='12');
- 对已归档的分区恢复为原文件
ALTER TABLE srcpart UNARCHIVE PARTITION(ds='2008-04-08',hr='12');
-- 注意归档的分区不能够INSERTOVERWRITE必须先unarchive

Spark输出文件的个数如何合并小文件

当使用spark sql执行etl时候出现了最终结果大小只有几百k但是小文件一个分区可能就有上千的情况。
小文件过多的一些危害如下

  • hdfs有最大文件数限制
  • 浪费磁盘资源可能存在空文件
  • hive中进行统计计算的时候会产生很多个map影响计算的速度。

解决方案如下

方法一通过spark的coalesce()方法和repartition()方法

val rdd2 = rdd1.coalesce(8, true) //true表示是否shuffle
val rdd3 = rdd1.repartition(8)

coalescecoalesce()方法的作用是返回指定一个新的指定分区的Rdd如果是生成一个窄依赖的结果那么可以不发生shuffle分区的数量发生激烈的变化计算节点不足不设置true可能会出错。
repartitioncoalesce()方法shuffle为true的情况。

方法二降低spark并行度即调节spark.sql.shuffle.partitions

比如之前设置的为100按理说应该生成的文件数为100
但是由于业务比较特殊采用的大量的union all且union all在spark中属于窄依赖不会进行shuffle所以导致最终会生成union all数量+1*100的文件数。
如有10个union all会生成1100个小文件。这样导致降低并行度为10之后执行时长大大增加且文件数依旧有110个效果不理想。

方法三新增一个并行度=1任务专门合并小文件

先将原来的任务数据写到一个临时分区如tmp再起一个并行度为1的任务类似

insert overwrite 目标表 select * from 临时分区

结果小文件数还是没有减少经过多次测后发现原因‘select * from 临时分区’ 这个任务在spark中属于窄依赖
并且spark DAG中分为宽依赖和窄依赖只有宽依赖会进行shuffle
故并行度shufflespark.sql.shuffle.partitions=1也就没有起到作用

由于数据量本身不是特别大所以直接采用了group by在spark中属于宽依赖的方式类似

insert overwrite 目标表 select * from 临时分区 group by * 

先运行原任务写到tmp分区‘dfs -count’查看文件数1100个运行加上group by的临时任务
spark.sql.shuw le.partitions=1查看结果目录文件数=1成功。

总结

1方便的话可以采用coalesce()方法和repartition()方法
2如果任务逻辑简单数据量少可以直接降低并行度
3任务逻辑复杂数据量很大原任务大并行度计算写到临时分区再加两个任务
一个用来将临时分区的文件用小并行度加宽依赖合并成少量文件到实际分区
另一个删除临时分区
4hive任务减少小文件相对比较简单可以直接设置参数如
Map-only的任务结束时合并小文件

set hive.merge.mapfiles = true

在Map-Reduce的任务结束时合并小文件

set hive.merge.mapredfiles= true

当输出文件的平均大小小于1GB时启动一个独立的map-reduce任务进行文件merge

set hive.merge.smallfiles.avgsize=1024000000
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Hadoop