개발/빅데이타

composite key, chain map reduce예제

JackSama 2013. 1. 16. 11:22

map reduce의 기본인 word count 예제의 결과물을 한 번 더 가공하여,

같은 첫번째 알파벳으로 시작하는 단어들을 묶고, 각 묶음 안에서 빈도수가 높은 순으로 정렬해 보자.


가령.. "my name is jack. your name is heidi" 라는 문장이 있을때,

> 첫번째 map reduce를 통과한 결과

is    2

jack    1

my    1

heidi    1

name    2

your    1


> 두번째 map reduce를 통과한 결과

h    heidi

i    is

j    jack

m    my

n    name

y    your

이런 결과가 나오면 된다.


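첫번째 결과값으로 두번째 map을 구성할 때, "key={첫번째 알파벳, 빈도수}, value=단어" 형태의 composite key를 생성하는 것이 포인트이다. 파티셔닝과 그룹핑은 첫번째 알파벳만으로 하고, 정렬은 첫번째 알파벳과 빈도수(내림차순)로 하면, reducer는 같은 알파벳으로 시작하는 단어들을 빈도수가 높은 순서대로 받게 된다.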


이하 샘플소스

----

public class WordCount {

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();


public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

context.write(word, one);

}

}


public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {


public void reduce(Text key, Iterable<IntWritable> values, Context context) 

throws IOException, InterruptedException {

int sum = 0;

for (IntWritable val : values) {

sum += val.get();

}

context.write(key, new IntWritable(sum));

}

}


public static class Map2 extends Mapper<LongWritable, Text, WCKey, Text> {

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();

String[] sp = line.split("\t");

String str = sp[0];

int cnt = Integer.parseInt(sp[1]);


WCKey wcKey = new WCKey(String.valueOf(str.charAt(0)), cnt);

context.write(wcKey, new Text(str));

}


public static class Reduce2 extends Reducer<WCKey, Text, Text, Text> {

public void reduce(WCKey key, Iterable<Text> values, Context context) 

throws IOException, InterruptedException {

Iterator<Text> it = values.iterator();

StringBuffer sb = new StringBuffer();


while(it.hasNext()) {

sb.append(it.next().toString()).append("("+String.valueOf(key.getCnt())+") ");

}


context.write(new Text(key.getCh()), new Text(sb.toString()));

}

}


/**
 * Configures the two jobs, chains them with a {@code JobControl} so the
 * second job runs only after the first, and blocks until both are done.
 *
 * <p>Job 1 is a plain word count. Job 2 reads job 1's {@code part*} files
 * and groups the words by first character using the composite key.
 * Input, intermediate and output paths are hard-coded below.
 *
 * <p>The controller runs on its own thread. This method polls until every
 * job has finished, then stops the controller so the JVM can exit, and
 * terminates with exit status 1 if any job failed.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if job setup fails or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    // first job
    Configuration conf1 = new Configuration();

    Job job1 = new Job(conf1, "WordCount");
    job1.setJarByClass(WordCount.class);

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    job1.setInputFormatClass(TextInputFormat.class);
    job1.setOutputFormatClass(TextOutputFormat.class);

    job1.setMapperClass(Map.class);
    job1.setReducerClass(Reduce.class);

    FileInputFormat.addInputPath(job1, new Path("/Users/jackkim/temp/dataset/"));
    FileOutputFormat.setOutputPath(job1, new Path("/Users/jackkim/temp/temp/"));

    ControlledJob cJob1 = new ControlledJob(conf1);
    cJob1.setJob(job1);

    // second job
    Configuration conf2 = new Configuration();

    Job job2 = new Job(conf2, "WordCount2");
    job2.setJarByClass(WordCount.class);

    // Partition and group on the character only; sort on character + count.
    job2.setPartitionerClass(NaturalKeyPartitioner.class);
    job2.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job2.setSortComparatorClass(CompositeKeyComparator.class);

    job2.setMapOutputKeyClass(WCKey.class);
    job2.setMapOutputValueClass(Text.class);

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(Text.class);

    job2.setInputFormatClass(TextInputFormat.class);
    job2.setOutputFormatClass(TextOutputFormat.class);

    job2.setMapperClass(Map2.class);
    job2.setReducerClass(Reduce2.class);

    FileInputFormat.addInputPath(job2, new Path("/Users/jackkim/temp/temp/part*"));
    FileOutputFormat.setOutputPath(job2, new Path("/Users/jackkim/temp/output/"));

    ControlledJob cJob2 = new ControlledJob(conf2);
    cJob2.setJob(job2);

    // chain: job2 depends on job1
    JobControl jobctrl = new JobControl("jobctrl");
    jobctrl.addJob(cJob1);
    jobctrl.addJob(cJob2);
    cJob2.addDependingJob(cJob1);

    Thread theController = new Thread(jobctrl);
    theController.start();

    // Wait for the chain to finish; always stop the controller afterwards,
    // otherwise its thread keeps the JVM alive.
    try {
        while (!jobctrl.allFinished()) {
            Thread.sleep(1000);
        }
    } finally {
        jobctrl.stop();
    }

    if (!jobctrl.getFailedJobList().isEmpty()) {
        System.err.println("Failed jobs: " + jobctrl.getFailedJobList());
        System.exit(1);
    }
}


/**
 * Composite key for the second job: a character (held as a string) plus a
 * count.
 *
 * <p>Natural ordering is by character, then by count ascending.
 * {@code equals} and {@code hashCode} are defined over the same two fields
 * so they stay consistent with {@code compareTo}.
 */
public static class WCKey implements WritableComparable<WCKey> {

    private String ch;

    private int cnt;

    /** No-arg constructor; fields are filled in later by {@link #readFields} or the setters. */
    public WCKey() { }

    /**
     * @param ch  the character part of the key
     * @param cnt the count part of the key
     */
    public WCKey(String ch, int cnt) {
        this.ch = ch;
        this.cnt = cnt;
    }

    @Override
    public String toString() {
        return "character=" + this.ch + ",count=" + this.cnt;
    }

    /**
     * Orders by character first, then by count ascending.
     *
     * @param o the key to compare against
     * @return negative, zero or positive per the {@link Comparable} contract
     */
    @Override
    public int compareTo(WCKey o) {
        int result = this.ch.compareTo(o.ch);
        if (result == 0) {
            // Integer.compare avoids boxing and cannot overflow.
            result = Integer.compare(this.cnt, o.cnt);
        }
        return result;
    }

    /** Two keys are equal when both character and count are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof WCKey)) {
            return false;
        }
        WCKey other = (WCKey) obj;
        boolean sameCh = (this.ch == null) ? other.ch == null : this.ch.equals(other.ch);
        return sameCh && this.cnt == other.cnt;
    }

    /** Hash derived from the field values only, so equal keys hash alike. */
    @Override
    public int hashCode() {
        int chHash = (this.ch == null) ? 0 : this.ch.hashCode();
        return 31 * chHash + this.cnt;
    }

    /** Reads the character string, then the count, in the order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.ch = WritableUtils.readString(in);
        this.cnt = in.readInt();
    }

    /** Writes the character string, then the count. */
    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, this.ch);
        out.writeInt(this.cnt);
    }

    public String getCh() {
        return ch;
    }

    public int getCnt() {
        return cnt;
    }

    public void setCh(String ch) {
        this.ch = ch;
    }

    public void setCnt(int cnt) {
        this.cnt = cnt;
    }
}


/**
 * Comparator over {@link WCKey}: character ascending, then count
 * descending. Registered as the sort comparator of the second job.
 */
public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
        super(WCKey.class, true);
    }

    /**
     * Compares two {@link WCKey} instances.
     *
     * @param w1 left key, must be a {@link WCKey}
     * @param w2 right key, must be a {@link WCKey}
     * @return ordering by character; on a tie, the larger count sorts first
     */
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        WCKey left = (WCKey) w1;
        WCKey right = (WCKey) w2;

        int byChar = left.getCh().compareTo(right.getCh());
        if (byChar != 0) {
            return byChar;
        }
        // Operands swapped to get descending order on the count.
        return Integer.compare(right.getCnt(), left.getCnt());
    }
}


/**
 * Partitions on the character part of {@link WCKey} only, so every key
 * with the same character goes to the same partition whatever its count.
 */
public static class NaturalKeyPartitioner extends Partitioner<WCKey, Text> {

    /**
     * @param key           composite key; only {@code getCh()} is used
     * @param val           the value (unused)
     * @param numPartitions number of partitions to spread keys over
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(WCKey key, Text val, int numPartitions) {
        int hash = key.getCh().hashCode();
        // Clear the sign bit first: a negative hashCode would otherwise give
        // a negative remainder, which is not a valid partition index.
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }
}


/**
 * Comparator over {@link WCKey} that looks at the character only and
 * ignores the count. Registered as the grouping comparator of the second
 * job.
 */
public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
        super(WCKey.class, true);
    }

    /**
     * Compares two {@link WCKey} instances by character alone.
     *
     * @param w1 left key, must be a {@link WCKey}
     * @param w2 right key, must be a {@link WCKey}
     * @return the string comparison of the two characters
     */
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        String leftCh = ((WCKey) w1).getCh();
        String rightCh = ((WCKey) w2).getCh();
        return leftCh.compareTo(rightCh);
    }
}

}