BUG: JMS consumer hangs while producing

View: New views
2 Messages — Rating Filter:   Alert me  

BUG: JMS consumer hangs while producing

by dcheckoway :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Help!

I have a transactional consumer invoked by Camel for messages on "testQueueA" -- it ends up producing/sending thousands of messages to the JMS queue "testQueueB".  Depending upon how many messages need to be sent, this scenario hangs.  This happens when talking to ActiveMQ via TCP.

I wrote a simplified test case, and one interesting thing I found was that the "bigger" my messages were, the fewer would be sent before everything hangs.  It implied to me that there was some sort of buffer limit being imposed.

Here's the code:

import java.util.logging.Logger;
import org.apache.camel.*;
import org.springframework.beans.factory.annotation.*;

public class ConsumerThatAlsoProduces {
    Logger logger = Logger.getLogger(getClass().getName());
    @Autowired
    ProducerTemplate producerTemplate;
    @Autowired
    @Qualifier("testQueueB")
    Endpoint testQueueB;
   
    public void onTestMessage(TestMessage msg) {
        logger.info("Received from A: " + msg);
        final int numToProduce = 20000;
        logger.info("Producing " + numToProduce + " messages on queueB");
        for (int k = 1; k <= numToProduce; ++k) {
            logger.info("Sending " + k + " of " + numToProduce);
            producerTemplate.sendBody(testQueueB, new TestMessage(k));
        }
    }
}

When I force a thread dump, this is what I'm seeing for the thread of interest:

"DefaultMessageListenerContainer-1" prio=5 tid=0x00000001019ff800 nid=0x111cae000 runnable [0x0000000111cab000..0x0000000111cadad0]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
        at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(TcpBufferedOutputStream.java:96)
        at java.io.DataOutputStream.write(DataOutputStream.java:90)
        - locked <0x000000010703aa28> (a java.io.DataOutputStream)
        at org.apache.activemq.openwire.v3.BaseDataStreamMarshaller.tightMarshalByteSequence2(BaseDataStreamMarshaller.java:432)
        at org.apache.activemq.openwire.v3.MessageMarshaller.tightMarshal2(MessageMarshaller.java:173)
        at org.apache.activemq.openwire.v3.ActiveMQMessageMarshaller.tightMarshal2(ActiveMQMessageMarshaller.java:90)
        at org.apache.activemq.openwire.v3.ActiveMQObjectMessageMarshaller.tightMarshal2(ActiveMQObjectMessageMarshaller.java:90)
        at org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:240)
        - locked <0x000000010701bb78> (a org.apache.activemq.openwire.OpenWireFormat)
        at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:166)
        at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237)
        - locked <0x000000010705a298> (a java.util.concurrent.atomic.AtomicBoolean)
        at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
        at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        - locked <0x0000000106fdd0e8> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
        at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
        at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1676)
        - locked <0x0000000106fe9f00> (a java.lang.Object)
        at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
        at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
        - locked <0x0000000107095d88> (a org.apache.activemq.ActiveMQMessageProducer)
        at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:59)
        at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:597)
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:237)
        at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:574)
        at org.springframework.jms.core.JmsTemplate$4.doInJms(JmsTemplate.java:551)
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:471)
        at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:548)
        at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:301)
        at org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:165)
        at org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:151)
        at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:136)
        at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:150)
        at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:86)
        at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:98)
        at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:111)
        at ConsumerThatAlsoProduces.onTestMessage(ConsumerThatAlsoProduces.java:19)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:173)
        at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:95)
        at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:111)
        at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
        at org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
        at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
        at org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:110)
        at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:33)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:128)
        at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:80)
        at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
        at org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
        at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
        at org.apache.camel.processor.UnitOfWorkProcessor.processNext(UnitOfWorkProcessor.java:54)
        at org.apache.camel.processor.DelegateProcessor.process(DelegateProcessor.java:48)
        at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:76)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:543)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:482)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:241)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:881)
        at java.lang.Thread.run(Thread.java:637)


I'm pretty sure the problem is that since my consumer is running inside a JMS transaction, the messages it tries to send are being "buffered" by the ActiveMQ server until the transaction commits.  Thus the catch...the transaction won't commit until my consumer returns, which won't happen until all of the messages are sent.

