hadoop之MapReduce框架原理
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
目录
MapReduce框架的简单运行机制
MapReduce是分为两个阶段的MapperTask阶段和ReduceTask阶段。中间有一个Shuffle阶段
Mapper阶段可以通过选择什么方式K,V的选择对应不同的方法来读取数据读取后把数据交给Mapper来进行后续的业务逻辑用户写让后进入Reduce阶段通过Shuffle来拉取Mapper阶段的数据让后通过OutputFormat(等方法)来写出可以是ES,mysqlhbase文件
Mapper阶段
InputFormat数据输入
切片与MapTask并行度决定机制
MapTask个数决定了并行度相当于在生成map集合的过程中有几个人在干活**不一定越多越好当数据量小的时候可能开启的众多MapTask的时间用一个MapTask已经计算完成
数据块Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。
数据切片数据切片只是在逻辑上对输入进行分片并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位一个切片会对应启动一个MapTask。
job提交过程源码解析
因为我们找的job提交所以在job提交函数哪里打个断点
步入函数后
ensureState(JobState.DEFINE); 是确保你的状态是正确的状态不对或者running 都会抛异常
setUseNewAPI(); 处理Hadoop不同版本之间的API兼容
connect(); 连接客户端需要与集群或者本机连接
checkSpecs(job); 校验 校验输出路径是否已经创建是否有参 return submitter.submitJobInternal(Job.this, cluster); 核心代码 步入的时候需要点两下
第一个步入是步入的参数Job 第二个才步入此方法
这个方法是提交job在集群模式下提交的job包含通过客户端方式把jar包提交给集群在本地不需要提交jar包jar在本地是存在的
还会进行切片生成切片信息几个切片就有几个MapTask
还会 生成xml文件
综上 job提交会交三样东西jar,xml文件切片信息---》集群模式下
最后会删除所有的信息文件
切片逻辑
**切片是每一个文件单独切片
在本地是32m一块前边说过默认一块对应一个切片但是有前提条件再你减去32m的时候余下最后一块如果大于1.1倍就重新分配切片但如果小于1.1则不能更新分片
例子1
已有一个32.1m的数据 物理分块是32m+0.1m切片分布是1个切片因为32.1/32=1.003125<1.1 所以使用一个切片
例子2
已有一个100m的数据
100-32-32=36>32(36/32=1.125>1.1 所以最后36m需要分配两个切片)
**块的大小没办法改变但是可以调切片大小maxSize让切片调小minSize让切片调大
切片总结
开一个MapTask 默认是占1g内存+1个cpu
1FileInputFormat实现类
思考在运行MapReduce程序时输入的文件格式包括基于行的日志文件、二进制格式文件、数据库表等。那么针对不同的数据类型MapReduce是如何读取这些数据的呢
FileInputFormat常见的接口实现类包括TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。(应用场景的不同选择不同的接口实现类)
TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量 LongWritable类型。值是这行的内容不包括任何行终止符换行符和回车符Text类型。
CombineTextInputFormat用于小文件过多的场景它可以将多个小文件从逻辑上规划到一个切片中这样多个小文件就可以交给一个MapTask处理。
进行虚拟存储
1虚拟存储过程
将输入目录下所有文件大小依次和设置的setMaxInputSplitSize切片大小值比较如果不大于设置的最大值逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍那么以最大值切割一块当剩余数据大小超过设置的最大值且不大于最大值2倍此时将文件均分成2个虚拟存储块防止出现太小切片。
测试
再不使用CombineTextInputFormat情况下默认TextInputFormat
可以看到切片为4
添加代码设置实现类为CombineTextInputFormat 和 设置虚拟存储切片大小
// 如果不设置InputFormat它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
可以看到现在是3个切片
我们可以通过改变虚拟切片大小来改变调用的切片的数量
综上影响切片的数量的因素为1数据量的大小2切片的大小一般会自动调整3文件格式有些文件是不可切片的
影响切片大小的因素 HDFS中块的大小通过调maxsize,minsize与块的大小进行比较来判断
Shuffle阶段
shuffle阶段是一个从mapper阶段出来的后的阶段会写入k,v一个环形缓冲区缓冲区分为两半一半存储索引一半存储数据默认100m,到达80%后会反向逆写减少时间消耗提高效率逆写是因为不需要等待全部溢写后在进行写入操作逆写入文件前会进行分区分区的个数与reduceTask的个数有关排序对key进行排序但是存储位置并不发生改变只改变索引的位置改变存储位置消耗资源较大写入文件后会进行归并排序在有序的情况下归并是最高效的
排序
排序可以自定义排序举例全排序
自定义了一个Bean类bean对象做为key传输需要实现WritableComparable接口重写compareTo方法就可以实现排序。
Combiner合并
并不满足所有生产环境下只有在不影响最终业务逻辑下才可以实现求和就可以算平均值就不可以
combiner与reducetask区别如下
ReduceTask阶段:
1Copy阶段ReduceTask从各个MapTask上远程拷贝一片数据并针对某一片数据如果其大小超过一定阈值则写到磁盘上否则直接放到内存中。
2Sort阶段在远程拷贝数据的同时ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并以防止内存使用过多或磁盘上文件过多。按照MapReduce语义用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序因此ReduceTask只需对所有数据进行一次归并排序即可。
3Reduce阶段reduce()函数将计算结果写到HDFS上。
ReduceTask的个数可以手动进行设置设置几就会产生几个文件分区同上
Reduce Join
简述流程
1自定义bean对象序列化反序列化函数---implements Writable
2写mapper类 先重写setup方法因为本案例需要两个文件初始化读多个文 希望先获取到文件名称多文件 一个文件一个切片 setup方法是一个优化手段 获取文件名称
3写reduce类业务逻辑 先创建一个集合类型为bean类型和bean对象用于存储
用for循环遍历valuekey是一样的 一样的key才会进入同一个reduce方法
获取文件名判断写出不同的业务逻辑
"order"表
先创建一个bean对象用于存储数据用于后续写入集合
用到方法 BeanUtils.copyProperties(tmpOrderBean,value); 获取原数据
让后加入上述创建的集合 orderBeans.add(tmpOrderBean);
“pd”表
BeanUtils.copyProperties(pdBean,value);直接获取原数据
存储结束结合阶段
使用增强for
orderbean.setPname(pdBean.getPname());
使用set函数直接设置集合中的pname
让后写入
context.write(orderbean,NullWritable.get()); 业务结束
Reduce Join的缺点这种方式中合并的操作是在Reduce阶段完成Reduce端的处理压力太大Map节点的运算负载则很低资源利用率不高且在Reduce阶段极易产生数据倾斜。
Map Join
使用场景
Map Join适用于一张表十分小、一张表很大的场景。
Map端实现数据合并就解决了Reduce Join的缺点数据倾斜
简述流程
在map类中
setup方法将较小文件读入缓存将数据存储到全局的map集合中将缓存中的数据全部写入
重写的map方法中
转换成字符串在切割通过切割后的数组获取map集合中的pname
让后重新设置输出文件的格式进行写出
至此mapreduce完结