Control How To Read Data : MapReduce Custom InputFormat

Posted By : Rohan Jain | 15-Dec-2014

In this blog I am going to show how you can implement a custom input format in Hadoop.

Hadoop natively provides various input format classes to suit your needs, but quite often a situation arises where you need your own input format class.

So first let's take the example of the TextInputFormat class.


How does it see our input ??


It takes the input file and reads each line as a single record, treating the byte offset of the line as the key and the content of the line as the value.
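For example, suppose the input file contains these two lines (a tiny made-up file, just for illustration):

Hello Hadoop
Custom Input Format

TextInputFormat would then feed the mapper two records, keyed by the byte offset at which each line starts (assuming one-byte line endings):

(0, Hello Hadoop)
(13, Custom Input Format)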

Now suppose you need keys and values that span more than one line. Hadoop also provides the NLineInputFormat class for that. But suppose we need an input format for data that looks like this:

 

Some text..Some text..Some text..Some text..EndSome text..Some text..Some text..Some text..EndSome text..Some text..End

 

Now you can see there is no fixed pattern in our input. The values we want are the chunks separated by the marker End: wherever we encounter this marker we treat what came before it as a separate record. So how can we achieve this?
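In other words, from the sample above we want the reader to emit three separate records, one per End marker, roughly like this:

record 1 : Some text..Some text..Some text..Some text..End
record 2 : Some text..Some text..Some text..Some text..End
record 3 : Some text..Some text..End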


Hadoop provides the mechanism for creating our custom input class.

So what does an input format define ??

  • First, calculate the input splits of the data.

 

So what is an input split ??
An input split is the chunk of data that is processed by a single mapper instance.
Each mapper instance gets its own input split.
This method is used to get the input splits:

abstract List<InputSplit> getSplits(JobContext context)
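As a rough worked example (assuming the default 128 MB HDFS block size and the stock FileInputFormat split logic), a 300 MB input file would normally be divided into three splits, and therefore three mapper instances would be launched:

split 1 : bytes 0         - 134217727   (128 MB)
split 2 : bytes 134217728 - 268435455   (128 MB)
split 3 : bytes 268435456 - 314572799   (~44 MB)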

  • Second, provide the logic to read the input split.

 

Each mapper gets a unique input split, and our input format provides the logic to read that split using a RecordReader.


What is Record Reader ??


The record reader reads the input split and emits keys and values as input for the map function.
The map function is called once for each key/value pair.
This method is used to create the record reader for a given split:

abstract RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context)
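To see where these two pieces plug in, this is roughly what the framework does for every mapper (a simplified sketch of the loop in org.apache.hadoop.mapreduce.Mapper, not code from this post):

// simplified view of how the framework drives a record reader
reader.initialize(split, context);       // once, when the mapper starts
while (reader.nextKeyValue()) {          // advance to the next record
    map(reader.getCurrentKey(),          // key produced by the reader
        reader.getCurrentValue(),        // value produced by the reader
        context);
}
reader.close();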

We will create a SearchKeywordInputFormat class that reads search-by-keyword log data like:

 

$$Keyword-Java
05-12-2014 10:55:09 IND IP ... ... 

$$Keyword-Hadoop
05-12-2014 8:58:41 AUS IP ... ...
05-12-2014 8:58:45 IND IP ... ...
05-12-2014 10:58:45 IND IP ... ...
05-12-2014 11:58:45 IND IP ... ...

$$Keyword-Android
05-12-2014 10:58:41 AUS IP ... ...
05-12-2014 11:58:41 AUS IP ... ...

 

Our SearchKeywordInputFormat class will simply emit keys and values based on the $$Keyword marker: as soon as it encounters the marker, it generates a new key/value pair.
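So for the sample log above, the record reader emits one record per $$Keyword block: the first value carries the Java block and its single log line, the second carries the Hadoop block with its four log lines, and the third carries the Android block with its two log lines (each value is the raw text of the block up to the next marker).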

 

Implementation

package com.searchkeyword;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/** 
* 
* @author Rohan Jain 
* 
*/
public class SearchKeywordInputFormat extends FileInputFormat<LongWritable, Text> {

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
		// hand every split to our custom record reader
		SearchKeywordRecordReader reader = new SearchKeywordRecordReader();
		reader.initialize(split, context);
		return reader;
	}

	@Override
	public boolean isSplitable(JobContext context, Path file) {
		// read each file as a single split so a keyword block is never cut across mappers
		return false;
	}
}
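Note that isSplitable returns false, so each input file is read as a single split and a $$Keyword block is never cut in half between two mappers. The actual reading logic lives in the record reader below: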
package com.searchkeyword;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 *
 * @author Rohan Jain
 *
 */
public class SearchKeywordRecordReader extends RecordReader<LongWritable,Text> {
	private Path splitFilePath = null;
	private Text value = new Text();
	private LongWritable key = new LongWritable();
	FSDataInputStream filein = null;
	private boolean stillInChunk = true;
	private DataOutputBuffer buffer = new DataOutputBuffer();
	private long start;
	private long end;
	// the $$Keyword marker that delimits records in the log file
	private byte[] endTag = "$$Keyword".getBytes();

	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
		FileSplit fileSplit = (FileSplit) split;
		start = fileSplit.getStart();
		end = start + fileSplit.getLength();
		Configuration conf = context.getConfiguration();
		FileSystem fs = FileSystem.get(conf);
		splitFilePath = fileSplit.getPath();
		filein = fs.open(splitFilePath);
		filein.seek(start);
		// if this split does not start at the beginning of the file,
		// skip forward to the first marker so we do not read a partial record
		if (start != 0) {
			readUntilMatch(endTag, false);
		}
	}

	// reads the next record and populates key and value; returns false when the split is exhausted
	public boolean nextKeyValue() throws IOException {
		if (!stillInChunk) return false;
		long recordStart = filein.getPos();
		boolean status = readUntilMatch(endTag, true);
		value = new Text();
		value.set(buffer.getData(), 0, buffer.getLength());
		// use the byte offset at which this record begins as its key
		key = new LongWritable(recordStart);
		buffer.reset();
		if (!status) {
			// the marker was not found again, so this was the last record in the split
			stillInChunk = false;
		}
		return true;
	}

	// scans the stream byte by byte until the marker is matched or EOF is reached;
	// when withinBlock is true, the bytes read (including the marker) are collected in the buffer
	private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
		int i = 0;
		while (true) {
			int nextByte = filein.read();
			if (nextByte == -1) return false;   // end of file
			if (withinBlock) buffer.write(nextByte);
			if (nextByte == match[i]) {
				i++;
				if (i >= match.length) {
					// full marker matched; keep reading only if we are still inside this split
					return filein.getPos() < end;
				}
			} else {
				i = 0;
			}
		}
	}

	@Override
	public void close() throws IOException {
		filein.close();
	}

	@Override
	public LongWritable getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		// progress reporting is not implemented for this simple reader
		return 0;
	}
}

When you create a custom input format class, make sure to configure the job with it:

job.setInputFormatClass(SearchKeywordInputFormat.class);
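To round this off, here is a minimal, hypothetical mapper sketch (SearchCountMapper is not part of the original format classes) showing how such records could be consumed. It assumes each record value is one keyword block as emitted by SearchKeywordRecordReader and simply counts the log lines per keyword:

package com.searchkeyword;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// hypothetical example mapper: emits (keyword, number of search log lines in the block)
public class SearchCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	private Text keyword = new Text();
	private IntWritable hits = new IntWritable();

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String name = null;
		int count = 0;
		for (String line : value.toString().split("\n")) {
			line = line.trim();
			if (line.isEmpty()) continue;
			if (line.startsWith("$$Keyword-") || line.startsWith("-")) {
				// the marker line (or what is left of it) carries the keyword name
				name = line.substring(line.indexOf('-') + 1);
			} else if (!line.startsWith("$$")) {
				// anything else is treated as one search log entry
				count++;
			}
		}
		if (name != null && !name.isEmpty()) {
			keyword.set(name);
			hits.set(count);
			context.write(keyword, hits);
		}
	}
}

In the driver, such a mapper would be wired up alongside the input format with job.setMapperClass(SearchCountMapper.class), plus the usual output key/value class settings.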

 

Thanks

About Author

Rohan Jain

Rohan is a bright and experienced web app developer with expertise in Groovy and Grails development.
