Broker get stuck with high amount of persistent messages for a given destination

by Manuel Teira Paz

Hello.

We are embedding an ActiveMQ 4.2 broker in our application, running on a
Sun JVM 5, and using an Oracle 10g database as the persistent message
store (without journal, since we are using a JDBC master-slave cluster).
On a couple of occasions, we were unable to get the broker started in our
production environment. Analyzing the situation, it seems that the cause
was the large number of persistent messages for a given queue (over
75,000), which was blocking the attempt to create consumers on that
destination.

To reproduce the problem with a more recent ActiveMQ version,
we set up a 5.2.0 standalone broker resembling our scenario, and a
client that tries to create a set of consumers on the loaded queue. We
exported the message table from our production environment and
loaded it into the test database. I'm attaching both the
ActiveMQ configuration and the client used for testing.

What we found was:

The broker started normally, since no attempt was made to consume
messages from the "loaded" destination.
Once the client started, we observed that:
 - We tried to start 10 consumers on the queue. Only one of those
consumers returned from session.createConsumer(); the others were blocked
(note that all of them share the same connection).
 - The one consumer that started was unable to get any message; it was
blocked in the getMessage() attempt.
 - The broker seems to be trying to load all the messages from the
database, showing the following stack dump (just the involved threads):

"ActiveMQ Transport: tcp:///127.0.0.1:33554" daemon prio=4
tid=0x00ac2ef8 nid=0x3c waiting on condition [0xaae7e000..0xaae7fbf0]
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:118)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:681)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:711)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1041)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:184)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:256)
        at org.apache.activemq.broker.region.Queue.addSubscription(Queue.java:217)
        at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:275)
        - locked <0xe35bbb40> (a java.lang.Object)
        at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:372)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
        at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:83)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
        at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:93)
        at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:541)
        at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:345)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
        - locked <0xb8a0d5d0> (a org.apache.activemq.transport.InactivityMonitor$1)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
        at java.lang.Thread.run(Thread.java:595)

"QueueThread:queue://TaskManagerQueue" daemon prio=10 tid=0x005e1bd8
nid=0x42 runnable [0xaac7e000..0xaac7faf0]
        at oracle.jdbc.driver.T2CStatement.t2cDefineFetch(Native Method)
        at oracle.jdbc.driver.T2CPreparedStatement.doDefineFetch(T2CPreparedStatement.java:827)
        at oracle.jdbc.driver.T2CPreparedStatement.executeForRows(T2CPreparedStatement.java:768)
        at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1062)
        at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1132)
        at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3285)
        at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3329)
        - locked <0xe37a09a0> (a oracle.jdbc.driver.T2CPreparedStatement)
        - locked <0xe379c398> (a oracle.jdbc.driver.T2CConnection)
        at org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91)
        at org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91)
        at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecoverNextMessages(DefaultJDBCAdapter.java:709)
        at org.apache.activemq.store.jdbc.JDBCMessageStore.recoverNextMessages(JDBCMessageStore.java:230)
        at org.apache.activemq.store.ProxyMessageStore.recoverNextMessages(ProxyMessageStore.java:83)
        at org.apache.activemq.broker.region.cursors.QueueStorePrefetch.doFillBatch(QueueStorePrefetch.java:75)
        at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:227)
        - locked <0xb8a12f30> (a org.apache.activemq.broker.region.cursors.QueueStorePrefetch)
        at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:100)
        at org.apache.activemq.broker.region.cursors.StoreQueueCursor.reset(StoreQueueCursor.java:157)
        - locked <0xb8a13648> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
        at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1179)
        - locked <0xb8a13648> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
        at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1308)
        at org.apache.activemq.broker.region.Queue.iterate(Queue.java:1011)
        - locked <0xb8a13748> (a org.apache.activemq.broker.region.Queue$2)
        at org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
        at org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
        at java.lang.Thread.run(Thread.java:595)
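
Read together, the two threads suggest a simple pattern: the transport thread is parked in ReentrantLock.lock() inside Queue.addSubscription, while the QueueThread is busy in a JDBC query on the page-in path. The sketch below is a toy model of that pattern and not ActiveMQ code. It assumes that the page-in thread owns the lock addSubscription is waiting for, which the dump suggests but does not prove. The names PageInContention, slowPageIn and tryAddSubscription are hypothetical, and a sleep stands in for the long-running query.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Toy model of the contention seen in the dump; not ActiveMQ code. */
final class PageInContention {
    // Hypothetical stand-in for the lock taken in Queue.addSubscription.
    private final ReentrantLock dispatchLock = new ReentrantLock();
    private final CountDownLatch lockTaken = new CountDownLatch(1);

    /** Holds the lock for the whole simulated query. */
    void slowPageIn(long queryMillis) {
        dispatchLock.lock();
        try {
            lockTaken.countDown();
            Thread.sleep(queryMillis); // stands in for the long JDBC executeQuery
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dispatchLock.unlock();
        }
    }

    /** Returns true if a consumer could be registered within the timeout. */
    boolean tryAddSubscription(long timeoutMillis) {
        try {
            // The dump shows a plain lock(), which waits without any timeout.
            if (!dispatchLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;
            }
            dispatchLock.unlock();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Starts a page-in, then tries to add a consumer while it is running. */
    static boolean consumerAddedWhilePagingIn(long queryMillis, long timeoutMillis) {
        PageInContention queue = new PageInContention();
        Thread pager = new Thread(() -> queue.slowPageIn(queryMillis), "QueueThread");
        pager.setDaemon(true);
        pager.start();
        try {
            queue.lockTaken.await(); // make sure the pager owns the lock first
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean added = queue.tryAddSubscription(timeoutMillis);
        pager.interrupt(); // end the simulated query early
        return added;
    }

    public static void main(String[] args) {
        System.out.println("added during a long page-in:  "
            + consumerAddedWhilePagingIn(5_000, 200));
        System.out.println("added during a short page-in: "
            + consumerAddedWhilePagingIn(10, 2_000));
    }
}
```

In this model a consumer can only be added once the page-in releases the lock, so the time to create a consumer is bounded below by the duration of the query.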
 

We waited for 9 or 10 hours without apparent changes: only one consumer
had been created, the other threads were still waiting, and no message
had been consumed.
In the production environment, the only way we found to recover was to
move the messages from ACTIVEMQ_MSGS to another table, start
the broker with ACTIVEMQ_MSGS empty, and write an application to
deserialize the messages from that table and inject them into the queue
using the JMS API.

We consider over 10 hours of service unavailability a serious
problem. It seems to be related to the broker trying to load
all the messages from the table at once whenever a consumer for a given
destination is created. Is this a known problem? Is there any way to
improve the situation?
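
For contrast with loading everything at once, here is a minimal in-memory sketch of bounded, cursor-style recovery: each call returns at most a fixed number of rows with an ID greater than the last one seen. It makes no claim about what ActiveMQ 5.2 actually does. The class BatchedRecovery and its TreeMap "table" are hypothetical, and the method name recoverNextMessages is borrowed from the stack trace only for readability.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory sketch of paged recovery; a stand-in for a message table, not ActiveMQ code. */
final class BatchedRecovery {
    // Hypothetical table: message ID -> payload, kept sorted by ID.
    private final NavigableMap<Long, String> table = new TreeMap<>();
    private long lastRecoveredId = -1; // cursor position, like "WHERE ID > ?"

    void store(long id, String payload) {
        table.put(id, payload);
    }

    /** Returns at most maxRows payloads with an ID above the cursor, in ID order. */
    List<String> recoverNextMessages(int maxRows) {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<Long, String> row : table.tailMap(lastRecoveredId, false).entrySet()) {
            if (batch.size() == maxRows) {
                break;
            }
            batch.add(row.getValue());
            lastRecoveredId = row.getKey();
        }
        return batch;
    }

    /** Drains the whole table page by page and returns how many pages were needed. */
    static int pagesNeeded(int messageCount, int maxRows) {
        BatchedRecovery store = new BatchedRecovery();
        for (long id = 0; id < messageCount; id++) {
            store.store(id, "msg-" + id);
        }
        int pages = 0;
        while (!store.recoverNextMessages(maxRows).isEmpty()) {
            pages++;
        }
        return pages;
    }

    public static void main(String[] args) {
        // 75,000 pending messages read 200 at a time: each step touches one page only.
        System.out.println(pagesNeeded(75_000, 200) + " pages");
    }
}
```

With a real JDBC store, the same bound would presumably come from an ID predicate combined with a row limit such as Statement.setMaxRows, so that no single query has to return the whole backlog.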

Best regards.


--
Manuel.


package es.tid.planb.test;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

public class MultithreadConsumer
{
    // Updated from every consumer thread, so the increment must be atomic.
    static final AtomicLong consumedCount = new AtomicLong();
    static int recvTimeout = 2000;

    public static void main(String args[]) throws Exception
    {
        if (args.length < 5) {
            System.err.println("Usage: MultithreadConsumer "
                               + "uri queue threads messages synchro");
            System.exit(255);
        }

        Properties jndiProperties = new Properties();
        jndiProperties.setProperty("java.naming.factory.initial",
                                   "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        jndiProperties.setProperty("java.naming.provider.url", args[0]);
        jndiProperties.setProperty("queue." + args[1], args[1]);

        Context jndiContext = new InitialContext(jndiProperties);
        ConnectionFactory cfactory =
            (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
        final Destination queue = (Destination) jndiContext.lookup(args[1]);
        final Connection conn = cfactory.createConnection();
        final int consumerCount = Integer.parseInt(args[2]);
        final int messages = Integer.parseInt(args[3]);
        final boolean synchronizeConsumers = "true".equals(args[4]);
        // consumersPrepared: every consumer signals that it is ready.
        // consumersGo: main releases all the consumers at once.
        final CountDownLatch consumersPrepared = new CountDownLatch(consumerCount);
        final CountDownLatch consumersGo = new CountDownLatch(1);

        System.err.println("Consumers: " + consumerCount
                           + ", Messages: " + messages
                           + ", Synchronized: " + synchronizeConsumers);

        conn.start();
        Thread consumers[] = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++) {
            consumers[i] = new Thread(new Runnable() {
                public void run() {
                    Session session = null;
                    try {
                        // One transacted session per thread, all on the shared connection.
                        session = conn.createSession(true, Session.SESSION_TRANSACTED);
                        MessageConsumer consumer =
                            session.createConsumer(queue);
                        System.out.println("Consumer ready");
                        if (synchronizeConsumers) {
                            consumersPrepared.countDown();
                            consumersGo.await();
                        }
                        while (consumedCount.get() < messages) {
                            Message msg = consumer.receive(recvTimeout);
                            if (msg != null) {
                                System.out.print("m");
                                session.commit();
                                consumedCount.incrementAndGet();
                            } else {
                                // receive() timed out without a message
                                System.out.print("!");
                            }
                        }
                        System.out.println("Consumer finishing");
                    } catch (Exception e) {
                        System.err.println("Exception in consumer");
                        e.printStackTrace(System.err);
                    } finally {
                        if (session != null) {
                            try { session.close(); } catch (Exception e) { /* ignored on shutdown */ }
                        }
                    }
                }
            }, "Consumer#" + i);
        }
        for (int i = 0; i < consumers.length; ++i) {
            consumers[i].start();
        }

        if (synchronizeConsumers) {
            System.out.println("Waiting for the consumers to get ready");
            consumersPrepared.await();
            System.out.println("All the consumers ready");
            consumersGo.countDown();
        }

        long before = System.currentTimeMillis();

        // Wait for every consumer thread, retrying if the join is interrupted.
        for (int i = 0; i < consumerCount; ++i) {
            while (true) {
                try {
                    consumers[i].join();
                    break;
                } catch (InterruptedException ie) {}
            }
        }
        long elapsed = System.currentTimeMillis() - before;
        System.err.println("Consumers finished. Messages: "
                           + consumedCount.get() + ", " + elapsed + " ms elapsed");
        System.err.println((consumedCount.get() * 1000.0 / elapsed) + " messages/s");
        conn.close();
    }
}
               

<beans>
  <broker brokerName="TEST"
          persistent="true"
          useShutdownHook="false"
          xmlns="http://activemq.org/config/1.0"
          deleteAllMessagesOnStartup="false">
    <managementContext>
      <managementContext connectorPort="3199" />
    </managementContext>
    <transportConnectors>
      <transportConnector name="default" uri="tcp://localhost:61615"/>
    </transportConnectors>
    <persistenceAdapter>
      <journaledJDBC useJournal="false" dataSource="#jdbc-ds">
        <statements>
          <statements messageTableName="ACTIVEMQ_MSGS_BACK"/>
        </statements>
      </journaledJDBC>
    </persistenceAdapter>
  </broker>
  <bean id="jdbc-ds"
        class="org.apache.commons.dbcp.BasicDataSource"
        destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="jdbc:oracle:oci:spci_llu/spci_llu@(description=(address=(host=cornatel.hi.inet)(protocol=tcp)(port=1521))(connect_data=(SERVICE_NAME=cor01)))"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>
</beans>

Re: Broker get stuck with high amount of persistent messages for a given destination

by Gary Tully

I wonder if the cleanup thread (ActiveMQ Cleanup Timer) is also in the
mix. I'm not sure about 4.2, but on trunk that is currently a fixed-period
task rather than a fixed-delay task, and it could end up hogging the JDBC
connection.
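
The distinction can be illustrated with the JDK's ScheduledExecutorService, assuming that "fixed periodic" means fixed-rate scheduling. This is a sketch under that assumption and not ActiveMQ's own timer code. The class CleanupScheduling and its method are hypothetical, and a sleep stands in for a cleanup query that takes longer than its period.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Compares the idle time between runs of a slow periodic task; not ActiveMQ code. */
final class CleanupScheduling {
    /**
     * Runs a task lasting taskMillis every periodMillis for windowMillis and
     * returns the shortest pause (ms) between one run ending and the next starting.
     * The window must be long enough for at least two runs.
     */
    static long shortestPauseMillis(boolean fixedDelay, long taskMillis,
                                    long periodMillis, long windowMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicLong runs = new AtomicLong();
        AtomicLong lastEnd = new AtomicLong();
        AtomicLong shortest = new AtomicLong(Long.MAX_VALUE);
        Runnable cleanup = () -> {
            long start = System.nanoTime();
            if (runs.getAndIncrement() > 0) {
                shortest.accumulateAndGet(start - lastEnd.get(), Math::min);
            }
            try {
                Thread.sleep(taskMillis); // stands in for a slow cleanup query
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            lastEnd.set(System.nanoTime());
        };
        if (fixedDelay) {
            // The next run starts periodMillis after the previous run ends.
            timer.scheduleWithFixedDelay(cleanup, 0, periodMillis, TimeUnit.MILLISECONDS);
        } else {
            // Runs are due every periodMillis; an overrunning task is followed at once.
            timer.scheduleAtFixedRate(cleanup, 0, periodMillis, TimeUnit.MILLISECONDS);
        }
        try {
            Thread.sleep(windowMillis);
            timer.shutdownNow();
            timer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            timer.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(shortest.get());
    }

    public static void main(String[] args) {
        System.out.println("fixed delay, shortest pause (ms): "
            + shortestPauseMillis(true, 150, 100, 700));
        System.out.println("fixed rate, shortest pause (ms):  "
            + shortestPauseMillis(false, 150, 100, 700));
    }
}
```

When the task takes longer than its period, fixed-rate scheduling leaves essentially no idle time between runs, which is how a slow cleanup could keep a shared connection busy. Fixed-delay scheduling always leaves at least the configured pause.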

2009/10/16 Manuel Teira <mteira@...>


--
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Re: Broker get stuck with high amount of persistent messages for a given destination

by Manuel Teira Paz

Hello again.
Let me insist on this problem, since it is hitting our production
environment every other day. Whenever, for whatever reason, one of
the system queues goes over ~60,000 pending persistent messages, we are
not able to restart the broker properly, and we suffer the problems I
tried to detail in my previous message.
Could you be so kind as to take a look at it? If you need any further
information, please let me know.

Best regards.


Manuel Teira wrote:

>         at
> org.apache.activemq.store.ProxyMessageStore.recoverNextMessages(ProxyMessageStore.java:83)
>         at
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch.doFillBatch(QueueStorePrefetch.java:75)
>         at
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:227)
>         - locked <0xb8a12f30> (a
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch)
>         at
> org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:100)
>         at
> org.apache.activemq.broker.region.cursors.StoreQueueCursor.reset(StoreQueueCursor.java:157)
>         - locked <0xb8a13648> (a
> org.apache.activemq.broker.region.cursors.StoreQueueCursor)
>         at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1179)
>         - locked <0xb8a13648> (a
> org.apache.activemq.broker.region.cursors.StoreQueueCursor)
>         at
> org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1308)
>         at org.apache.activemq.broker.region.Queue.iterate(Queue.java:1011)
>         - locked <0xb8a13748> (a org.apache.activemq.broker.region.Queue$2)
>         at
> org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
>         at
> org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>         at java.lang.Thread.run(Thread.java:595)
>  
>
> We waited for 9 or 10 hours without any apparent change: only one
> consumer had been created, the other threads were still waiting, and no
> message had been consumed.
> In the production environment, the only way we found to recover was to
> move the messages from ACTIVEMQ_MSGS to another table, start the broker
> with ACTIVEMQ_MSGS empty, and write an application that deserializes the
> messages from that table and injects them into the queue using the JMS
> API.
>
> We consider over 10 hours of service unavailability to be a serious
> problem. It seems to be related to the broker trying to load all the
> messages from the table at once whenever a consumer is created for a
> given destination. Is this a known problem? Is there any way to
> improve the situation?
>
> Best regards.
>
>
> --
> Manuel.
>
>  
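
The two thread dumps quoted above suggest a contention pattern: the transport thread is parked in `ReentrantLock.lock()` inside `Queue.addSubscription`, while the queue thread is busy in a JDBC query during page-in. Whether both threads really contend for the same lock is an inference from the dumps, not something they prove. The following is a minimal Java sketch of that pattern using only `java.util.concurrent`; the class and method names are hypothetical stand-ins, not ActiveMQ code, and it uses `tryLock` with a timeout (instead of the plain `lock()` seen in the dump) only so that the demonstration terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LockContentionSketch {
    // Hypothetical stand-in for a per-queue lock shared by paging and subscription code.
    private final ReentrantLock queueLock = new ReentrantLock();

    /** Simulates the paging thread: keeps the lock for the whole (slow) store read. */
    void pageIn(CountDownLatch lockHeld, long storeReadMillis) throws InterruptedException {
        queueLock.lock();
        try {
            lockHeld.countDown();          // signal that the lock is now held
            Thread.sleep(storeReadMillis); // stand-in for a long-running JDBC query
        } finally {
            queueLock.unlock();
        }
    }

    /** Simulates adding a consumer: gives up if the lock is not free within the timeout. */
    boolean addSubscription(long timeoutMillis) throws InterruptedException {
        if (!queueLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // with plain lock() this caller would simply stay parked
        }
        try {
            return true;
        } finally {
            queueLock.unlock();
        }
    }

    /** Runs one pager and one subscriber; returns true if the subscription got the lock in time. */
    public static boolean demo(long storeReadMillis, long timeoutMillis) {
        LockContentionSketch sketch = new LockContentionSketch();
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread pager = new Thread(() -> {
            try {
                sketch.pageIn(lockHeld, storeReadMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pager.setDaemon(true);
        pager.start();
        try {
            lockHeld.await(); // make sure the pager owns the lock first
            boolean added = sketch.addSubscription(timeoutMillis);
            pager.join();
            return added;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Calling `LockContentionSketch.demo(400, 50)` models a store read that outlasts the caller's patience, so the subscription is not added; with a short read and a generous timeout the subscription succeeds. In the reported case the store read apparently never finished, which would explain why the other consumers stayed blocked for hours.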


Re: Broker get stuck with high amount of persistent messages for a given destination

by Gary Tully

Can you verify whether you still see the problem with the latest snapshot:
https://repository.apache.org/content/repositories/snapshots/org/apache/activemq/apache-activemq/5.4-SNAPSHOT/


2009/11/3 Manuel Teira <mteira@...>:

> Hello again.
> Let me insist on this problem, since it is hitting our production
> environment every other day. Whenever, for whatever reason, one of the
> system queues goes over ~60,000 pending persistent messages, we are not
> able to restart it properly, and we suffer the problems I tried to detail
> in my previous message.
> Could you be so kind as to take a look at it? If you need any further
> information, please let me know.
>
> Best regards.
>
>
>
>



--
http://blog.garytully.com

Open Source Integration
http://fusesource.com
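
The original report suspects that the broker tries to load every pending message at once when a consumer is created. As a point of comparison, here is a minimal Java sketch of reading a large backlog in bounded batches, resuming each time from the last id seen. Everything in it is an assumption made for illustration: `InMemoryStore` is a hypothetical stand-in for a message table ordered by sequence id, and `nextBatch` and `drain` are invented names, not ActiveMQ or JDBC APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchedPagingSketch {
    /** Hypothetical stand-in for a message table ordered by an increasing id. */
    static final class InMemoryStore {
        private final List<Long> ids = new ArrayList<>();

        InMemoryStore(int count) {
            for (long i = 1; i <= count; i++) {
                ids.add(i);
            }
        }

        /** Returns at most maxReturned ids that are strictly greater than lastId. */
        List<Long> nextBatch(long lastId, int maxReturned) {
            List<Long> batch = new ArrayList<>();
            for (Long id : ids) {
                if (id > lastId) {
                    batch.add(id);
                    if (batch.size() == maxReturned) {
                        break;
                    }
                }
            }
            return batch;
        }
    }

    /** Reads the whole store in bounded batches; returns the number of non-empty batches. */
    static int drain(InMemoryStore store, int batchSize, List<Long> sink) {
        long lastId = 0;
        int batches = 0;
        while (true) {
            List<Long> batch = store.nextBatch(lastId, batchSize);
            if (batch.isEmpty()) {
                return batches;
            }
            sink.addAll(batch);                      // hand the batch to consumers
            lastId = batch.get(batch.size() - 1);    // resume after the last id seen
            batches++;
        }
    }
}
```

With this shape, the work done per page-in is bounded by the batch size rather than by the backlog, so a queue holding 75,000 messages costs more round trips but never one unbounded read. Whether the broker versions discussed in this thread behave this way against Oracle is exactly the open question of the thread.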