my code stock.com

hadoop de join

Snippet options

Download: Download snippet as hadoop-join.java.
Copy snippet: For this you need a free my code stock.com account.
Embed code : You will find the embed code for this snippet at the end of the page, if you want to embed it into a website or a blog!

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.mapred.lib.MultipleInputs;
// import org.apache.hadoop.mapred.JobClient;
// import org.apache.hadoop.mapred.JobConf;

public class Join extends Configured implements Tool {

    public static class JoinMapperA extends 
					MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
	
	public void map(LongWritable key, Text value, 
			OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

	    String line = value.toString();
	    String[] items = line.split("\t");
	    String categoryId = items[0];
	    String categoryName =items[1];
	    
	    output.collect(new Text(categoryId + "\t" + "1"),
			   new Text(categoryName));
	}
    }

    public static class JoinMapperB extends 
					MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
	
	public void map(LongWritable key, Text value, 
			OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

	    String line = value.toString();
	    String[] items = line.split("\t");
	    String id = items[0];
	    String categoryId = items[1];
	    
	    output.collect(new Text(categoryId + "\t" + "2"),
			   new Text(id));
	    
	}
    }

    public static class JoinReducer extends
					MapReduceBase implements Reducer<Text, Text, Text, Text> {
	
	public void reduce(Text key, Iterator<Text> values, 
			   OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
	    
	    String[] items = key.toString().split("\t");
	    String categoryId = items[0];
	    String flag = items[1];
	    String categoryName = null;

	    //if ( flag.equals("1") ) {
	    categoryName = values.next().toString();
		//}
	    
	    String id = null;
	    while ( values.hasNext() ) {
		id = values.next().toString();
		
		output.collect(new Text(id),
			       new Text(categoryId + "\t" + categoryName));
	    }

	}
    }

    public static class JoinComparator extends WritableComparator {

	public JoinComparator() { super(Text.class); }

	public int compare(WritableComparable w1, WritableComparable w2) {
	    Text t1 = (Text)w1;
	    Text t2 = (Text)w2;
	    String[] items1 = t1.toString().split("\t");
	    String[] items2 = t2.toString().split("\t");
	    return items1[0].compareTo(items2[0]);
	}
    }

    public int run(String args[]) throws Exception {
	JobConf conf = new JobConf(getConf(), Join.class);
	
	// input
	MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, JoinMapperA.class);
	MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, JoinMapperB.class);
	// output
	FileOutputFormat.setOutputPath(conf, new Path(args[2]));
	
	// map
	conf.setMapOutputKeyClass(Text.class);
	conf.setMapOutputValueClass(Text.class);
	conf.setOutputKeyClass(Text.class);
	conf.setOutputValueClass(Text.class);
	
	// partitioner
	conf.set("mapred.output.key.field.separator", "\t");
	conf.set("mapred.text.key.partitioner.options","-k1,1");
	conf.setPartitionerClass(KeyFieldBasedPartitioner.class);

	// reducer
	conf.setReducerClass(JoinReducer.class);
	
	JobClient.runJob(conf);
	return 0;
    }
    
    public static void main(String args[]) throws Exception {
	int exitCode = ToolRunner.run(new Configuration(),
				      new Join(), args);
	System.exit(exitCode);
    }
}

Create a free my code stock.com account now.

my code stock.com is a free service that allows you to save and manage code snippets of any kind and in any programming language. We provide many advantages for your daily work with code snippets, as well as for your teamwork. Give it a try!

Find out more and register now

You can customize the height of iframe embed codes as needed! You can find more information in our API Reference for iframe Embeds.