
Re: ClassCastException having relation to expired messages

by sic


The problem still occurs in the latest 5.3-SNAPSHOT.

I based my test case on the sample from this thread:
http://www.nabble.com/ActiveMQ-message-delivery-td18355245.html

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import junit.framework.TestCase;

/**
 * @author Marjan Sterjev
 *
 */
public class ProducerTester extends TestCase {
       
        public void testSendQueue() throws JMSException, InterruptedException {
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
                Connection connection = connectionFactory.createConnection();
                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue("test.queue");
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                producer.setTimeToLive(10000);
                int msgCount = 20;
                int sleep = 100;
                String payload = "Test Message";
                for (int i = 0; i < msgCount; i++) {
                        ObjectMessage message = session.createObjectMessage();
                        String p = String.format("%s:%d", payload, (i + 1));
                        message.setObject(p);
                        producer.send(message);
                        Thread.sleep(sleep);
                }
                producer.close();
                session.close();
                connection.close();
        }
}

I already know about the workaround you refer to, the periodic message expiry task.

However, the unusual case I mentioned can still occur.

The test steps are as follows:
1. Start ActiveMQ configured with a VM queue cursor.
2. Set up a MessageConsumer that runs every 5 minutes and consumes only messages that have expired.
3. Send messages (TimeToLive set to 10 seconds) with the JUnit test case (the ProducerTester class above).
4. Between 10 seconds and 5 minutes after sending (so that the messages have expired but the MessageConsumer has not yet run), shut the server down abruptly so that the expired messages remain on the queue.
5. Restart ActiveMQ.
In this state, ActiveMQ fails to start with a ClassCastException.
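The timing window in the steps above can be sketched in plain Java. This is only an illustration of the arithmetic, not broker code: the class and method names are hypothetical, and it assumes the usual JMS convention that a message's expiration is its send time plus its time-to-live, with 0 meaning "never expires".

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of ActiveMQ: models when an abrupt shutdown
// leaves an already-expired message sitting on the queue.
final class ExpiryTimeline {

    // Assumed JMS convention: expiration = send time + TTL; a TTL of 0 means no expiry.
    static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    // A message counts as expired once the clock has passed a non-zero expiration.
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }

    // True when the crash happens after the message expired but before the
    // first periodic sweep had a chance to remove it.
    static boolean crashLeavesExpiredMessage(long sendTimeMillis, long timeToLiveMillis,
                                             long firstSweepMillis, long crashMillis) {
        long expiration = expirationFor(sendTimeMillis, timeToLiveMillis);
        return isExpired(expiration, crashMillis) && crashMillis < firstSweepMillis;
    }

    public static void main(String[] args) {
        long ttl = TimeUnit.SECONDS.toMillis(10);   // TimeToLive from the test case
        long sweep = TimeUnit.MINUTES.toMillis(5);  // first run of the expiry consumer

        // Crash 60 seconds after sending: expired, not yet swept.
        System.out.println(crashLeavesExpiredMessage(0, ttl, sweep, TimeUnit.SECONDS.toMillis(60)));
        // Crash 5 seconds after sending: the message has not expired yet.
        System.out.println(crashLeavesExpiredMessage(0, ttl, sweep, TimeUnit.SECONDS.toMillis(5)));
    }
}
```

With a 10-second TimeToLive and a 5-minute sweep, any shutdown between those two points falls inside the window that triggers the failure on restart.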


Gary Tully wrote:
would it be possible to provide a junit test case that demonstrates this
behavior? I wonder does the problem persist with the latest 5.3-SNAPSHOT?

Trunk has a resolution to
https://issues.apache.org/activemq/browse/AMQ-1112 that provides a
periodic message expiry task. That should help, but it may be
that there still remain some assumptions about the type of message.
A junit test case would help lots.



2009/7/7 sic <sic_1234@naver.com>

>
> While expired messages remain on the queue, the server cannot be restarted
> because of the ClassCastException until I delete the persistent file of the
> queue that holds the expired messages (e.g. file:
> data\kr-store\data\hash-index-queue-data_queue#3a#2f#2fTEST.QUEUE,
> queue name: TEST.QUEUE).
>
> However, this approach has the disadvantage that all messages on that queue
> are purged, whether or not they have expired.
>
> Does anyone know of another way?
>
>
> sic wrote:
> >
> > While testing ActiveMQ 5.2.0, I ran into a question about expired messages.
> >
> > The test messages are persistent and sent to a queue, with the message
> > expiration time set to 10 seconds. The configuration is the default except
> > for the destination policy (using a VM queue cursor), shown below:
> > <destinationPolicy>
> >       <policyMap>
> >               <policyEntries>
> >                       <policyEntry queue=">" memoryLimit="5mb">
> >                               <dispatchPolicy>
> >                                       <strictOrderDispatchPolicy />
> >                               </dispatchPolicy>
> >                               <deadLetterStrategy>
> >                                       <individualDeadLetterStrategy queuePrefix="DLQ."/>
> >                               </deadLetterStrategy>
> >                               <pendingQueuePolicy>
> >                                       <vmQueueCursor />
> >                               </pendingQueuePolicy>
> >                       </policyEntry>
> >               </policyEntries>
> >       </policyMap>
> > </destinationPolicy>
> > Additionally, it turns out that the ActiveMQ broker does not proactively
> > purge expired messages from queues, so, on the advice of this forum, we
> > set up a thread that periodically clears expired messages from our queues.
> > Suppose the server stops unexpectedly while expired messages remain on a
> > queue, not yet removed by that thread.
> > After recovering the server, we expect the remaining messages, both normal
> > and expired, to be loaded normally.
> > However, after the server stopped abnormally, an error occurred while
> > starting ActiveMQ. The errors look like this:
> >
> > 2009-07-01 17:13:45,125 [main           ] INFO  BrokerService - For help or more information please see: http://activemq.apache.org/
> > 2009-07-01 17:13:45,421 [main           ] INFO  KahaStore - Kaha Store using data directory D:\apache\apache-activemq-5.2.0\binary\bin\..\data\kr-store\data
> > 2009-07-01 17:13:45,796 [main           ] ERROR BrokerService - Failed to start ActiveMQ JMS Message Broker. Reason: java.lang.ClassCastException: org.apache.activemq.command.ActiveMQObjectMessage
> > java.lang.ClassCastException: org.apache.activemq.command.ActiveMQObjectMessage
> >       at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1114)
> >       at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1106)
> >       at org.apache.activemq.broker.region.Queue$5.recoverMessage(Queue.java:173)
> >       at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessage(RecoveryListenerAdapter.java:45)
> >       at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:56)
> >       at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82)
> >       at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recover(KahaReferenceStore.java:93)
> >       at org.apache.activemq.store.amq.AMQMessageStore.recover(AMQMessageStore.java:481)
> >       at org.apache.activemq.broker.region.Queue.initialize(Queue.java:167)
> >       at org.apache.activemq.broker.region.DestinationFactoryImpl.createDestination(DestinationFactoryImpl.java:83)
> >       at org.apache.activemq.broker.region.AbstractRegion.createDestination(AbstractRegion.java:434)
> >       at org.apache.activemq.broker.jmx.ManagedQueueRegion.createDestination(ManagedQueueRegion.java:56)
> >       at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:120)
> >       at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:261)
> >       at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
> >       at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
> >       at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:147)
> >       at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
> >       at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:149)
> >       at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:94)
> >       at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:176)
> >       at org.apache.activemq.broker.jmx.ManagedRegionBroker.start(ManagedRegionBroker.java:103)
> >       at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:112)
> >       at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:154)
> >       at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:154)
> >       at org.apache.activemq.broker.MutableBrokerFilter.start(MutableBrokerFilter.java:161)
> >       at org.apache.activemq.broker.BrokerService.start(BrokerService.java:468)
> >       at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:52)
> >       at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1368)
> >       at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1334)
> >       at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:473)
> >       at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
> >       at java.security.AccessController.doPrivileged(Native Method)
> >       at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
> >       at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264)
> >       at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:221)
> >       at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261)
> >       at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
> >       at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
> >       at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:429)
> >       at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729)
> >       at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381)
> >       at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)
> >       at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)
> >       at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:96)
> >       at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:52)
> >       at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)
> >       at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)
> >       at org.apache.activemq.console.command.StartCommand.startBroker(StartCommand.java:115)
> >       at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:74)
> >       at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:57)
> >       at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:129)
> >       at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:57)
> >       at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:79)
> >       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> >       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >       at java.lang.reflect.Method.invoke(Method.java:585)
> >       at org.apache.activemq.console.Main.runTaskClass(Main.java:225)
> >       at org.apache.activemq.console.Main.main(Main.java:106)
> > 2009-07-01 17:13:45,812 [main           ] INFO  BrokerService - ActiveMQ Message Broker (localhost, null) is shutting down
> > 2009-07-01 17:13:45,828 [main           ] INFO  NetworkConnector - Network Connector default-nc Stopped
> >
> > This is on Windows XP.
> >
> > Regards
> >
>
> --
> View this message in context:
> http://www.nabble.com/ClassCastException-having-relation-to-expired-messages-tp24287023p24369696.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>


--
http://blog.garytully.com

Open Source Integration
http://fusesource.com
