logo资料库

基于MapReduce的矩阵相乘算法代码及其使用.docx

第1页 / 共4页
第2页 / 共4页
第3页 / 共4页
第4页 / 共4页
资料共4页,全文预览结束
// ======================================================================
// File: MMMapper.java — map side of MapReduce matrix multiplication
// ======================================================================
package com;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Mapper for C = A x B.
 *
 * For each element A[r][k] it emits ("r,j", "a,k,A[r][k]") for every output
 * column j; for each element B[k][c] it emits ("i,c", "b,k,B[k][c]") for every
 * output row i. The reducer for key "i,j" then holds everything needed to
 * compute cell C[i][j].
 */
public class MMMapper extends Mapper<Object, Text, Text, Text> {

    private String tag;          // which matrix this split came from ("matrixA" or "matrixB")
    private int crow = 4;        // rows of A — hard-coded 4x4; TODO read from Configuration
    private int ccol = 4;        // columns of B
    private static int arow = 0; // current row of A (relies on one mapper per file, in order)
    private static int brow = 0; // current row of B

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The parent directory name of the input split identifies the matrix.
        FileSplit fs = (FileSplit) context.getInputSplit();
        tag = fs.getPath().getParent().getName();
    }

    /**
     * One input line = one matrix row of whitespace-separated integers.
     * Input consists of the two matrix files matrixA/* and matrixB/*.
     */
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer str = new StringTokenizer(value.toString());
        if ("matrixA".equals(tag)) {
            // A[arow][col] contributes to every output cell (arow, i), i in [0, ccol).
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken();
                for (int i = 0; i < ccol; i++) {
                    context.write(new Text(arow + "," + i),
                                  new Text("a," + col + "," + item));
                }
                col++;
            }
            arow++;
        } else if ("matrixB".equals(tag)) {
            // B[brow][col] contributes to every output cell (i, col), i in [0, crow).
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken();
                for (int i = 0; i < crow; i++) {
                    context.write(new Text(i + "," + col),
                                  new Text("b," + brow + "," + item));
                }
                col++;
            }
            brow++;
        }
    }
}

// ======================================================================
// File: MMReducer.java — reduce side: dot product for one output cell
// ======================================================================
package com;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * For key "i,j" the values are "a,k,A[i][k]" and "b,k,B[k][j]".
 * Pairs them up by k and emits C[i][j] = sum_k A[i][k] * B[k][j].
 */
public class MMReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // k -> element value, for each source matrix. Example values: "a,0,4", "b,0,2".
        Map<String, String> matrixA = new HashMap<>();
        Map<String, String> matrixB = new HashMap<>();
        for (Text val : values) {
            StringTokenizer str = new StringTokenizer(val.toString(), ",");
            String sourceMatrix = str.nextToken();
            if ("a".equals(sourceMatrix)) {
                matrixA.put(str.nextToken(), str.nextToken()); // (k, A[i][k])
            } else if ("b".equals(sourceMatrix)) {
                matrixB.put(str.nextToken(), str.nextToken()); // (k, B[k][j])
            }
        }
        // Dot product over the shared index k. (The original scrambled this loop
        // inside the value-collection pass; it must run after all values are read.)
        int result = 0;
        for (Map.Entry<String, String> entry : matrixA.entrySet()) {
            String bVal = matrixB.get(entry.getKey());
            if (bVal != null) { // guard against a missing pair instead of NPE
                result += Integer.parseInt(entry.getValue()) * Integer.parseInt(bVal);
            }
        }
        context.write(key, new Text(String.valueOf(result)));
    }
}

// ======================================================================
// File: Multiply.java — job driver
// ======================================================================
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver: configures and submits the matrix-multiplication job.
 * Usage: Multiply &lt;matrixA dir&gt; &lt;matrixB dir&gt; &lt;output dir&gt;
 */
public class Multiply {

    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: Multiply <inputA> <inputB> <output>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        Job job = Job.getInstance(conf, "MatrixMultiply");
        job.setJarByClass(Multiply.class);
        // specify Mapper & Reducer
        job.setMapperClass(MMMapper.class);
        job.setReducerClass(MMReducer.class);
        // specify output types of mapper and reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // specify input and output DIRECTORIES
        Path inPathA = new Path(args[0]);
        Path inPathB = new Path(args[1]);
        Path outPath = new Path(args[2]);
        FileInputFormat.addInputPath(job, inPathA);
        FileInputFormat.addInputPath(job, inPathB);
        FileOutputFormat.setOutputPath(job, outPath);
        // Remove a stale output directory so the job does not fail on startup.
        FileSystem hdfs = outPath.getFileSystem(conf);
        if (hdfs.exists(outPath)) {
            hdfs.delete(outPath, true); // recursive; single-arg delete() is deprecated
        }
        // NOTE: do NOT call hdfs.close() — FileSystem instances are cached and
        // shared JVM-wide; closing one here breaks the job's own HDFS access.

        // Run the job (this line was commented out in the original, so the
        // driver configured the job but never submitted it).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

// Example input / output paths:
//   hdfs://master:9000/matrix/matrixA/matrixa.txt
//   hdfs://master:9000/matrix/matrixB/matrixb.txt
//   hdfs://master:9000/matrix/out
分享到:
收藏