|
View:
New views
2 Messages
—
Rating Filter:
Alert me
|
|
|
output to file from table.
Hi,
I'm processing data from HBase and storing the output in a SequenceFile. BTW, the results are stored directly by the Mapper (not through the reduce task). I couldn't figure out why the reduce task didn't run. Could anyone advise me? This is my code. ---- Job job = new Job(config, "Infinity Norm MR job : " + this.getPath()); Scan scan = new Scan(); scan.addFamily(Constants.COLUMNFAMILY); org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob( this.getPath(), scan, MatrixNormMapReduce.MatrixInfinityNormMapper.class, IntWritable.class, DoubleWritable.class, job); job.setReducerClass(MatrixNormMapReduce.MatrixInfinityNormReducer.class); job.setNumReduceTasks(1); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(DoubleWritable.class); SequenceFileOutputFormat.setOutputPath(job, outDir); job.waitForCompletion(true); ---- public static class MatrixInfinityNormMapper extends TableMapper<IntWritable, DoubleWritable> { @Override public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { double rowSum = 0; NavigableMap<byte[], byte[]> v = value .getFamilyMap(Constants.COLUMNFAMILY); for (Map.Entry<byte[], byte[]> e : v.entrySet()) { rowSum += Math.abs(BytesUtil.bytesToDouble(e.getValue())); } context.write(MatrixNormMapReduce.nKey, new DoubleWritable(rowSum)); } } ---- public static class MatrixInfinityNormReducer extends TableReducer<IntWritable, DoubleWritable, Writable> { static final Logger LOG = Logger.getLogger(MatrixInfinityNormReducer.class); private double max = 0; public void reduce(IntWritable key, Iterator<DoubleWritable> values, OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter) throws IOException { while (values.hasNext()) { LOG.info(">>>>>>> " + nKey + ", " + max); max = Math.max(values.next().get(), max); } // Note: Tricky here. As we known, we collect each row's sum with key(-1). 
// the reduce will just iterate through one key (-1) // so we collect the max sum-value here output.collect(MatrixNormMapReduce.nKey, new DoubleWritable(max)); } -- Best Regards, Edward J. Yoon @ NHN, corp. edwardyoon@... http://blog.udanax.org |
|
|
Re: output to file from table.
After changing the reduce code, my problem was solved. Please ignore it.
On Wed, Oct 28, 2009 at 10:50 AM, Edward J. Yoon <edwardyoon@...> wrote: > Hi, > > I'm processing the data from Hbase and, storing output in a > SequenceFile. BTW, results are directly stored by Mapper. (not through > reduce task) I couldn't find why reduce task didn't run. > > Could anyone advice to me? > > This is my code. > ---- > Job job = new Job(config, "Infinity Norm MR job : " + this.getPath()); > Scan scan = new Scan(); > scan.addFamily(Constants.COLUMNFAMILY); > > org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob( > this.getPath(), scan, > MatrixNormMapReduce.MatrixInfinityNormMapper.class, IntWritable.class, > DoubleWritable.class, job); > > job.setReducerClass(MatrixNormMapReduce.MatrixInfinityNormReducer.class); > job.setNumReduceTasks(1); > job.setOutputFormatClass(SequenceFileOutputFormat.class); > job.setOutputKeyClass(IntWritable.class); > job.setOutputValueClass(DoubleWritable.class); > SequenceFileOutputFormat.setOutputPath(job, outDir); > > job.waitForCompletion(true); > > ---- > > public static class MatrixInfinityNormMapper extends > TableMapper<IntWritable, DoubleWritable> { > > @Override > public void map(ImmutableBytesWritable key, Result value, Context context) > throws IOException, InterruptedException { > > double rowSum = 0; > NavigableMap<byte[], byte[]> v = value > .getFamilyMap(Constants.COLUMNFAMILY); > for (Map.Entry<byte[], byte[]> e : v.entrySet()) { > rowSum += Math.abs(BytesUtil.bytesToDouble(e.getValue())); > } > > context.write(MatrixNormMapReduce.nKey, new DoubleWritable(rowSum)); > } > } > > ---- > public static class MatrixInfinityNormReducer extends > TableReducer<IntWritable, DoubleWritable, Writable> { > static final Logger LOG = Logger.getLogger(MatrixInfinityNormReducer.class); > private double max = 0; > > public void reduce(IntWritable key, Iterator<DoubleWritable> values, > OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter) > throws IOException { > while (values.hasNext()) 
{ > LOG.info(">>>>>>> " + nKey + ", " + max); > max = Math.max(values.next().get(), max); > } > > // Note: Tricky here. As we known, we collect each row's sum with key(-1). > // the reduce will just iterate through one key (-1) > // so we collect the max sum-value here > output.collect(MatrixNormMapReduce.nKey, new DoubleWritable(max)); > } > > -- > Best Regards, Edward J. Yoon @ NHN, corp. > edwardyoon@... > http://blog.udanax.org > -- Best Regards, Edward J. Yoon @ NHN, corp. edwardyoon@... http://blog.udanax.org |
| Free embeddable forum powered by Nabble | Forum Help |