Error reporting from map function

Error reporting from map function

by ojh06

Hi,

Apologies for yet another question from me, but here goes!

I've written a map task that will on occasion not compute the correct  
result. This can easily be detected, at which point I'd like the map  
task to report the error and terminate the entire map/reduce job. Does  
anyone know of a way I can do this?

I've been looking around the archives and the API, and the only thing
that comes close is the Reporter class, but I think that only
reports stuff and doesn't actually allow control of the job?

Any help much appreciated as ever,

Cheers,
Ollie


Re: Error reporting from map function

by Anthony D. Urso

Call JobConf.setMaxMapAttempts(0) in the job conf, then throw an exception
when your mapper fails.  This should kill the entire job instantly, since
the job tracker will allow no mapper failures.

Cheers,
Anthony

On Mon, Jul 30, 2007 at 09:42:09PM +0100, ojh06@... wrote:

> I've written a map task that will on occasion not compute the correct
> result. This can easily be detected, at which point I'd like the map
> task to report the error and terminate the entire map/reduce job. Does
> anyone know of a way I can do this?

--
 Au

 PGP Key ID: 0x385B44CB
 Fingerprint: 9E9E B116 DB2C D734 C090  E72F 43A0 95C4 385B 44CB
    "Maximus vero fugiens a quodam Urso, milite Romano, interemptus est"
                                               - Getica 235

Re: Error reporting from map function

by ojh06

Thanks Anthony, it's good to know it can be done! However, I was hoping
to be able to report the numerical error in my map function. With the
approach you suggest, would there be any way to access the exception
thrown? I'm running the map-reduce job from a GUI, so I would rather
have an error box come up than just have an exception appear on the
command line. I'd also like to be able to differentiate between a job
that fails because of this numerical error in the map task and a job
that fails because, say, the namenode crashes.

Thanks,
Ollie

Re: Error reporting from map function

by Anthony D. Urso

In that case, it would be better to communicate the error in-band somehow
before throwing the exception.

On Tue, Jul 31, 2007 at 12:09:27AM +0100, ojh06@... wrote:

> Thanks Anthony, it's good to know it can be done! However, I was hoping
> to be able to report the numerical error in my map function. With the
> approach you suggest, would there be any way to access the exception
> thrown? I'm running the map-reduce job from a gui, so would rather  
> have an error box come up than just have an exception appear on the  
> command line. I'd also like to be able to differentiate between a job  
> that fails because of this numerical error in the map task and a job  
> that fails because, say, the namenode crashes.

Re: Error reporting from map function

by Jeroen Verhagen

Hi,

On 7/30/07, Anthony D. Urso <anthonyu@...> wrote:
> Call JobConf.setMaxMapAttempts(0) in the job conf, then throw an exception
> when your mapper fails.  This should kill the entire job instantly, since
> the job tracker will allow no mapper failures.

Wouldn't this cause all other running and future jobs to stop
attempting to recover from an error? Or do all jobs have copies of the
original job conf?

--

regards,

Jeroen

Re: Error reporting from map function

by ojh06

Well, I don't think it will be too much of a problem for me. I'll only
be running this one type of job. The problem I have is that I can only
throw IOExceptions out of the Mapper function. So if a job fails for
some reason other than my numerical calculation error, I have no
way of knowing. I'd like to retry if it's a communication problem, but
terminate if it's a calculation problem within my function.

I'm getting the feeling this isn't possible?
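
One way to keep a calculation failure distinguishable from other I/O failures, given that the map function may only throw IOException, is to throw a dedicated subclass and also tag its message. The following is a minimal sketch in plain Java rather than Hadoop code: CalculationException, its MARKER prefix and checkedSqrt are hypothetical names invented for illustration. Whether the exception type itself reaches the launching program is not established in this thread, which is why the message also carries a text marker.

```java
import java.io.IOException;

// Hypothetical exception type for illustration; not part of Hadoop.
class CalculationException extends IOException {
    // Marker prefix (an assumption) so the failure can also be recognised
    // from message text alone, e.g. in a status string or a log line.
    static final String MARKER = "CALC_ERROR: ";

    CalculationException(String detail) {
        super(MARKER + detail);
    }

    // True if the text carries the marker written by this class.
    static boolean isCalculationError(String text) {
        return text != null && text.contains(MARKER);
    }
}

class CalculationExceptionDemo {
    // Stand-in for the numerical check inside a map function.
    static double checkedSqrt(double x) throws IOException {
        if (x < 0) {
            throw new CalculationException("negative input " + x);
        }
        return Math.sqrt(x);
    }

    public static void main(String[] args) {
        try {
            checkedSqrt(-1.0);
        } catch (CalculationException e) {
            // The more specific type is caught first.
            System.out.println("calculation problem: " + e.getMessage());
        } catch (IOException e) {
            System.out.println("other I/O problem: " + e.getMessage());
        }
    }
}
```

Code that only sees the message text can still call CalculationException.isCalculationError(message) to decide between retrying and giving up.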

Re: Error reporting from map function

by Doug Cutting-4

ojh06@... wrote:
> I've written a map task that will on occasion not compute the correct
> result. This can easily be detected, at which point I'd like the map
> task to report the error and terminate the entire map/reduce job. Does
> anyone know of a way I can do this?

You can easily kill the job from a map task.  Just use the mapred.job.id
job property to get the job id, then use JobClient to kill the job.
Reporting the error could be done by setting the task's state in the
reporter, and then scanning the task reports from your job client for such
state strings after the job is killed.  Or you could perhaps just set a
counter on the reporter in the map task, and then check that counter
on the RunningJob, so that you don't have to scan all the tasks.  You
might need to sleep a few seconds after setting the state or counter
before killing the job, so that these reports have a chance to make it
back to the jobtracker.

Doug
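
The sequence described above (publish the error, pause briefly, kill the job, then scan the task states from the client) can be sketched without Hadoop as follows. StatusSink and JobHandle are hypothetical stand-ins for the reporter and the running-job handle, and the MARKER prefix and the length of the pause are assumptions; a real job would plug in the actual Hadoop objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the Hadoop reporter and running-job handles;
// these interfaces are assumptions made so the sketch runs without Hadoop.
interface StatusSink {
    void setStatus(String status);
}

interface JobHandle {
    void kill();
}

class ReportThenKill {
    static final String MARKER = "CALC_ERROR: ";

    // Task side: publish the error, wait so the report can propagate,
    // then kill the job.
    static void abortJob(StatusSink sink, JobHandle job, String detail, long graceMillis) {
        sink.setStatus(MARKER + detail);
        try {
            Thread.sleep(graceMillis);
        } catch (InterruptedException e) {
            // Keep the interrupt flag set, but still kill the job below.
            Thread.currentThread().interrupt();
        }
        job.kill();
    }

    // Client side: pick out the state strings that carry the marker.
    static List<String> findErrors(List<String> taskStates) {
        List<String> errors = new ArrayList<>();
        for (String state : taskStates) {
            if (state != null && state.startsWith(MARKER)) {
                errors.add(state.substring(MARKER.length()));
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<String> states = new ArrayList<>();
        abortJob(states::add, () -> System.out.println("job killed"), "residual too large", 100);
        System.out.println(findErrors(states));
    }
}
```

Scanning for a fixed prefix is what lets the launching program tell this deliberate kill apart from a job that died for some unrelated reason.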

Re: Error reporting from map function

by ojh06

Hi Doug,

Thanks for the reply. Could you possibly explain how my program would
get access to the task reports from each tracker? I've found the
getMapTaskReports method in the JobClient class, but can't work out
how to access it other than by creating a new instance of JobClient.
But then that JobClient would be a different one from the one that was
running my job, so wouldn't it access a different set of TaskReports?

Cheers,
Ollie

Re: Error reporting from map function

by Michael Bieniosek


On 8/2/07 5:20 AM, "ojh06@..." <ojh06@...> wrote:

> I've found the  
> getMapTaskReports method in the JobClient class, but can't work out
> how to access it other than by creating a new instance of JobClient -
> but then that JobClient would be a different one to the one that was
> running my job, so would access a different set of TaskReports?

That doesn't matter -- jobs are bound to jobtrackers, not to jobclients.
You can create a new JobClient and access all the jobs that the jobtracker
knows about.

-Michael


Re: Error reporting from map function

by ojh06

Hi Michael,

Thanks for the reply. I've tried to write some code to do this now, but
it's not working. I was wondering if there's anything obviously wrong?
After my runJob() I put (just as a test):
JobClient aJC = new JobClient();
String jobid = jobConf.get("mapred.job.id");
aJC.setConf(jobConf); //I've tried with and without this line
TaskReport [] treps = aJC.getMapTaskReports(jobid);
for(TaskReport trep : treps) {
    System.out.println(trep.getState());
}

However, when I run it, I get a NullPointerException on the
aJC.getMapTaskReports() line. I know it's getting the correct jobid.

I've also tried similar code in my Map function for killing the job,
but I get a NullPointerException when I try to do:
RunningJob rj  = aJC.getJob(jobid);

I'm thinking the new JobClient class needs to be connected to the
JobTracker in some way? If so, could someone explain how this is done?
Or am I way off?

Thanks,
Ollie

Re: Error reporting from map function

by ojh06

Hi,

Well, I've figured some of it out. I needed to initialise the new
JobClient after setting its configuration. So the code looks like:

JobClient aJC = new JobClient();
String jobid = jobConf.get("mapred.job.id");
aJC.setConf(jobConf);
aJC.init();

This works fine in the Map function for killing the job. However, I'm
still getting an exception in the launch function, on the
getMapTaskReports() line, albeit at a deeper level. The exception I
get is:
org.apache.hadoop.ipc.RemoteException: java.io.IOException: java.lang.NullPointerException
         at java.util.TreeMap.compare(TreeMap.java:1093)
         at java.util.TreeMap.getEntry(TreeMap.java:347)
         at java.util.TreeMap.get(TreeMap.java:265)
         at org.apache.hadoop.mapred.JobTracker.getMapTaskReports(JobTracker.java:1522)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
         at java.lang.reflect.Method.invoke(Method.java:585)
         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:341)
         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:573)

         at org.apache.hadoop.ipc.Client.call(Client.java:471)
         at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:165)
         at $Proxy1.getMapTaskReports(Unknown Source)
         at org.apache.hadoop.mapred.JobClient.getMapTaskReports(JobClient.java:505)
         at imperial.oliverhaggarty.RTAMapRed1a.launch(RTAMapRed1a.java:344)
         at imperial.oliverhaggarty.RTAMapRed1a.main(RTAMapRed1a.java:695)

I've called runJob() in a separate thread, as I thought once the job
had finished the set of TaskReports might get deleted. I've also put a
sleep in before I start checking for TaskReports to give time for
everything to initialise. I've copied the code below. Hopefully one of
you Hadoop gurus can point me in the right direction!

Cheers,
Ollie

Code:

    Thread t = new Thread() {
        public void run() {
            System.out.println("In thread");
            try {
                JobClient.runJob(jobConf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    };
    t.start();
    System.out.println("Afterrunningthread");
    try {
        Thread.sleep(20000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    JobClient aJC = new JobClient();
    String jobid = jobConf.get("mapred.job.id");
    aJC.setConf(jobConf);
    aJC.init();
    TaskReport[] treps = null;
    do {
        try {
            // The exception is thrown on the next line.
            treps = aJC.getMapTaskReports(jobid);
        } catch (Exception e) {
            e.printStackTrace();
            break;
        }
        for (TaskReport trep : treps) {
            System.out.println(trep.getState());
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    } while (treps != null && treps.length > 0);
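
A note on the trace above: its innermost frames are TreeMap.compare, TreeMap.getEntry and TreeMap.get, called from JobTracker.getMapTaskReports. A TreeMap using natural ordering throws NullPointerException when asked for a null key, so one possible reading, not confirmed anywhere in this thread, is that the job id handed to getMapTaskReports was null in the launching program. The sketch below only demonstrates the TreeMap behaviour; the map contents and the name NullKeyLookup are hypothetical.

```java
import java.util.TreeMap;

class NullKeyLookup {
    // Returns true if looking up the given key throws NullPointerException.
    static boolean lookupThrowsNpe(TreeMap<String, String> jobs, String jobId) {
        try {
            jobs.get(jobId);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Hypothetical job table keyed by job id, standing in for whatever
        // sorted map the jobtracker consults in the trace.
        TreeMap<String, String> jobs = new TreeMap<>();
        jobs.put("job_0001", "running");

        System.out.println(lookupThrowsNpe(jobs, "job_0001")); // a real key
        System.out.println(lookupThrowsNpe(jobs, null));       // a null key
    }
}
```

If that is the cause, printing jobid just before the getMapTaskReports call in the launch function would show it.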

HDFS Question re adding additional storage

by C G-4

Is it possible to add additional space to HDFS (in the form of new datanodes) with minimal/no fuss?  In other words, if I have 8T across 16 machines, and I want to go to 16T across 32 machines, can I roll in new machines easily, or do I need to plan considerable downtime to rebuild things and move data around?
   
  There are obvious implications here for how big an initial system to build, and the costs associated with buying now and buying later.
   
  Thanks,
  C G

RE: HDFS Question re adding additional storage

by Dhruba Borthakur

It is possible to roll additional nodes into the cluster anytime you
want. There is not much complexity in that.

However, the existing 0.14 Hadoop release will not rebalance data across
these new nodes. What that means is that the new nodes will be relatively
empty till new data arrives into the cluster. It might take a while for the
new nodes to get filled up.

Work is in progress to facilitate cluster-data rebalancing when new
Datanodes are added. One important goal of Hadoop is the ability to grow a
cluster over time.

Thanks,
dhruba

-----Original Message-----
From: C G [mailto:parallelguy@...]
Sent: Friday, August 03, 2007 3:17 AM
To: hadoop-user@...
Subject: HDFS Question re adding additional storage

Is it possible to add additional space to HDFS (in the form of new datanodes)
with minimal/no fuss?  In other words, if I have 8T across 16 machines, and
I want to go to 16T across 32 machines, can I roll in new machines easily,
or do I need to plan considerable downtime to rebuild things and move data
around?
   
  There are obvious implications here for how big an initial system to
build, and the costs associated with buying now and buying later.
   
  Thanks,
  C G

RE: HDFS Question re adding additional storage

by Dhruba Borthakur

Here is a link to the cluster-rebalancing discussion:

http://issues.apache.org/jira/browse/HADOOP-1652

thanks,
dhruba

-----Original Message-----
From: Dhruba Borthakur [mailto:dhruba@...]
Sent: Friday, August 03, 2007 12:28 PM
To: 'hadoop-user@...'
Subject: RE: HDFS Question re adding additional storage

It is possible to roll in additional nodes into the cluster anytime you
want. Not much complexity in that.

However, existing 0.14 hadoop release will not rebalance data across these
new nodes. What that means is that the new nodes will be relatively empty
till new data arrives into the cluster. It might take a while for the new
nodes to get filled up.

Work is in progress to facilitate cluster-data rebalance when new Datanodes
are added.
One important goal of hadoop is the ability to grow a cluster over time.

Thanks,
dhruba

Re: HDFS Question re adding additional storage

by Konstantin Shvachko

There is no downtime to add new nodes. You don't even need to stop the
cluster.
Just start datanodes on the new nodes and they will join the cluster.
Hadoop does not have automatic rebalancing as of today though (coming
soon), meaning the new nodes will fill up as new data is added, but
their utilization will stay lower than the utilization of the old nodes
for some time.
I'd say buy later if you don't need them now.

Regards,
Konstantin
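
To put rough numbers on the utilization gap, here is a small worked example in Java. Only the 8T across 16 machines growing to 32 machines comes from the question; the amount of existing data, the amount of new data, and the idea that new data lands evenly on every node are assumptions chosen purely for illustration, and real block placement may differ.

```java
class UtilizationSketch {
    // Fraction of a node's capacity that is in use.
    static double utilization(double usedTb, double capacityTb) {
        return usedTb / capacityTb;
    }

    public static void main(String[] args) {
        double capacityPerNodeTb = 0.5;   // 8 TB over 16 machines, from the question
        int oldNodes = 16;
        int newNodes = 16;                // growing to 32 machines
        double existingDataTb = 4.0;      // assumed: cluster half full before growth
        double newDataTb = 2.0;           // assumed: data written after growth

        // Assumption: new data lands evenly on all 32 nodes; old data never moves.
        double perNodeNew = newDataTb / (oldNodes + newNodes);
        double oldUsed = existingDataTb / oldNodes + perNodeNew;

        System.out.printf("old nodes: %.1f%% full%n",
                100 * utilization(oldUsed, capacityPerNodeTb));
        System.out.printf("new nodes: %.1f%% full%n",
                100 * utilization(perNodeNew, capacityPerNodeTb));
    }
}
```

Under these assumptions the old nodes sit at 62.5% of capacity and the new ones at 12.5%, and the gap only narrows as more data is written, which matches the description above.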

RE: HDFS Question re adding additional storage

by prasana.iyengar

Dhruba: I am a newbie; in my search for this capability I came across your post.

1. Does 0.16.0 have this capability?
2. Does this take place lazily? That's what it would seem to me, based on running it in our cluster.
3. Is there a way to force the rebalancing operation?

thanks,
-prasana

Dhruba Borthakur wrote:
> .... What that means is that the new nodes will be relatively empty
> till new data arrives into the cluster. It might take a while for the new
> nodes to get filled up.
>
> Work is in progress to facilitate cluster-data rebalance when new Datanodes
> are added.