Reduce-side join in Hadoop: Data analysis from different types of data sources

Posted By : Rohan Jain | 24-Nov-2014

Why do we need to do a data join?

Consider an example: we have two different data sources. Our first data source has records of the form Person UID, Name, Total friends.


1,PersonA,300

2,PersonB,200

3,PersonC,250

 

And our second data source has records of the form Person UID, Current city, Hometown, Age, Present year.


1,Gurgaon,Agra,22,2013

2,Delhi,Agra,21,2013

2,Noida,Agra,21,2014

 

Now we need to take the data from these two different types of data sources, one with social info (the friend counts) and the other with basic info (city, hometown, age), and present the result in a combined form.

So, to generate a combined output from data sources that are related in some way, we join them on the basis of some common parameter, say the Person UID, and generate output like:

1    PersonA,300,Gurgaon,Agra,22,2013


To achieve this, Hadoop ships with a contrib package called datajoin that works as a generic framework for data joining.

 

What is a reduce-side join?

It is named so because the join is performed on the reduce side. It introduces three new terms:
-> data source, tag, and group key

Data Source -> input file/files
Tags -> The MapReduce paradigm calls for processing each record one at a time in a stateless
manner. If we want some state information to persist, we have to tag the record with
such state.

In our example: each record is tagged with socialinfo or basicinfo, depending on which file it came from.

Group key -> the join key (as in a relational database); it is the Person UID in our case.

 

How it Works


For example, consider the record

2,PersonB,200

This record is from the socialinfo file. map() will output a key/value pair where the key is "2", the Person UID that will be used to join with records from the basicinfo file. The value output by map() is the entire record, wrapped with the tag "socialinfo".

Now consider a record from the other data source:

2,Delhi,Agra,21,2013

This record is from the basicinfo file. map() will output a key/value pair where the key is "2", the Person UID that will be used to join with records from the socialinfo file. The value output by map() is the entire record, wrapped with the tag "basicinfo".


Now the function reduce() will unwrap each package to get the original record and identify the record's data source from its tag. We see that group key (Person UID) "1" gets two values, one tagged "socialinfo" and one tagged "basicinfo", while group key "2" gets three values: one tagged "socialinfo" and two tagged "basicinfo".

The function reduce() will then take its input and do a full cross-product on the values: it creates all combinations of the values, with the constraint that no combination contains two values carrying the same tag.

Map output

In our case, join key "2" has three values, tagged differently: one with socialinfo and two with basicinfo.
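For reference, the full map output for our sample records would look roughly like this (a sketch; each value is the entire record wrapped in a TaggedWritable carrying its data-source tag):

1    [socialinfo] 1,PersonA,300
1    [basicinfo]  1,Gurgaon,Agra,22,2013
2    [socialinfo] 2,PersonB,200
2    [basicinfo]  2,Delhi,Agra,21,2013
2    [basicinfo]  2,Noida,Agra,21,2014
3    [socialinfo] 3,PersonC,250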

The cross-product therefore creates two combinations for join key "2":


2    PersonB,200,Noida,Agra,21,2014
2    PersonB,200,Delhi,Agra,21,2013

and only one combination for join key "1":

1    PersonA,300,Gurgaon,Agra,22,2013

 

Note: In cases where all the values reduce() sees carry distinct tags, the cross-product yields a single combination: the original set of values.

reduce() then feeds each combination from the cross-product into a function called combine().

The combine() function decides whether the whole operation is an inner join, an outer join, or another type of join (a sketch of an outer-join variant appears after the full listing below).

If it is an inner join, then our record with key 3 from the socialinfo file will be dropped, because it has no matching basicinfo record:

3,PersonC,250

 

Implementation of the exercise (you can clone the project from https://github.com/roanjain/reduce-side-join-hadoop ):

You can find the datajoin package jar in $HADOOP_HOME/share/hadoop/tools/lib
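One possible way to compile and run the job (a sketch only; the source path, the datajoin jar version, and the HDFS paths are assumptions, so adjust them to your installation):

# The datajoin contrib classes ship with the Hadoop tools jars; pick the jar matching your version
export DATAJOIN_JAR=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-datajoin-2.5.1.jar

# Compile against the Hadoop classpath plus the datajoin jar, then package the job
mkdir -p classes
javac -cp $(hadoop classpath):$DATAJOIN_JAR -d classes src/com/reducesidejoin/ReduceSideJoin.java
jar cf reducesidejoin.jar -C classes .

# HADOOP_CLASSPATH makes the contrib classes visible to the client JVM;
# -libjars ships them to the map and reduce tasks
export HADOOP_CLASSPATH=$DATAJOIN_JAR
hadoop jar reducesidejoin.jar com.reducesidejoin.ReduceSideJoin -libjars $DATAJOIN_JAR /join/input /join/output

The input directory should contain both data files; each record is tagged with the name of the file it came from and joined on its first field.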

 


package com.reducesidejoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.reducesidejoin.ReduceSideJoin.Map.TaggedWritable;
/**
 * @author Rohan
 * This reduce side join job uses the old hadoop API, as there are numerous incompatibility issues with the new API.
 */
public class ReduceSideJoin extends Configured implements Tool {
	
	public static class Map extends DataJoinMapperBase {
		public static class TaggedWritable extends TaggedMapOutput{
			private Writable data;
			/**
			 * We need to define this no-argument constructor because we have declared a constructor
			 * with arguments; otherwise Hadoop's reflection-based instantiation fails with a NoSuchMethodException.
			 */
			public TaggedWritable(){
				this.tag = new Text();
			}
			public TaggedWritable(Writable data){
				this.tag = new Text(" ");
				this.data = data;
			}
			public Writable getData() {
				return data;
			}
			public void setData(Writable data) {
				this.data = data;
			}
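			/**
			 * Serialize the tag first, then the class name of the wrapped Writable,
			 * and finally the wrapped data itself; readFields() reads them back in
			 * the same order and re-creates the wrapped Writable via reflection.
			 */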
			public void write(DataOutput out) throws IOException {  
	            this.tag.write(out);  
	            out.writeUTF(this.data.getClass().getName());
	            this.data.write(out);  
	        }
			public void readFields(DataInput in) throws IOException {  
		            this.tag.readFields(in);  
		            String dataClz = in.readUTF();  
		            if (this.data == null || !this.data.getClass().getName().equals(dataClz)) {  
		                try {  
		                    this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);  
		                } catch (ClassNotFoundException e) {  
		                    e.printStackTrace();  
		                }  
		            }  
		            this.data.readFields(in);  
		        }  
		}
		
		/**
		 * Generates the tag for a data source, using its input file name.
		 */
		public Text generateInputTag(String inputFile){
			String dataSource = inputFile;
			return new Text(dataSource);
		}
		
		/**
		 * Generates the group/join key for each record: the first comma-separated field, the Person UID.
		 */
		protected Text generateGroupKey(TaggedMapOutput aRecord){
			String line = ((Text) aRecord.getData()).toString();
			String[] tokens = line.split(",");
			String groupKey = tokens[0];
			return new Text(groupKey);
		}
		
		/**
		 * Wrap each raw input value in our custom TaggedWritable and attach the tag
		 * of the data source (input file) it came from.
		 */
		protected TaggedMapOutput generateTaggedMapOutput(Object value){
			TaggedWritable retv = new TaggedWritable((Text) value);
			retv.setTag(this.inputTag);
			return retv;
		}
		
		/*
		 * Note: DataJoinMapperBase already provides the map() implementation. For every
		 * record of a data source file it calls generateTaggedMapOutput() and
		 * generateGroupKey() defined above, wraps the record with the tag of its input
		 * file, and emits it as a (group key, TaggedWritable) pair. All records coming
		 * from the same data source file therefore carry the same tag.
		 */
	}
	
	/**
	 * The reducer only needs to implement combine(). For each combination from the
	 * cross-product it receives the list of tags and the corresponding tagged values,
	 * and returns the joined record (returning null drops the combination).
	 */
	public static class Reduce extends DataJoinReducerBase {
		protected TaggedMapOutput combine(Object[] tags, Object[] values) {
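			// A group key seen under fewer than two tags exists in only one data source;
			// returning null drops that combination, which makes this an inner join.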
			if(tags.length<2) return null;
			String joinedStr = "";
			for (int i = 0; i < values.length; i++) {
				if (i > 0) joinedStr += ",";
				TaggedWritable tw = (TaggedWritable) values[i];
				String line = ((Text) tw.getData()).toString();
				String[] tokens = line.split(",",2);
				joinedStr += tokens[1];
			}
			TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
			retv.setTag((Text) tags[0]);
			return retv;
		}
	}
	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new ReduceSideJoin(), args);
		System.exit(res);
	}
	
	public int run(String[] args) throws Exception {
	
		Configuration conf = getConf();
		JobConf job = new JobConf(conf,ReduceSideJoin.class);
		job.setJarByClass(ReduceSideJoin.class);
		Path in = new Path(args[0]);
		Path out = new Path(args[1]);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TaggedWritable.class);
		
		
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		
		job.setInputFormat(TextInputFormat.class);
		job.setOutputFormat(TextOutputFormat.class);
		
		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);
		
		JobClient.runJob(job);  
		return 0;  
	}
}
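As noted above, combine() is where the join type is decided: the version in the Reduce class returns null whenever a group key was seen under fewer than two tags, which gives an inner join. Purely as a sketch (not part of the original project), a left-outer-join variant that keeps unmatched socialinfo records could replace that method as shown below; it assumes the socialinfo file's name contains the string "socialinfo", since the tag is derived from the input file name.

	protected TaggedMapOutput combine(Object[] tags, Object[] values) {
		String joinedStr = "";
		if (tags.length < 2) {
			// Only one data source matched this group key.
			if (!tags[0].toString().contains("socialinfo")) {
				return null; // unmatched basicinfo record: still dropped
			}
			// Keep the lone socialinfo record (everything after the join key).
			TaggedWritable lone = (TaggedWritable) values[0];
			joinedStr = ((Text) lone.getData()).toString().split(",", 2)[1];
		} else {
			for (int i = 0; i < values.length; i++) {
				if (i > 0) joinedStr += ",";
				TaggedWritable tw = (TaggedWritable) values[i];
				// Drop the join key from each record before concatenating.
				joinedStr += ((Text) tw.getData()).toString().split(",", 2)[1];
			}
		}
		TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
		retv.setTag((Text) tags[0]);
		return retv;
	}

With this variant, the record for Person UID 3 would be emitted as "3	PersonC,250" instead of being dropped.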

Thanks

About Author

Rohan Jain

Rohan is a bright and experienced web app developer with expertise in Groovy and Grails development.
