Blog

  • In this blog I will show how does the partitioning works in hadoop.

    Why we need to do partitioning in map reduce ??

    As you must be aware that a map reduce job takes an input data set and produces the list of key value paire(Key,value) which is a result of map phase in which the input data set is split and each map task processs the split and each map output the list of key value pairs.

    The output from map are then feed to reduce tasks which processes the user defined reduce function on map outputs. But before the reduce phase is another process that partition the map outputs based on the key and it keeps the record of same key into the same partitions.

    Again why we are doing partitioning before providing them to reduce tasks.


    Consider an example

    We have a word count example, you must have analysed the outputs of such well known example let'say our input dataset by two map tasks gives results as -

    Map output1 
    i 20
    we 15 
    they 12 
    an 25 
    
    Map output2 
    their 12 
    them 10 
    we 15 
    to 18
    

    So if you observe the output from two map tasks you should have noticed that the count of word 'we' is in outputs of both map tasks and it will be processed twice if sent to two different reducers So here partitioning plays the role. 


    Before it sends outputs to reducers it will partition the intermediate key value pairs based on key and send the same key to the same partition.

     

    How the number of partitions are decided ??

    Hadoop decides it at the time when the map reduce job starts that how may partitions will be there which is controlled by the JobConf.setNumReduceTasks()) method, suppose if decide 20 reduce tasks, the 20 partitions will be there and must be filled.

     

    What is happening by default ??

    By default the partitioner implementation is called HashPartitioner. It uses the hashCode() method of the key objects modulo the number of partitions total to determine which partition to send a given (key, value) pair to.

    Partitioner provides the getPartition() method that you can implement yourself if you want to declare the custom partition for your job.
    The getPartition() method receives a key and a value and the number of partitions to split the data, a number in the range [0, numPartitions) must be returned by this method, indicating which partition to send the key and value to. For any two keys k1 and k2, k1.equals(k2) implies getPartition(k1, *, n) == getPartition(k2, *, n).

     

     public int getPartition(K key, V value,
                              int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
     

    So you have seen why partitioning is necessary, now let me shade a light on what is poor partitioning and why it happens ??

     

    Case of Poor partitioning and how to overcome it ??

     

    Suppose you know that one of the key in your data input will appear more than any other key so you may want to send all your key (large number) to one partition and then distribute the other keys over all other partition by their hasCode(). So now if you have two mechanism of sending data to partitions

     

    1. First,the key appearing more will be send to one partition
    2. Second, all other keys will be send to partitions according to their hashCode().
    3. Now suppose if your hashCode() method does not uniformly distribute other keys data over partitions range. So the data is not evenly distributed in partitions as well as reducers.Since each partition is equivalent to a reducer.So here some reducers will have more data than other reducers.So other reducers will wait for one reducer(one with user defined keys) due to the work load it shares.

    So here we should take an approach that its work load may be shared across many different reducers.


    Example Scenario

    Let's say i have an data set of the form person information,Our dataset contains the name,country, sports liked(To keep it simple i have used three fields).

     

    PersonA,India,Cricket
    PersonB,Brazil,Soccer
    PersonC,Australia,Baseball
    PersonD,India,Cricket
    PersonE,England,Cricket
    PersonF,Australia,Cricket
    PersonG,India,Cricket
    PersonH,England,Cricket
    PersonI,India,Cricket
    PersonJ,India,Cricket
    PersonK,India,Cricket
    ..
     

    We need to count the person for each of the game in the list. 


    So our key becomes the third field i.e., the game .

     

    Observe the above example and let's suppose we have a large set of data like this where the frequency of data is in direction of country india.

    I have used the country frequency factor to show when u need the partitioning.

    How it works ??

     

    Our map function will take the inputs and generate the intermediate key value pair. so what it ll do it send the output to reducers with default partitioning using the hashPartitioner which uses the hashCode() to partition the data.

     

    And what does the default partitioner does it will send out all values with the same keys to same reducer.

     

    So all values with same key(cricket) send to same reducer. but since our data contains a large number of such key value pair due the fact the country frequency of dataset is india.

    And also data key (cricket) is also present for other country map output. So we have a lots of key value data to send to same reducer for cricket key. Here we will write our custom partitioner. And follows the approach below.

     

    Our custom partitioner will send all key value by country india to one partition and other key value with countries like(England,Australia) to other partition so that work load one reducer that should process key cricket is divided into two reducers. 

     

    Implementation of our custom Partitoner(you can clone the project at https://github.com/roanjain/hadoop-partitioner)

    package com.mapred.partitioner;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.Mapper;
    /**
     * 
     * @author Rohan Jain
     *
     */
    public class PartitionerDemo extends Configured implements Tool {
    	/**
    	 * Mapper class generating key value pair of game,country as intermediate keys
    	 */
    	public static class PartitionerMap extends Mapper<LongWritable, Text, Text, Text> {
    		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
    			String[] words = value.toString().split(" ");
    			try{
    			context.write(new Text(words[2]),new Text(words[1]));
    			}
    			catch(Exception e){
    				System.err.println(e);
    			}
    		}
    	}
    	/**
    	 * Each partition processed by different reducer tasks as defined in our custom partitioner
    	 */
    	public static class PartitionerReduce extends Reducer<Text,Text,Text,IntWritable> {
    		public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
    			int gameCount=0;
    			for(Text val:values){
    				gameCount++;
    			}
    			context.write(new Text(key),new IntWritable(gameCount));
    		}
    	}
    	/**
    	 * Our custom Partitioner class will divide the dataset into three partitions one with key as cricket and value as
    	 * india, second partition with key as cricket and value other than india, and third partition with game(key) other 
    	 * than cricket 
    	 */
    	public static class customPartitioner extends Partitioner<Text,Text>{
    		public int getPartition(Text key, Text value, int numReduceTasks){
    		if(numReduceTasks==0)
    			return 0;
    		if(key.equals(new Text("Cricket")) && !value.equals(new Text("India")))
    			return 0;
    		if(key.equals(new Text("Cricket")) && value.equals(new Text("India")))
    			return 1;
    		else
    			return 2;
    		}
    	}
    	public static void main(String[] args) throws Exception {
    		int res= ToolRunner.run(new Configuration(),new PartitionerDemo(),args);
    		System.exit(res);
    	}
    	public int run(String[] args) throws Exception {
    		if(args.length!=2){
    			System.out.print("Run as -- hadoop jar /path/to/partitioner.jar /inputdataset /output");
    			System.exit(-1);
    		}
    		
    		Configuration conf = this.getConf();
    		Job job = Job.getInstance(conf);
    		job.setJarByClass(PartitionerDemo.class);
    		
    		//Set number of reducer tasks
    		job.setNumReduceTasks(3);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    		
    		job.setMapperClass(PartitionerMap.class);
    		job.setReducerClass(PartitionerReduce.class);
    
    		//Set Partitioner Class
    		job.setPartitionerClass(customPartitioner.class);
    		
    		job.setInputFormatClass(TextInputFormat.class);
    		job.setOutputFormatClass(TextOutputFormat.class);
    		
    		FileInputFormat.setInputPaths(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    	
    }
    

    You can check your output for your partitioned dataset at output directory, you will see the output as below :

    hduser@rohan-Vostro-3446:/usr/local/hadoop/bin$ hadoop fs -ls /partitionerOutput/
    14/12/01 17:50:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 4 items
    -rw-r--r--   1 hduser supergroup          0 2014-12-01 17:49 /partitionerOutput/_SUCCESS
    -rw-r--r--   1 hduser supergroup         10 2014-12-01 17:48 /partitionerOutput/part-r-00000
    -rw-r--r--   1 hduser supergroup         10 2014-12-01 17:48 /partitionerOutput/part-r-00001
    -rw-r--r--   1 hduser supergroup          9 2014-12-01 17:49 /partitionerOutput/part-r-00002
    

Tags: hadoop