【大数据】Hadoop总结
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
本文对于Hadoop中的HDFS和MapReduce的相关面试重点进行了总结下篇将介绍调优、数据倾斜等进阶知识。
Hadoop总结
一、概述
1. Hadoop特性
1高可靠性
采用冗余数据存储方式即使一个副本发生故障其他副本也可以保证正常对外提供服务。
2高效性
作为并行分布式计算平台Hadoop采用分布式存储和分布式处理两大核心技术能够高效地处理PB级别数据。
3高容错性
采用冗余数据存储方式自动保存数据的多个副本并且能够自动将失败的任务重新进行分配。
4高可扩展性
Hadoop的设计目标是可以高效稳定地运行在廉价的计算机集群上可以扩展到数以千计的计算机节点上。
5成本低
Hadoop采用廉价的计算机集群成本比较低普通用户也很容易用自己的PC搭建Hadoop运行环境。
6运行在Linux平台上
Hadoop是基于Java语言开发的可以较好地运行在Linux平台上。
7支持多种编程语言
Hadoop上的应用程序也可以使用其他语言编写如C++。
2. HDFS结构
- Hadoop主要由HDFS架构分布式文件系统和MapReduce架构并行计算框架组成。
HDFS 架构
1主从结构
- 主节点只有一个: namenode
- 从节点有很多个: datanodes
2namenode负责
- 接收用户操作请求
- 维护文件系统的目录结构
- 管理文件与block之间关系
3datanode负责
- 存储文件文件被分成block存储在磁盘上每个块128M为保证数据安全文件会有多个副本
4在Hadoop中一个文件被划分成大小固定的多个文件块分布的存储在集群中的节点中。如下图
同一个文件块在不同的节点中有多个副本。如下图
我们需要一个集中的地方Namenode保存文件的分块信息。
二、HDFS分布式文件系统
1 概述
- Hadoop Distributed File System -Hadoop分布式文件存储系统
- HDFS为了保证数据存储的可靠性副本和读取性能切块对数据进行切块后进行复制保证副本的数量并存储在集群的多个节点中。
- HDFS中存在一个名字节点NameNode和多个数据节点DataNode
2. HDFS存储数据架构图
NameNode
1.接收用户操作请求
2. 存储元数据信息元数据保存在内存为了保证读写效率以及磁盘用于崩溃恢复中
3.保存文件、block、datanode之间的映射关系
DataNode
1.存储block内容
2.存储在磁盘中
3.保存block id与文件的映射关系
3 HDFS优点
1.适合大数据处理
处理数据达到GB、TB甚至PB级别能够处理百万规模以上的文件数量数量相当之大。
2.检测和快速应对硬件故障
基于心跳机制
3.流式数据访问
HDFS的数据处理规模比较大应用一次需要访问大量的数据同时这些应用一般都是批量处理而不是用户交互式处理。应用程序能以流的形式访问数据集。主要的是数据的吞吐量而不是访问速度。
4.简化的一致性模型
大部分HDFS操作文件时需要一次写入多次读取。在HDFS中一个文件一旦经过创建、写入、关闭后一般就不需要修改了。这样简单的一致性模型有助于提高吞吐量。
5.高容错性
数据自动保存多个副本副本丢失后自动恢复。
6.可构建在廉价机器上
构建在廉价机器上可以轻松的通过扩展机器数量来近乎线性的提高集群存储能力。
4 HDFS缺点不适用HDFS的场景
1.不适合低延迟数据访问
由于Hadoop针对海量数据的吞吐量做了优化牺牲了获取数据的延迟所以对于低延迟来说不适合用Hadoop来做。如和用户进行交互的应用需要数据在毫秒或秒的范围内得到响应等。
2.不适合小文件存储
HDFS支持超大的文件是通过数据分布在DataNode数据的元数据保存在NameNode上。NameNode的内存大小决定了HDFS文件系统可保存的文件数量。虽然现在的系统内存都比较大但大量的小文件还是会影响NameNode的性能。且小文件存储的寻道时间会超过读取时间它违反了HDFS的设计目标。
3.多用户写入文件、修改文件
hdfs的文件只能有一次写入不支持修改和追加写入2.0版本以后支持追加。
4.不支持超强的事务
没有像关系型数据库那样对事务有强有力的支持。
5 HDFS技术细节
hdfs fsck path
查看块信息
Block
-
数据块(Block)是HDFS中存储文件的最基本的存储单位。
-
在HDFS中存储的超大数据的文件以一个标准分成几块分别存储到不同的磁盘上这个标准就称为block。block的默认大小为64M1.0版本2.0版本为128M。
-
对于文件内容而言一个文件的长度大小是size那么从文件的偏移开始按照固定的大小顺序对文件进行划分并编号划分好的每一个块称一个Block。HDFS默认Block大小是128MB以一个256MB文件共有256/128=2个Block.
-
不同于普通文件系统的是HDFS中如果一个文件30M小于一个数据块的大小128M并不占用整个数据块存储空间实际还是30M。
NameNode
- NameNode维护元数据信息
NameNode中维护着HDFS中的元数据信息包括文件和Block之间关系的信息、Block数量信息、Block和DataNode之间的关系信息数据格式参照如下
例如 /test/a.log,3,{b1,b2},[{b1:[h0,h1,h3]},{b2:[h0,h2,h4]}]
FileName文件名replicas副本数block-ids块idid-hostid所在主机
- NameNode磁盘中的文件
NameNode中的元数据信息存储在内存以及文件中内存中为实时信息文件中为数据镜像作为持久化存储使用。文件包括
- fsimage元数据镜像文件。存储某NameNode元数据信息并不是实时同步内存中的数据。
- edits操作日志文件记录了NameNode所要执行的操作。
- fstime保存最近一次checkpoint的时间。上一次更新时间
- NameNode更新数据细节
合并流程如下图所示
- 每隔一段时间Secondary NameNode会和NameNode通信请求其停止使用Editlog文件暂时将新到达的写操作添加到一个新的文件edtis.new中。
- Secondary NameNode把NameNode中的FsImage文件和EditLog文件拉回到本地加载到内存中对二者执行合并操作合并为一个新的fsimage即在内存中逐条执行EditLog中的操作使得FsImage保持最新。合并结束后Secondary NameNode会把合并后最新的FsImage文件发送到NameNode。NameNode收到后会用最新的FsImage文件去替换旧的FsImage文件同时NameNode将edtis.new改为edits从未减小EditLog文件的大小。
DataNode
- 在hadoop中数据是存放在DataNode上面的以Block的形式存储。
- DataNode节点会不断向NameNode节点发送心跳报告。状态信息和数据信息
6 HDFS 副本放置策略
- 默认副本放置策略
在默认情况下副本数量是3个所有的DN都是在同一个机架下此时写block时三个DN机器的选择是完全随机的。
- 配置机架感知后的副本放置策略
配置机架感知后HDFS在选择三个DN时副本放置策略如下
1把第一副本放在和客户端同一个节点上如果客户端不在集群中那么就会随即选一个节点存放。
2第二个副本会在和第一个副本不同的机架上随机选一个。
3第三个副本会在第二个副本相同的机架上随机选一个不同的节点。
7 何为机架感知
- 定义在分布式系统中能够识别不同的机架rack和它们之间的拓扑关系从而更加有效地管理数据和资源。
- hadoop能在系统内部建立一套服务器和机架的位置拓扑图并且能识别系统节点的拓扑位置。告诉 Hadoop 集群中哪台机器属于哪个机架。
- 为什么要设置机架感知
1开启机架感知NN可以知道DN所处的网络位置
2根据网络拓扑图可以计算出rackid通过rackid信息可以计算出任意两台DN之间的距离
3在HDFS写入block时会根据距离调整副本放置策略
4写入策略会将副本写入到不同的机架上防止某一机架挂掉副本丢失的情况。同时可以降低在读取时候的网络I/O。但是会增加写操作的成本。
8 namenode 被格式化之后产生了哪些文件edits,fsimage,seen_txid,VERSION各自的功能是什么
- 格式化文件存放路径
/usr/local/hadoop2.7.1/data/hdfs/name/current
- 格式化文件
edits 、fsimage、seen_txid、VERSION
(1) fsimage文件:HDFS文件系统元数据的一个永久性检查点,包含HDFS文件系统的所有目录和文件idnode的序列化信息。
(2)edits文件:存放HDFS文件系统的所有更新操作的路径,文件系统客户端所有的写操作首先会被记录到edits文件中。
(3)seen_txid文件保存的是一个数字,即最后一个edits的数字。每次NameNode启动时都会将fsimage文件读入内存,并从0001开始到seen_txid中记录的数字,依次执行每个edits里面更新操作,保证内存中的元数据信息是最新的,同步的,可以看成namenode启动时,将fsimage和edits文件进行合并。
9 Secondary NameNode的功能
1首先可以完成EditLog与FsImage的合并操作减小EditLog文件大小缩短名称节点重启时间。
- 每隔一段时间Secondary NameNode会和NameNode通信请求其停止使用Editlog文件暂时将新到达的写操作添加到一个新的文件edtis.new中。
- Secondary NameNode把NameNode中的FsImage文件和EditLog文件拉回到本地加载到内存中对二者执行合并操作合并为一个新的fsimage即在内存中逐条执行EditLog中的操作使得FsImage保持最新。合并结束后Secondary NameNode会把合并后最新的FsImage文件发送到NameNode。NameNode收到后会用最新的FsImage文件去替换旧的FsImage文件同时NameNode将edtis.new改为edits从未减小EditLog文件的大小。
2可以作为NameNode的“检查点”保存NameNode中的元数据信息。
从上面的合并过程可以看出Secondary NameNode会定期和NameNode通信从NameNode获取FsImage文件和Edits文件执行合并操作得到最新的FsImage文件。从这个角度来讲Secondary NameNode相当于为NameNode设置了一个检查点周期性地备份NameNode中的元数据信息当NameNode发生故障就可以用Secondary NameNode中记录的元数据信息进行系统恢复。
在HDFS设计中并不支持把系统直接切换到Secondary NameNode因此从这个角度来讲Secondary NameNode只是起到了NameNode的“检查点”作用并不起到热备份的作用。
10 HDFS执行流程重要
HDFS读流程
1客户端Client向远程的NameNode发起RPC读请求
2NameNode返回有该block的DataNode地址客户端Client会选取离客户端最接近的DataNode来读取block如果客户端本身就是DataNode那么将从本地直接获取数据
3客户端调用read函数开始读取数据。数据从该数据节点读到客户端当该数据块读取完毕时关闭和该数据节点的连接。
4输入流查找下一个数据块找到该数据块的最佳位置节点读取数据。
6当客户端读取完毕数据的时候关闭输入流。
HDFS写流程
1、客户端与namenode通信请求上传文件namenode检查目标文件是否已存在父目录是否存在
2、namenode返回是否可以上传
3、client请求第一个 block该传输到哪些datanode服务器上
4、namenode返回3个datanode服务器ABC
5、client请求3台dn中的一台A上传数据本质上是一个RPC调用建立pipelineA收到请求会继续调用B然后B调用C将pipeline建立完成逐级返回客户端
6、client开始往A上传第一个block先从磁盘读取数据放到一个本地内存缓存以packet为单位A收到一个packet就会传给BB传给CA每传一个packet会放入一个应答队列等待应答
7、当一个block传输完成之后client再次请求namenode上传第二个block的服务器。
8、当所有数据块都写入成功后客户端向NameNode发送完成写入的请求NameNode则更新元数据信息标记该文件已经可用。
HDFS删除流程
1先在NameNode上执行节点名字的删除。
2当NameNode执行delete方法它只标记操作涉及的要被删除的数据块而不会主动联系这些数据块所在的DataNode节点。
3当保存着这些数据块的DataNode节点向NameNode节点发送心跳时在心跳应答里NameNode节点会向DataNode发出指令从而把数据删除掉。
4所以在执行完delete方法后的一段时间数据块才能被真正的删除掉。
- 注意在读写过程中NameNode只负责地址的记录和查询所有的数据的读写都是客户端和DataNode直接联系这种形式的好处在于能够提高NameNode的应答速度同时提供HDFS的线程并发的能力。
11 何为RPCRPC的调用过程
(1) 何为RPC
-
RPC——远程过程调用协议
它是一种通过网络从远程计算机程序上请求服务而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在如TCP或UDP为通信程序之间携带信息数据。在OSI网络通信模型中RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
-
hadoop的整个体系结构构建在RPC之上(org.apache.hadoop.ipc)
(2) RPC的调用过程
先启动服务端再运行客户端可以分析查看服务端和客户端的输出信息。
- 定义RPC协议(创建接口类)
RPC协议是客户端和服务器端之间的通信接口它定义了服务器端对外的服务接口。 - 实现RPC协议(创建服务类)
Hadoop RPC 协议通常是一个java接口用户需要实现该接口。 - 构造并启动RPC Server(创建服务器类)
RPC协议定义完毕可以介于这个协议创建RPC服务器端了可以直接使用new RPC.Bulder(conf),来构造一个RPC Server并调用函数start()启动该服务。 - 构造RPC Client并发送RPC请求(创建客户端类)
使用静态方法getProxy()构造Client代理对象直接通过代理对象调用远程端口的方法。
12 何为安全模式
在分布式文件系统启动的时候开始的时候会有安全模式当分布式文件系统处于安全模式的情况下文件系统中的内容不允许修改也不允许删除直到安全模式结束。安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性同时根据策略必要的复制或者删除部分数据块。运行期通过命令也可以进入安全模式。在实践过程中系统启动的时候去修改和删除文件也会有安全模式不允许修改的出错提示只需要等待一会儿即可。
- 安全模式时间段:系统刚刚启动datanode的数据节点位置信息要上报给namenode这时候还不能写、重命名和删除。这个时间段处于安全模式下。
- NameNode在启动的时候首先进入安全模式如果datanode丢失的block达到一定的比例1- dfs.safemode.threshold.pct则系统会一直处于安全模式状态即只读状态。
三、MapReduce和Yarn基本介绍
1. MapReduce概述
MapReduce基于Google发布的MapReduce论文设计开发基于分而治之的思想用于大规模数据集(大于1TB) 的并行计算和离线计算具有如下特点:
- 高度抽象的编程思想:程序员仅需描述做什么具体怎么做交由系统的执行框架处理。
- 良好的扩展性:可通过添加节点以扩展集群能力。
- 高容错性:通过计算迁移或数据迁移等策略提高集群的可用性与容错性。
2. 资源调度与分配—Yarn的引入
在Hadoop1.0版本中只有HDFS和MapReduce, 而资源调度通过MRv1来进行在2.0中资源调度由Yarn负责而MapReduce负责并行计算任务。
Apache Hadoop YARN (Yet Another Resource Negotiator), 中文名为“另一 种资源协调者"。它是一种新的Hadoop资源管理器它是一个通用资源管理系统可为上层应用提供统一的资源管理和调度它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。
引入了Yarn之后它可以支持了多种计算模式比如说离线计算、实时计算以及迭代计算。
因此我们可以这样理解HDFS可以理解为分布式硬盘-分布式文件管理系统YANR可以理解为分布式的操作系统-分布式资源cpu, memory, disk, network管理和分布式的进程调度MapReduce是运行在分布式操作系统YARN上的应用程序。
3. Yarn基本架构
主从结构
- 主节点只有一个: ResourceManager
- 从节点有很多个: NodeManager
YARN总体上仍然是master/slave结构在整个资源管理框架中resourcemanager为masternodemanager是slave。
Resourcemanager负责对各个nademanger上资源进行统一管理和调度。当用户提交一个应用程序时需要提供一个用以跟踪和管理这个程序的ApplicationMaster它负责向ResourceManager申请资源并要求NodeManger启动可以占用一定资源的任务。由于不同的ApplicationMaster被分布到不同的节点上因此它们之间不会相互影响。
(1) ResourceManager
整个集群只有一个负责集群资源的统一管理和调度
(2) NodeManager
整个集群有多个负责单节点资源管理和使用
(3) ApplicationMaster
每个应用有一个负责应用程序的管理
(4) Container
对任务运行环境的抽象(资源分配的基本单位)描述一系列信息
MapReduce on Yarn工作机制
MapReduce2.0运行在YARN之上。YARN由ResourceManagerRM 和NodeManagerNM两大块组成
1Job(Application)提交
第0步MR Client端调用job.waitForCompletion方法向整个集群提交MapReduce作业。
第1步Client向RM申请一个Application id。
第2步RM给Client返回该job资源的提交路径和Application id。
第3步Client提交jar包、切片信息和配置文件到指定的资源提交路径HDFS。
第4步Client提交完资源后向RM申请运行MrAppMaster适配YARN的ApplicationMaster。
2Job(Application)初始化
第5步当RM收到Client的请求后将该job添加到容量调度器中任务队列。
第6步某一个空闲的NM领取到该job。
第7步该NM创建Container资源抽象并通过命令启动运行MrAppMaster。
第8步下载Client提交的资源到本地。
3资源分配
第9步MrAppMaster向RM申请运行多个maptask任务资源。
第10步RM将运行maptask任务分配给另外两个NodeManager另两个NodeManager分别领取任务并创建容器。
4资源调度
第11步MR向两个接收到任务的NodeManager发送程序启动脚本这两个NodeManager分别启动maptaskmaptask对数据分区排序。
第12步MrAppMaster等待所有maptask运行完毕后向RM申请容器运行reduce task。
第13步reduce task向maptask获取相应分区的数据。
第14步程序运行完毕后MR会向RM申请注销自己。
5进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
6作业完成
除了向应用管理器请求作业进度外, 客户端每5分钟都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
4. Yarn的工作流程
步骤1 用户向YARN中提交应用程序其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
步骤2 ResourceManager为该应用程序分配第一个Container这里可以理解为一种资源比如内存并与对应的Node-Manager通信要求它在这个Container中启动应用程序的ApplicationMaster。
步骤3 ApplicationMaster首先向ResourceManager注册这样用户可以直接通过ResourceManage查看应用程序的运行状态然后它将为各个任务申请资源并监控它的运行状态直到运行结束即重复步骤4~7。
步骤4 ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
步骤5 一旦ApplicationMaster申请到资源后便与对应的NodeManager通信要求它启动任务。
步骤6 NodeManager为任务设置好运行环境包括环境变量、JAR包、二进制程序等后将任务启动命令写到一个脚本中并通过运行该脚本启动任务。
步骤7 各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度以让ApplicationMaster随时掌握各个任务的运行状态从而可以在任务失败时重新启动任务。
在应用程序运行过程中用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。
步骤8 应用程序运行完成后ApplicationMaster向ResourceManager注销并关闭自己。
5. MapReduce详细执行流程
mapreduce计算框架一般分为三个阶段map、shuffle和reduce
- 在map阶段将输入文件划分为split,一般map进程的数量为split分片的数量然后map将之解析为key,value做自己的逻辑处理转换输出新的key,value。
- map结果输出之后将key,value,计算出的partition写入内存环形缓冲区中当达到一定阈值后将之锁定并启动溢写线程在写入磁盘之前根据快速排序使得partition有序partition内部key有序如果有combiner就合并。没有就将该内容写入本地磁盘。
- 写入时不影响map写入缓存写入结束会产生很多溢写文件我们需要对这些文件进行merge利用归并排序将多个文件合并为一个文件。
- 此时map shuffle结束进行reduce shuffle,一个reduce从不同map拉取同一个分区的数据然后进行reduce端的merge操作这个和map merge很像如果内存资源足够就放内存然后merge如果不够就放磁盘merge。这里也会排序将相同序号的分区进行合并排序。
- 将结果输入到reduce端输入形式<key, {value list}>写逻辑输出key,value。
- 将reduce输出保存到文件中。
内存环形缓冲区作用批量收集map结果减少磁盘IO的影响。
以下是涉及到的排序算法
5.1 快速排序
当数据量较小时当前为小于13采用插入排序否则采用快速排序当快速排序递归到一定深度时采用堆排序。这种算法也称为introsort避免了快速排序最坏情况的发生。
5.2 归并排序
基本思想对于给定的一组集合利用递归与分治技术将数据序列划分成为越来越小的子集合再对子集合排序最后再用递归方法将排好序的子集合合并成为越来越大的有序序列。
经过第一轮比较后得到最小的记录然后将该记录的位置与第一个记录的位置交换接着对不包括第一个记录以外的其他记录进行第二次比较得到最小记录并与第二个位置记录交换重复该过程知道进行比较的记录只剩下一个为止。
5.3 MapReduce过程中的几次排序
在MapReduce的shuffle过程中通常会执行三次排序分别是
- Map的溢写阶段根据分区以及key进行快速排序
- Map的合并溢写文件将同一个分区的多个溢写文件进行归并排序合成大的溢写文件
- Reduce输入阶段将同一分区来自不同Map task的数据文件进行归并排序
此外在MapReduce整个过程中默认是会对输出的KV对按照key进行排序的而且是使用快速排序。
Map输出的排序其实也就是上面的溢写过程中的排序。
Reduce输出的排序即Reduce处理完数据后MapReduce内部会自动对输出的KV按照key进行排序。
6 hadoop的shuffle过程详解
6.1 Map端的shuffle
Map端会处理输入数据并产生中间结果这个中间结果会写到本地磁盘而不是HDFS。每个Map的输出会先写到内存缓冲区中当写入的数据达到设定的阈值时系统将会启动一个线程将缓冲区的数据写到磁盘这个过程叫做spill。 在spill写入之前会先进行二次排序首先根据数据所属的partition进行排序然后每个partition中的数据再按key来排序。partition的目是将记录划分到不同的Reducer上去以期望能够达到负载均衡以后的Reducer就会根据partition来读取自己对应的数据。接着运行combiner(如果设置了的话)combiner的本质也是一个Reducer其目的是对将要写入到磁盘上的文件先进行一次处理这样写入到磁盘的数据量就会减少。最后将数据写到本地磁盘产生spill文件(spill文件保存在{mapred.local.dir}指定的目录中Map任务结束后就会被删除)。
最后每个Map任务可能产生多个spill文件在每个Map任务完成前会通过多路归并算法将这些spill文件归并成一个文件。至此Map的shuffle过程就结束了。
6.2 Reduce端的shuffle
Reduce端的shuffle主要包括三个阶段copy、sort(merge)和reduce。 首先要将Map端产生的输出文件拷贝到Reduce端但每个Reducer如何知道自己应该处理哪些数据呢因为Map端进行partition的时候实际上就相当于指定了每个Reducer要处理的数据(partition就对应了Reducer)所以Reducer在拷贝数据的时候只需拷贝与自己对应的partition中的数据即可。每个Reducer会处理一个或者多个partition但需要先将自己对应的partition中的数据从每个Map的输出结果中拷贝过来。 接下来就是sort阶段也成为merge阶段因为这个阶段的主要工作是执行了归并排序。从Map端拷贝到Reduce端的数据都是有序的所以很适合归并排序。最终在Reduce端生成一个较大的文件作为Reduce的输入。
最后就是Reduce过程了在这个过程中产生了最终的输出结果并将其写到HDFS上。
6.3 shuffle 总结
(1) 分区partitioner按照key的不同将数据分区处理以便将来reduce取用(此步在缓冲区中进行)。分区时可以自定义的如果不自定义的话默认所有数据都在一个分区。
(2) 排序sort此步透明按照key值进行排序此步在缓冲区中进行。
(3) 合并combine一次reduce的预演减少将来reduce的工作量此步可做可不做注意特殊场景。
(4) copy此步透明数据的拉取不同的reduce按照partition的分区拉取不同的数据。
(5) 归并merge此步透明因为拉取的是文件需要将不同节点的数据合并写入文件。
6.4 小知识点
Split分片默认大小分片调用的方法?
1.split的大小是多少
切分成多少个Split那么就有多少个Map任务执行一个Map任务只处理一个Split(默认)。
一个数据分片就是一个块默认大小128M
2.分片调用的方法
FileInputFormat类getsplits方法
四、Hadoop补充知识
1. Hadoop序列化作用要实现的接口key必须要实现的接口
序列化在分布式环境的两大作用
1进程间通信
2永久存储。
3为了更高效地处理数据。当处理大量数据时序列化可以减少数据传输的大小并且可以更快地将数据从磁盘加载到内存中。
要实现的接口Writable
Key必须要实现的接口WritableComparable
2. 为什么Key必须要实现WritableComparable接口呢原因有以下几点
a. 序列化和反序列化Hadoop需要将Key进行序列化和反序列化以便在MapReduce任务之间进行传递和存储。因此Key必须实现Writable接口。
b. 排序在MapReduce中Map阶段产生的Key需要按照某种方式进行排序才能保证Reduce阶段得到的数据是按照预期的方式分组的。为了实现排序Key必须实现Comparable接口。
c. 分组在MapReduce中Reduce阶段的数据是按照Map阶段产生的Key进行分组的。因此如果需要自定义分组方式Key也必须实现Comparable接口。
综上所述为了实现在Hadoop中高效地排序和分组以及在MapReduce任务之间进行传递和存储Key必须实现WritableComparable接口。
3. 手写WordCount
/*
* LongWritable对应输入的key类型默认是行的偏移量LongWritable
* Text,对应上输入的value类型默认行数据Text
* Text:对应输出的key类型不能使用默认值需要根据需求更改
* Text:对应输出的value类型根据需求修改
* @author lesie
* 要求输出的格式(key,1)
* 单词计数输出的key类型为Text
* 输出的value类型为IntWritable
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
/*
* KEYIN
* VALUEIN
* context--环境对象输出结果
* @see org.apach.hadoop.mapreduce.Mapper#map(KEYIN,VALUEIN,...)
*/
public void map(LongWritable ikey,Text ivalue,Context context) throws IOException, InterruptedException
{
//获取一行数据
String line=ivalue.toString();
//按空格切片
String []arrs=line.split(" ");
for(String arr:arrs)
{
context.write(new Text(arr),new IntWritable(1));
}
}
}
/*
* reducer的数输入key用公式mapper输出的key类型
* valuein:reducer的输入value应该是mapper输出的value类型
* keyout:根据业务而定
* valueout:根据业务而定
* @author lesie
* 工作机制:
* 1.将key相同的value进行合并形成一个Iterable交给程序
* eg:(hello,<1,1,1,1,1,1>)
* 2.reduce方法执行的次数取决于mapper输出的key有多个不同的key执行多少次
* 3.默认的排序对key进行排序先按照数字进行排再按照字典顺序
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text _key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// process values
//定义计数变量
int sum=0;
//进行累加操作
for (IntWritable val : values) {
//通过get方法取出其中的值
sum+=val.get();
}
//输出数据,最终结果,key是单词Text,value是单词出现的总次数
context.write(_key, new IntWritable(sum));
}
}
public class WordCountDriver {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
//获取当前配置
Configuration conf=new Configuration();
//获取一个表示当前Mapreduce作业的Job对象向ahdoop申请一个job任务执行逻辑
Job job=Job.getInstance();
//指定程序入口
job.setJarByClass(WordCountDriver.class);
//设置需要执行的Mapper类
job.setMapperClass(WordCountMapper.class);
//设置Reducer类
job.setReducerClass(WordCountReducer.class);
//设置Mapper的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置Reducer的输出结果类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入路径
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.140.128:9000/wc/words.txt"));
//设置输出路径
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.140.128:9000/wc/result6"));
//任务的提交
job.waitForCompletion(true);
}
}
4. 手写Top-K
public class TopKMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable> {
private int k = 10; // TopK值
private PriorityQueue<Integer> heap; // 堆
@Override
public void setup(Context context) {
k = context.getConfiguration().getInt("k", 10); // 从Configuration中获取TopK值
heap = new PriorityQueue<>(k, Collections.reverseOrder()); // 初始化堆
}
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
int num = Integer.parseInt(value.toString()); // 获取输入数据
if (heap.size() < k) { // 如果堆未满则将数据加入堆中
heap.offer(num);
} else {
int smallest = heap.peek(); // 获取当前堆中的最小值
if (num > smallest) { // 如果输入数据大于最小值则替换堆中最小值
heap.poll();
heap.offer(num);
}
}
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
for (int num : heap) { // 将堆中的TopK数据输出
context.write(NullWritable.get(), new IntWritable(num));
}
}
}
public class TopKReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
private int k = 10; // TopK值
private PriorityQueue<Integer> heap; // 堆
@Override
public void setup(Context context) {
k = context.getConfiguration().getInt("k", 10); // 从Configuration中获取TopK值
heap = new PriorityQueue<>(k, Collections.reverseOrder()); // 初始化堆
}
@Override
public void reduce(NullWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
for (IntWritable value : values) { // 遍历输入数据
int num = value.get();
if (heap.size() < k) { // 如果堆未满则将数据加入堆中
heap.offer(num);
} else {
int smallest = heap.peek(); // 获取当前堆中的最小值
if (num > smallest) { // 如果输入数据大于最小值则替换堆中最小值
heap.poll();
heap.offer(num);
}
}
}
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
for (int num : heap) { // 将堆中的TopK数据输出
context.write(NullWritable.get(), new IntWritable(num));
}
}
}
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class TopKApp {
// 定义输入路径和输出路径
private static final String INPUT_PATH = "hdfs:/xxx/topk_input";
private static final String OUTPUT_PATH = "hdfs://xxx/topk_output";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(conf);
Path outputPath = new Path(OUTPUT_PATH);
// 如果输出路径已经存在则先删除
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath, true);
}
// 创建一个MapReduce任务并设置相关的配置信息
Job job = Job.getInstance(conf, "TopKApp");
job.setJarByClass(TopKApp.class);
// 设置Mapper和Reducer的类
job.setMapperClass(TopKMapper.class);
job.setReducerClass(TopKReducer.class);
// 设置Mapper的输出类型
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
// 设置Reducer的输出类型
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// 设置输出文件格式
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, outputPath);
// 等待任务完成
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
参考链接
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |