MMMapper 类代码:
package com;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class MMMapper extends Mapper
System.out.println(outkey+" | "+outvalue);
}
col++;
}
arow++;
}else if ("matrixB".equals(tag)) {
int col = 0;
while (str.hasMoreTokens()) {
String item = str.nextToken();
for (int i = 0; i < crow; i++) {
Text outkey = new Text(i+","+col);
Text outvalue = new Text("b,"+brow+","+item);
context.write(outkey, outvalue);
System.out.println(outkey+" | "+outvalue);
//current x,y = line,col
}
col++;
}
brow++;
}
}
}
MMReducer 类代码:
package com;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MMReducer extends Reducer {
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
Map matrixa = new HashMap();
Map matrixb = new HashMap();
//values example : b,0,2
for (Text val : values) {
a,0,4
StringTokenizer str = new StringTokenizer(val.toString(),",");
String sourceMatrix = str.nextToken();
if ("a".equals(sourceMatrix)) {
matrixa.put(str.nextToken(), str.nextToken());
//(0,4)
or
}
if ("b".equals(sourceMatrix)) {
matrixb.put(str.nextToken(), str.nextToken());
//(0,2)
int result = 0;
Iterator iter = matrixa.keySet().iterator();
while (iter.hasNext()) {
String mapkey = iter.next();
+=
result
Integer.parseInt(matrixb.get(mapkey));
Integer.parseInt(matrixa.get(mapkey))
*
}
}
}
}
context.write(key, new Text(String.valueOf(result)));
}
Multiply 类代码:
package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Multiply {
public static void main(String[] args) throws Exception {
// set configuration
Configuration conf = new Configuration();
// create job
Job job = new Job(conf,"MatrixMultiply");
job.setJarByClass(Multiply.class);
//
specify Mapper & Reducer
job.setMapperClass(MMMapper.class);
job.setReducerClass(MMReducer.class);
// specify output types of mapper and reducer
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// specify input and output DIRECTORIES
Path inPathA = new Path(args[0]);
Path inPathB = new Path(args[1]);
Path outPath = new Path(args[2]);
FileInputFormat.addInputPath(job, inPathA);
FileInputFormat.addInputPath(job, inPathB);
FileOutputFormat.setOutputPath(job,outPath);
// delete output directory
try{
FileSystem hdfs = outPath.getFileSystem(conf);
if(hdfs.exists(outPath))
hdfs.delete(outPath);
hdfs.close();
} catch (Exception e){
e.printStackTrace();
return ;
}
run the job
//
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
输入输出路径（Input/output paths）:
hdfs://master:9000/matrix/matrixA/matrixa.txt
hdfs://master:9000/matrix/matrixB/matrixb.txt
hdfs://master:9000/matrix/out