An Important MapReduce Component: the Partitioner


(1) The Partitioner lets the map side partition its output keys, so that records with different keys can be dispatched to different reduce tasks for processing;

(2) You can define your own distribution rule for keys, for example when the data file contains records for several provinces and the requirement is to output one file per province;


(3) Hadoop provides a default implementation, HashPartitioner.



Defining a custom Partitioner:
(1) Extend the abstract class Partitioner and implement your own getPartition() method;
(2) Register the custom Partitioner with job.setPartitionerClass(); (a minimal sketch follows below)
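
For illustration, here is a minimal sketch of those two steps. PrefixPartitioner is a hypothetical name, and Text/IntWritable map output types are assumed; the full worked example appears in section 3.3 below.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Step (1): extend Partitioner and implement getPartition()
public class PrefixPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // send keys starting with "137" to partition 0, everything else to partition 1
        return key.toString().startsWith("137") ? 0 : 1;
    }
}

// Step (2): register it in the driver
// job.setPartitionerClass(PrefixPartitioner.class);
// job.setNumReduceTasks(2);   // must cover every partition number returned above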



Typical Partitioner use case:
Requirement: compute the user traffic statistics for each province separately.



1. The per-province user traffic data to be aggregated


2. Requirements


Users from each area should produce their own output file; if everything were written to a single file the data volume would be too large, so the output has to be stored separately.


Assume: users whose numbers start with 137 belong to Beijing, area code 100;
        users whose numbers start with 138 belong to Tianjin, area code 200;
        users whose numbers start with 139 belong to Hebei, area code 300;
        all other users are written to one additional file.


To generate one file per group of users, the data must be partitioned by the user's area.



3. Implementation


3.1 This can be implemented with Hadoop's partitioning mechanism by defining a custom grouping (the default is HashPartitioner) and then setting the number of reduce tasks (the default is 1) to match.



3.2 Hadoop's default Partitioner implementation

/**
 * Partitions the key space.
 *
 * <p><code>Partitioner</code> controls the partitioning of the keys of the
 * intermediate map-outputs. The key (or a subset of the key) is used to derive
 * the partition, typically by a hash function. The total number of partitions
 * is the same as the number of reduce tasks for the job. Hence this controls
 * which of the <code>m</code> reduce tasks the intermediate key (and hence the
 * record) is sent for reduction.</p>
 *
 * Note: If you require your Partitioner class to obtain the Job's configuration
 * object, implement the {@link Configurable} interface.
 *
 * @see Reducer
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {

  /**
   * Get the partition number for a given key (hence record) given the total
   * number of partitions i.e. number of reduce-tasks for the job.
   *
   * <p>Typically a hash function on all or a subset of the key.</p>
   *
   * @param key the key to be partitioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);

}

/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
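
To see what this expression does, the following standalone snippet reproduces the same hash-and-modulo computation in plain Java. It is not part of the original post; the sample keys and the choice of 4 reduce tasks are arbitrary.

public class HashPartitionDemo {
    public static void main(String[] args) {
        int numReduceTasks = 4;  // arbitrary example value
        String[] keys = {"13712345678", "13812345678", "13912345678"};
        for (String key : keys) {
            // same expression as HashPartitioner: masking with Integer.MAX_VALUE
            // clears the sign bit so the modulo result is never negative
            int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(key + " -> partition " + partition);
        }
    }
}

Which partition a given phone number lands in under HashPartitioner is effectively arbitrary, which is exactly why the area requirement above calls for a custom Partitioner instead of the default.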
 

 

3.3 Custom implementation

 

 
 

Code structure: the partition package contains AreaPartitioner, FlowBean, FlowPartitionMapper, FlowPartitionReducer, and FlowPartitionRunner.

The full code:
 
package partition;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    // phone-number prefix -> partition number
    public static Map<String, Integer> areaMap = new HashMap<String, Integer>();

    static {
        areaMap.put("139", 0);
        areaMap.put("137", 1);
        areaMap.put("159", 2);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // prefixes that are not in the map fall into the default partition 3
        int area = 3;
        Integer res = areaMap.get(key.toString().substring(0, 3));
        if (res != null) {
            area = res;
        }
        return area;
    }

}
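
A quick local sanity check of the partition numbers this class produces. This is an illustrative snippet, not part of the original code; getPartition() ignores the value argument, so null is passed here.

import org.apache.hadoop.io.Text;

public class AreaPartitionerCheck {
    public static void main(String[] args) {
        AreaPartitioner<Text, Object> partitioner = new AreaPartitioner<Text, Object>();
        System.out.println(partitioner.getPartition(new Text("13912345678"), null, 4)); // 0
        System.out.println(partitioner.getPartition(new Text("13712345678"), null, 4)); // 1
        System.out.println(partitioner.getPartition(new Text("15912345678"), null, 4)); // 2
        System.out.println(partitioner.getPartition(new Text("13812345678"), null, 4)); // 3 (prefix not in areaMap)
    }
}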



package partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private String mobileNumber;
    private long upFlow;
    private long downFlow;

    public void set(String mobileNumber, long upFlow, long downFlow) {
        this.mobileNumber = mobileNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(mobileNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.mobileNumber = in.readUTF();
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
    }

    public String getMobileNumber() {
        return mobileNumber;
    }

    public void setMobileNumber(String mobileNumber) {
        this.mobileNumber = mobileNumber;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    @Override
    public String toString() {
        return this.mobileNumber + "\t" + this.upFlow + "\t" + this.downFlow + "\t" + (this.upFlow + this.downFlow);
    }

}
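
A small round-trip test of FlowBean's write()/readFields() pair using plain java.io streams makes the Writable contract concrete. This is an illustrative sketch, not part of the original post, and the sample numbers are made up.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean();
        original.set("13712345678", 1024L, 4096L);

        // serialize with write(), exactly as the MapReduce framework would
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // deserialize into a fresh bean with readFields()
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // prints mobileNumber, upFlow, downFlow, and their sum via toString()
        System.out.println(copy);
    }
}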
  

 
 

package partition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowPartitionMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text k = new Text();
    private FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // input lines are tab-separated: field 1 is the mobile number,
        // fields 7 and 8 are the upstream and downstream traffic
        String[] values = StringUtils.split(value.toString(), "\t");
        String mobileNumber = values[1];
        long upFlow = Long.parseLong(values[7]);
        long downFlow = Long.parseLong(values[8]);

        k.set(mobileNumber);
        v.set(mobileNumber, upFlow, downFlow);
        context.write(k, v);
    }
}
  

 
 

 
 

package partition;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowPartitionReducer extends Reducer<Text, FlowBean, FlowBean, NullWritable> {

    private FlowBean flowBean = new FlowBean();

    @Override
    protected void reduce(Text k, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {

        long sumUpFlow = 0;
        long sumDownFlow = 0;

        for (FlowBean value : values) {
            sumUpFlow += value.getUpFlow();
            sumDownFlow += value.getDownFlow();
        }

        flowBean.setMobileNumber(k.toString());
        flowBean.setUpFlow(sumUpFlow);
        flowBean.setDownFlow(sumDownFlow);
        context.write(flowBean, NullWritable.get());
    }
}
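
Because the reducer emits FlowBean as the key and NullWritable as the value, each output line is simply FlowBean.toString(): the mobile number, total upstream flow, total downstream flow, and their sum, separated by tabs.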
  

 
 

 
 

package partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowPartitionRunner extends Configured implements Tool {

    private static final String HDFS_PATH = "hdfs://cloud01:9000";

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.job.jar", "part.jar");
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowPartitionRunner.class);

        job.setMapperClass(FlowPartitionMapper.class);
        job.setReducerClass(FlowPartitionReducer.class);

        job.setPartitionerClass(AreaPartitioner.class);

        // must be >= the number of partitions produced by AreaPartitioner
        job.setNumReduceTasks(4);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);

        Path inputPath = new Path(HDFS_PATH + "/flow/input");
        Path outputDir = new Path(HDFS_PATH + "/flow/partition/output");

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputDir);

        // remove a previous output directory so the job can be rerun
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputDir)) {
            fs.delete(outputDir, true);
        }

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new Configuration(), new FlowPartitionRunner(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Execution result:


[hadoop@cloud01 ~]$ hadoop jar /home/hadoop/workspace/HDFSdemo/part.jar  partition.FlowPartitionRunner


[hadoop@cloud01 ~]$ hadoop fs -ls /flow/partition/output
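
With job.setNumReduceTasks(4), the output directory should contain four part files, part-r-00000 through part-r-00003, one per partition produced by AreaPartitioner (the 139, 137, and 159 prefixes plus the catch-all partition), along with a _SUCCESS marker file.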