On this link http://activemq.apache.org/how-do-transactions-work.html I noticed this comment:

"Now the operations carried out on a transacted session inside a transaction, like a send message or acknowledge message, do not really perform a real send or acknowledge until the commit occurs. So the Broker explicitly handles these cases separately - essentially buffering up the commands until the commit occurs when the messages are really sent or acknowledged."

What I find interesting is that if I run an embedded ActiveMQ broker and talk via vm://localhost instead of tcp://localhost:61616, the problem goes away.  This ONLY happens when talking TCP.

FWIW, I've been using Camel 1.6.1 with ActiveMQ 5.2.0, but I have also tested this against ActiveMQ 5.3-SNAPSHOT (June 1, 2009), which uses Camel 2.0.  Both straight out of the box.  Same results with both old & new versions.

Is this a bug in ActiveMQ?  Does it really impose some weenie-small limit on what you can post back to the server while still in a transaction?  Is it a hard limit?  Configurable?  Have I screwed up my configuration?  Should I be using some different method to have my consumer in turn produce more messages transactionally?

I'm attaching a zip of my test (minus the "lib" dir, which contains all the typical stuff you'd expect in an ActiveMQ/Camel setup).  Maybe somebody can take a peek and let me know what I'm doing wrong.

camel-activemq-bug.tar.gz
camel-activemq-bug.zip

Help!  Thanks!!

Re: BUG: JMS consumer hangs while producing

by willem.jiang :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Hi,

It is more like a ActiveMQ's issue.
Did you send this issue into the users@... ?

Willem

dcheckoway wrote:

