output to file from table.

by Edward J. Yoon

Hi,

I'm processing data from HBase and storing the output in a
SequenceFile. But the results are written directly by the Mapper
(not through the reduce task), and I can't figure out why the reduce
task didn't run.

Could anyone advise me?

This is my code.
----
    Job job = new Job(config, "Infinity Norm MR job : " + this.getPath());
    Scan scan = new Scan();
    scan.addFamily(Constants.COLUMNFAMILY);

    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(
        this.getPath(), scan,
        MatrixNormMapReduce.MatrixInfinityNormMapper.class, IntWritable.class,
        DoubleWritable.class, job);

    job.setReducerClass(MatrixNormMapReduce.MatrixInfinityNormReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(DoubleWritable.class);
    SequenceFileOutputFormat.setOutputPath(job, outDir);

    job.waitForCompletion(true);

----

  public static class MatrixInfinityNormMapper extends
      TableMapper<IntWritable, DoubleWritable> {

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {

      // Sum of |value| over every cell in this row's column family.
      double rowSum = 0;
      NavigableMap<byte[], byte[]> v = value
          .getFamilyMap(Constants.COLUMNFAMILY);
      for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
        rowSum += Math.abs(BytesUtil.bytesToDouble(e.getValue()));
      }

      // Emit every row sum under the same key so a single reduce call sees them all.
      context.write(MatrixNormMapReduce.nKey, new DoubleWritable(rowSum));
    }
  }

----
  public static class MatrixInfinityNormReducer extends
      TableReducer<IntWritable, DoubleWritable, Writable> {
    static final Logger LOG = Logger.getLogger(MatrixInfinityNormReducer.class);
    private double max = 0;

    public void reduce(IntWritable key, Iterator<DoubleWritable> values,
        OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
        throws IOException {
      while (values.hasNext()) {
        LOG.info(">>>>>>> " + nKey + ", " + max);
        max = Math.max(values.next().get(), max);
      }

      // Note: this is the tricky part. Every row's sum is emitted with the
      // same key (-1), so reduce iterates over that single key and we
      // collect the maximum row sum here.
      output.collect(MatrixNormMapReduce.nKey, new DoubleWritable(max));
    }
  }
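
For checking the job's output on a small matrix: the mapper emits each row's sum of absolute values and the reducer keeps the largest, which is the matrix infinity norm (the maximum over rows i of the sum over columns j of |a_ij|). A minimal sequential sketch over an in-memory double[][], assuming the matrix fits in memory; the class InfinityNormCheck and its method are hypothetical, not part of the original code:

```java
public class InfinityNormCheck {

    /** Hypothetical helper: max over rows of the sum of absolute values in that row. */
    static double infinityNorm(double[][] a) {
        double max = 0;                      // same starting value as the reducer's `max`
        for (double[] row : a) {
            double rowSum = 0;               // what the mapper emits per HBase row
            for (double x : row) {
                rowSum += Math.abs(x);
            }
            max = Math.max(max, rowSum);     // what the reducer keeps
        }
        return max;
    }

    public static void main(String[] args) {
        double[][] a = {
            {1, -2, 3},
            {-4, 5, 0},
        };
        System.out.println(infinityNorm(a)); // row sums are 6 and 9, so this prints 9.0
    }
}
```

Running the MapReduce job over an HBase table holding the same two rows should produce a single record whose value is 9.0.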

--
Best Regards, Edward J. Yoon @ NHN, corp.
edwardyoon@...
http://blog.udanax.org

Re: output to file from table.

by Edward J. Yoon

After changing the reduce code, my problem was solved. Please ignore my question.
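
The reply does not say exactly what was changed. One likely cause is visible in the posted reducer: TableReducer (org.apache.hadoop.hbase.mapreduce) extends the new-API org.apache.hadoop.mapreduce.Reducer, but reduce() was written with the old-API signature (Iterator, OutputCollector, Reporter). That method only overloads Reducer.reduce(KEYIN, Iterable<VALUEIN>, Context) instead of overriding it, so the framework calls the inherited default, which writes every value through unchanged. That matches the symptom of results appearing as if written by the Mapper. The stdlib-only sketch below reproduces the pitfall with a hypothetical BaseReducer standing in for Hadoop's Reducer; none of these class names are Hadoop APIs:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class OverrideDemo {

  // Hypothetical stand-in for org.apache.hadoop.mapreduce.Reducer: the "framework"
  // in run() only ever calls this reduce(key, Iterable) method.
  static class BaseReducer {
    final List<String> output = new ArrayList<>();

    // Like Hadoop's default reduce: an identity pass-through of every value.
    protected void reduce(int key, Iterable<Double> values) {
      for (double v : values) {
        output.add(key + "=" + v);
      }
    }
  }

  // Old-API-style signature (Iterator instead of Iterable): this is an overload,
  // so the framework never calls it and the identity reduce runs instead.
  static class OldStyleReducer extends BaseReducer {
    public void reduce(int key, Iterator<Double> values) {
      double max = 0;
      while (values.hasNext()) {
        max = Math.max(max, values.next());
      }
      output.add(key + "=" + max);
    }
  }

  // Matching signature: @Override makes the compiler verify it really overrides.
  static class FixedReducer extends BaseReducer {
    @Override
    protected void reduce(int key, Iterable<Double> values) {
      double max = 0;
      for (double v : values) {
        max = Math.max(max, v);
      }
      output.add(key + "=" + max);
    }
  }

  // Minimal hypothetical "framework": one key (-1) carrying all row sums, as in the job above.
  static List<String> run(BaseReducer r, List<Double> rowSums) {
    r.reduce(-1, rowSums);
    return r.output;
  }

  public static void main(String[] args) {
    List<Double> rowSums = List.of(6.0, 9.0, 2.5);
    System.out.println(run(new OldStyleReducer(), rowSums)); // [-1=6.0, -1=9.0, -1=2.5]
    System.out.println(run(new FixedReducer(), rowSums));    // [-1=9.0]
  }
}
```

In the real job, the corresponding change would be to extend org.apache.hadoop.mapreduce.Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> rather than TableReducer (whose output value type is fixed to Writable and which is intended for writing back to an HBase table), implement reduce(IntWritable key, Iterable<DoubleWritable> values, Context context) with @Override, and emit the result with context.write(key, new DoubleWritable(max)). With @Override in place, a signature that does not actually override fails to compile instead of silently falling back to the identity reduce.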

On Wed, Oct 28, 2009 at 10:50 AM, Edward J. Yoon <edwardyoon@...> wrote:
