Composite key와 chain MapReduce 예제
MapReduce의 기본 예제인 word count의 결과물을 한 번 더 가공하여,
각 단어를 첫 번째 알파벳별로 묶고, 같은 알파벳 안에서는 빈도수가 높은 순서대로 정렬해 보자.
가령 "my name is jack. your name is heidi"라는 문장이 있을 때 결과는 다음과 같다. (예제 코드는 공백으로만 단어를 나누므로 실제로는 "jack."처럼 문장부호가 붙은 채로 집계되지만, 여기서는 편의상 문장부호를 무시했다.)
> 첫 번째 map reduce를 통과한 결과 (key 기준으로 정렬되어 출력된다)
heidi 1
is 2
jack 1
my 1
name 2
your 1
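> 두 번째 map reduce를 통과한 결과 (각 단어 뒤 괄호 안의 숫자는 빈도수이며, 같은 알파벳 안에서는 빈도수 내림차순으로 나열된다)
h heidi(1)
i is(2)
j jack(1)
m my(1)
n name(2)
y your(1)
이런 결과가 나오면 된다. 같은 알파벳으로 시작하는 단어가 여러 개라면(예: name 2, need 1) "n name(2) need(1)"처럼 한 줄에 빈도수 순서대로 함께 출력된다.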
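첫 번째 결과로 두 번째 map의 출력을 만들 때, "key={첫 번째 알파벳, 빈도수}, value=단어" 형태의 composite key를 생성하는 것이 핵심이다. 파티셔닝과 그룹핑은 알파벳만 보고, 정렬은 알파벳과 빈도수(내림차순)를 함께 보도록 하면, 하나의 reduce 호출에 같은 알파벳의 단어들이 빈도수 순서대로 들어온다.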
아래는 샘플 소스이다.
----
/**
 * Two chained MapReduce jobs.
 *
 * <p>Job 1 is a plain word count that writes {@code word<TAB>count}. Job 2 reads that output,
 * groups the words by their first character and, inside each group, lists them by count in
 * descending order. Job 2 uses a composite key ({@link WCKey}: first character + count) with a
 * partitioner and grouping comparator that look only at the character, and a sort comparator
 * that looks at the character and then the count (descending).
 *
 * <p>Usage: {@code WordCount [input [intermediate [output]]]}. Missing arguments fall back to the
 * original hard-coded paths.
 */
public class WordCount {

    /** Default input directory of job 1. */
    private static final String DEFAULT_INPUT = "/Users/jackkim/temp/dataset/";
    /** Default output directory of job 1; job 2 reads its part files from here. */
    private static final String DEFAULT_INTERMEDIATE = "/Users/jackkim/temp/temp/";
    /** Default output directory of job 2. */
    private static final String DEFAULT_OUTPUT = "/Users/jackkim/temp/output/";
    /** Interval at which main() checks whether the job chain has finished. */
    private static final long POLL_INTERVAL_MS = 1000L;

    /** Job 1 mapper: emits (token, 1) for every whitespace-separated token of the input line. */
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        // Reused across calls to avoid allocating a Text per token.
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    /** Job 1 reducer (also used as combiner): sums the counts for one word. */
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    /**
     * Job 2 mapper: parses a {@code word<TAB>count} line from job 1 and emits
     * ({first character of word, count}, word).
     */
    public static class Map2 extends Mapper<LongWritable, Text, WCKey, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] sp = value.toString().split("\t");
            // Skip lines that do not carry a tab-separated count (or have an empty word)
            // instead of failing the task with an ArrayIndexOutOfBounds/StringIndexOutOfBounds.
            if (sp.length < 2 || sp[0].isEmpty()) {
                return;
            }
            String str = sp[0];
            int cnt = Integer.parseInt(sp[1]);
            // Use the first code point rather than charAt(0) so a supplementary character
            // (surrogate pair) is not split in half.
            String first = str.substring(0, str.offsetByCodePoints(0, 1));
            context.write(new WCKey(first, cnt), new Text(str));
        }
    }

    /**
     * Job 2 reducer: called once per first character (see {@link NaturalKeyGroupingComparator})
     * and writes {@code character<TAB>word1(count1) word2(count2) ...}.
     */
    public static class Reduce2 extends Reducer<WCKey, Text, Text, Text> {
        @Override
        public void reduce(WCKey key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text val : values) {
                // Relies on Hadoop reusing the key object and refreshing it as the value iterator
                // advances, so getCnt() is the count of the word just read, not of the first one.
                sb.append(val.toString()).append('(').append(key.getCnt()).append(") ");
            }
            context.write(new Text(key.getCh()), new Text(sb.toString()));
        }
    }

    /**
     * Configures both jobs, chains them with a JobControl, waits until the chain is done and
     * exits with status 0 on success or 1 if any job failed.
     *
     * @param args optional {@code [input [intermediate [output]]]} paths
     */
    public static void main(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String intermediate = args.length > 1 ? args[1] : DEFAULT_INTERMEDIATE;
        String output = args.length > 2 ? args[2] : DEFAULT_OUTPUT;

        // first job: word count
        Configuration conf1 = new Configuration();
        Job job1 = Job.getInstance(conf1, "WordCount");
        job1.setJarByClass(WordCount.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);
        job1.setMapperClass(Map.class);
        // Summing is associative and commutative, so the reducer can double as a combiner.
        job1.setCombinerClass(Reduce.class);
        job1.setReducerClass(Reduce.class);
        FileInputFormat.addInputPath(job1, new Path(input));
        FileOutputFormat.setOutputPath(job1, new Path(intermediate));
        ControlledJob cJob1 = new ControlledJob(job1, null);

        // second job: group by first character, sort by count (descending) inside each group
        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2, "WordCount2");
        job2.setJarByClass(WordCount.class);
        job2.setPartitionerClass(NaturalKeyPartitioner.class);
        job2.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
        job2.setSortComparatorClass(CompositeKeyComparator.class);
        job2.setMapOutputKeyClass(WCKey.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        job2.setMapperClass(Map2.class);
        job2.setReducerClass(Reduce2.class);
        // Read only job 1's part files (skips _SUCCESS and similar markers).
        FileInputFormat.addInputPath(job2, new Path(new Path(intermediate), "part*"));
        FileOutputFormat.setOutputPath(job2, new Path(output));
        ControlledJob cJob2 = new ControlledJob(job2, null);
        // Dependencies must be registered while the job is still WAITING, i.e. before the
        // controller thread starts.
        cJob2.addDependingJob(cJob1);

        JobControl jobctrl = new JobControl("jobctrl");
        jobctrl.addJob(cJob1);
        jobctrl.addJob(cJob2);

        // JobControl.run() loops until stop() is called, so it runs on its own thread and
        // main waits for completion; daemon so an exception in main cannot leave the JVM hanging.
        Thread theController = new Thread(jobctrl);
        theController.setDaemon(true);
        theController.start();
        while (!jobctrl.allFinished()) {
            Thread.sleep(POLL_INTERVAL_MS);
        }
        jobctrl.stop();

        boolean failed = !jobctrl.getFailedJobList().isEmpty();
        for (ControlledJob job : jobctrl.getFailedJobList()) {
            System.err.println("Job failed: " + job.getJobName() + ": " + job.getMessage());
        }
        System.exit(failed ? 0 + 1 : 0);
    }

    /**
     * Composite map-output key of job 2: the first character of a word plus that word's count.
     * Natural ordering is character, then count ascending; job 2 overrides the sort order with
     * {@link CompositeKeyComparator}.
     */
    public static class WCKey implements WritableComparable<WCKey> {
        private String ch;
        private int cnt;

        /** No-arg constructor required by Hadoop for deserialization. */
        public WCKey() { }

        public WCKey(String ch, int cnt) {
            this.ch = ch;
            this.cnt = cnt;
        }

        @Override
        public String toString() {
            return "character=" + this.ch + ",count=" + String.valueOf(this.cnt);
        }

        @Override
        public int compareTo(WCKey o) {
            int result = this.ch.compareTo(o.ch);
            return result != 0 ? result : Integer.compare(this.cnt, o.cnt);
        }

        /** Equal exactly when {@link #compareTo} returns 0 (same character and count). */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof WCKey)) {
                return false;
            }
            WCKey other = (WCKey) obj;
            return this.cnt == other.cnt
                    && (this.ch == null ? other.ch == null : this.ch.equals(other.ch));
        }

        @Override
        public int hashCode() {
            return 31 * (ch == null ? 0 : ch.hashCode()) + cnt;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.ch = WritableUtils.readString(in);
            this.cnt = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            WritableUtils.writeString(out, this.ch);
            out.writeInt(this.cnt);
        }

        public String getCh() {
            return ch;
        }

        public int getCnt() {
            return cnt;
        }

        public void setCh(String ch) {
            this.ch = ch;
        }

        public void setCnt(int cnt) {
            this.cnt = cnt;
        }
    }

    /** Sort comparator for job 2: character ascending, then count descending. */
    public static class CompositeKeyComparator extends WritableComparator {
        protected CompositeKeyComparator() {
            super(WCKey.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            WCKey k1 = (WCKey) w1;
            WCKey k2 = (WCKey) w2;
            int result = k1.getCh().compareTo(k2.getCh());
            // Arguments swapped on purpose: higher counts sort first.
            return result != 0 ? result : Integer.compare(k2.getCnt(), k1.getCnt());
        }
    }

    /** Sends every key with the same first character to the same reducer. */
    public static class NaturalKeyPartitioner extends Partitioner<WCKey, Text> {
        @Override
        public int getPartition(WCKey key, Text val, int numPartitions) {
            // Mask the sign bit: hashCode() may be negative and a negative partition is illegal.
            return (key.getCh().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    /** Grouping comparator: one reduce() call per first character, regardless of count. */
    public static class NaturalKeyGroupingComparator extends WritableComparator {
        protected NaturalKeyGroupingComparator() {
            super(WCKey.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            WCKey k1 = (WCKey) w1;
            WCKey k2 = (WCKey) w2;
            return k1.getCh().compareTo(k2.getCh());
        }
    }
}