> Help!
>
> I have a transactional consumer invoked by Camel for messages on
> "testQueueA" -- it ends up producing/sending thousands of messages to the
> JMS queue "testQueueB".  Depending upon how many messages need to be sent,
> this scenario hangs.  This happens when talking to ActiveMQ via TCP.
>
> I wrote a simplified test case, and one interesting thing I found was that
> the "bigger" my messages were, the fewer would be sent before everything
> hangs.  It implied to me that there was some sort of buffer limit being
> imposed.
>
> Here's the code:
>
> import java.util.logging.Logger;
> import org.apache.camel.*;
> import org.springframework.beans.factory.annotation.*;
>
> public class ConsumerThatAlsoProduces {
>     Logger logger = Logger.getLogger(getClass().getName());
>     @Autowired
>     ProducerTemplate producerTemplate;
>     @Autowired
>     @Qualifier("testQueueB")
>     Endpoint testQueueB;
>    
>     public void onTestMessage(TestMessage msg) {
>         logger.info("Received from A: " + msg);
>         final int numToProduce = 20000;
>         logger.info("Producing " + numToProduce + " messages on queueB");
>         for (int k = 1; k <= numToProduce; ++k) {
>             logger.info("Sending " + k + " of " + numToProduce);
>             producerTemplate.sendBody(testQueueB, new TestMessage(k));
>         }
>     }
> }
>
> When I force a thread dump, this is what I'm seeing for the thread of
> interest:
>
> "DefaultMessageListenerContainer-1" prio=5 tid=0x00000001019ff800
> nid=0x111cae000 runnable [0x0000000111cab000..0x0000000111cadad0]
>    java.lang.Thread.State: RUNNABLE
> at java.net.SocketOutputStream.socketWrite0(Native Method)
> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
> at
> org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(TcpBufferedOutputStream.java:96)
> at java.io.DataOutputStream.write(DataOutputStream.java:90)
> - locked <0x000000010703aa28> (a java.io.DataOutputStream)
> at
> org.apache.activemq.openwire.v3.BaseDataStreamMarshaller.tightMarshalByteSequence2(BaseDataStreamMarshaller.java:432)
> at
> org.apache.activemq.openwire.v3.MessageMarshaller.tightMarshal2(MessageMarshaller.java:173)
> at
> org.apache.activemq.openwire.v3.ActiveMQMessageMarshaller.tightMarshal2(ActiveMQMessageMarshaller.java:90)
> at
> org.apache.activemq.openwire.v3.ActiveMQObjectMessageMarshaller.tightMarshal2(ActiveMQObjectMessageMarshaller.java:90)
> at
> org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:240)
> - locked <0x000000010701bb78> (a
> org.apache.activemq.openwire.OpenWireFormat)
> at
> org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:166)
> at
> org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237)
> - locked <0x000000010705a298> (a java.util.concurrent.atomic.AtomicBoolean)
> at
> org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
> at
> org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
> at
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
> - locked <0x0000000106fdd0e8> (a java.lang.Object)
> at
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> at
> org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
> at
> org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
> at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1676)
> - locked <0x0000000106fe9f00> (a java.lang.Object)
> at
> org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
> at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
> - locked <0x0000000107095d88> (a
> org.apache.activemq.ActiveMQMessageProducer)
> at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:59)
> at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:597)
> at
> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:237)
> at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:574)
> at org.springframework.jms.core.JmsTemplate$4.doInJms(JmsTemplate.java:551)
> at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:471)
> at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:548)
> at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:301)
> at
> org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:165)
> at
> org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:151)
> at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:136)
> at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:150)
> at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:86)
> at
> org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:98)
> at
> org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:111)
> at ConsumerThatAlsoProduces.onTestMessage(ConsumerThatAlsoProduces.java:19)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:173)
> at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:95)
> at
> org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:111)
> at
> org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
> at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
> at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
> at
> org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
> at
> org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
> at
> org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:110)
> at
> org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:33)
> at
> org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:128)
> at
> org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:80)
> at
> org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
> at
> org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
> at
> org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
> at
> org.apache.camel.processor.UnitOfWorkProcessor.processNext(UnitOfWorkProcessor.java:54)
> at
> org.apache.camel.processor.DelegateProcessor.process(DelegateProcessor.java:48)
> at
> org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:76)
> at
> org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:543)
> at
> org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:482)
> at
> org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
> at
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
> at
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:241)
> at
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
> at
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:881)
> at java.lang.Thread.run(Thread.java:637)
>
>
> I'm pretty sure the problem is that since my consumer is running inside a
> JMS transaction, the messages it tries to send are being "buffered" by the
> ActiveMQ server until the transaction commits.  Thus the catch...the
> transaction won't commit until my consumer returns, which won't happen until
> all of the messages are sent.
>
> On this link  http://activemq.apache.org/how-do-transactions-work.html
> http://activemq.apache.org/how-do-transactions-work.html  I noticed this
> comment:
>
> "Now the operations carried out on a transacted session inside a
> transaction, like a send message or acknowledge message, do not really
> perform a real send or acknowledge until the commit occurs. So the Broker
> explicitly handles these cases separately - essentially buffering up the
> commands until the commit occurs when the messages are really sent or
> acknowledged."
>
> What I find interesting is that if I run an embedded ActiveMQ broker and
> talk via vm://localhost instead of tcp://localhost:61616, the problem goes
> away.  This ONLY happens when talking TCP.
>
> FWIW, I've been using Camel 1.6.1 with ActiveMQ 5.2.0, but I have also
> tested this against ActiveMQ 5.3-SNAPSHOT (June 1, 2009), which uses Camel
> 2.0.  Both straight out of the box.  Same results with both old & new
> versions.
>
> Is this a bug in ActiveMQ?  Does it really impose some weenie-small limit on
> what you can post back to the server while still in a transaction?  Is it a
> hard limit?  Configurable?  Have I screwed up my configuration?  Should I be
> using some different method to have my consumer in turn produce more
> messages transactionally?
>
> I'm attaching a zip of my test (minus the "lib" dir, which contains all the
> typical stuff you'd expect in an ActiveMQ/Camel setup).  Maybe somebody can
> take a peek and let me know what I'm doing wrong.
>
> http://www.nabble.com/file/p23824159/camel-activemq-bug.tar.gz
> camel-activemq-bug.tar.gz
> http://www.nabble.com/file/p23824159/camel-activemq-bug.zip
> camel-activemq-bug.zip
>
> Help!  Thanks!!