ClassCastException having relation to expired messages

by sic

While testing ActiveMQ 5.2.0, I have a question about expired messages.

The test messages are persistent and sent to a queue, with the message expiration time set to 10 seconds. The configuration is otherwise unchanged except for the destination policy (which uses a VM queue cursor), shown below:
<destinationPolicy>
        <policyMap>
                <policyEntries>
                        <policyEntry queue=">" memoryLimit="5mb">
                                <dispatchPolicy>
                                        <strictOrderDispatchPolicy />
                                </dispatchPolicy>
                                <deadLetterStrategy>
                                        <individualDeadLetterStrategy queuePrefix="DLQ."/>
                                </deadLetterStrategy>
                                <pendingQueuePolicy>
                                        <vmQueueCursor />
                                </pendingQueuePolicy>
                        </policyEntry>
                </policyEntries>
        </policyMap>
</destinationPolicy>
Additionally, it turns out that the ActiveMQ broker does not proactively purge expired messages from queues, so, following advice from this forum, we set up
a thread that periodically clears expired messages from our queues.
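
For readers unfamiliar with the idea, such a periodic sweep can be sketched in plain Java. This is only a minimal model under stated assumptions, not the cleanup code actually used here: `TimedMessage` and `ExpiredMessagePurger` are hypothetical names, the queue is an in-memory collection rather than an ActiveMQ destination, and an expiration of 0 is treated as "never expires", following the JMS convention. It uses a lambda, so it needs Java 8 or later.

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory stand-in for a queued message; not an ActiveMQ class. */
final class TimedMessage {
    final String body;
    final long expirationMillis; // absolute time; 0 means "never expires"

    TimedMessage(String body, long expirationMillis) {
        this.body = body;
        this.expirationMillis = expirationMillis;
    }

    boolean isExpired(long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }
}

/** Removes expired entries from a queue, either once or on a fixed schedule. */
final class ExpiredMessagePurger {
    private final Queue<TimedMessage> queue;

    ExpiredMessagePurger(Queue<TimedMessage> queue) {
        this.queue = queue;
    }

    /** Performs a single sweep and returns how many expired messages were removed. */
    int purgeOnce(long nowMillis) {
        int removed = 0;
        for (Iterator<TimedMessage> it = queue.iterator(); it.hasNext();) {
            if (it.next().isExpired(nowMillis)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    /** Schedules a sweep every period; the caller must shut the scheduler down. */
    ScheduledExecutorService startPeriodic(long period, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> purgeOnce(System.currentTimeMillis()), period, period, unit);
        return scheduler;
    }
}
```

Anything that expires between two sweeps stays on the queue until the next one, and that is the window in which an abrupt shutdown leaves expired messages behind.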
Suppose the server stops unexpectedly while expired messages are still on a queue, not yet disposed of by that thread.
After the server is brought back up, we expect the remaining messages, both normal and expired, to be loaded normally.
However, after the server stopped abnormally, an error occurred while starting ActiveMQ. The errors look like this:

2009-07-01 17:13:45,125 [main           ] INFO  BrokerService                  - For help or more information please see: http://activemq.apache.org/
2009-07-01 17:13:45,421 [main           ] INFO  KahaStore                      - Kaha Store using data directory D:\apache\apache-activemq-5.2.0\binary\bin\..\data\kr-store\data
2009-07-01 17:13:45,796 [main           ] ERROR BrokerService                  - Failed to start ActiveMQ JMS Message Broker. Reason: java.lang.ClassCastException: org.apache.activemq.command.ActiveMQObjectMessage
java.lang.ClassCastException: org.apache.activemq.command.ActiveMQObjectMessage
        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1114)
        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1106)
        at org.apache.activemq.broker.region.Queue$5.recoverMessage(Queue.java:173)
        at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessage(RecoveryListenerAdapter.java:45)
        at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:56)
        at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82)
        at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recover(KahaReferenceStore.java:93)
        at org.apache.activemq.store.amq.AMQMessageStore.recover(AMQMessageStore.java:481)
        at org.apache.activemq.broker.region.Queue.initialize(Queue.java:167)
        at org.apache.activemq.broker.region.DestinationFactoryImpl.createDestination(DestinationFactoryImpl.java:83)
        at org.apache.activemq.broker.region.AbstractRegion.createDestination(AbstractRegion.java:434)
        at org.apache.activemq.broker.jmx.ManagedQueueRegion.createDestination(ManagedQueueRegion.java:56)
        at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:120)
        at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:261)
        at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
        at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
        at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:147)
        at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
        at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:149)
        at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:94)
        at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:176)
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.start(ManagedRegionBroker.java:103)
        at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:112)
        at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:154)
        at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:154)
        at org.apache.activemq.broker.MutableBrokerFilter.start(MutableBrokerFilter.java:161)
        at org.apache.activemq.broker.BrokerService.start(BrokerService.java:468)
        at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:52)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1368)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1334)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:473)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
        at java.security.AccessController.doPrivileged(Native Method)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
        at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:221)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:429)
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)
        at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:96)
        at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:52)
        at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)
        at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)
        at org.apache.activemq.console.command.StartCommand.startBroker(StartCommand.java:115)
        at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:74)
        at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:57)
        at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:129)
        at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:57)
        at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:79)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:585)
        at org.apache.activemq.console.Main.runTaskClass(Main.java:225)
        at org.apache.activemq.console.Main.main(Main.java:106)
2009-07-01 17:13:45,812 [main           ] INFO  BrokerService                  - ActiveMQ Message Broker (localhost, null) is shutting down
2009-07-01 17:13:45,828 [main           ] INFO  NetworkConnector               - Network Connector default-nc Stopped

This is on Windows XP.

Regards

Re: ClassCastException having relation to expired messages

by sic

While expired messages remain on a queue, the server cannot be restarted: startup fails with the ClassCastException until I delete the persistent file of the queue that holds the expired messages (e.g. file: data\kr-store\data\hash-index-queue-data_queue#3a#2f#2fTEST.QUEUE, queue name: TEST.QUEUE).

However, this approach has the disadvantage that all messages on that queue are purged, regardless of expiration.

Does anyone know another way?

Re: ClassCastException having relation to expired messages

by Gary Tully

Would it be possible to provide a JUnit test case that demonstrates this
behavior? I wonder whether the problem persists with the latest 5.3-SNAPSHOT.

Trunk has a resolution to
https://issues.apache.org/activemq/browse/AMQ-1112 that provides a
periodic message expiry task. That should help, but it may be
that some assumptions about the type of message still remain.
A JUnit test case would help a lot.
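
To illustrate what an assumption about the type of message can look like, here is a minimal plain-Java sketch. All names in it (`Ref`, `StoredMessage`, `QueueRef`, `ExpiryHandler`) are hypothetical and it is not taken from the ActiveMQ source; it only shows how an unchecked downcast on a recovered message fails with a ClassCastException, and how checking the runtime type first avoids that failure.

```java
/** Hypothetical stand-ins; these are NOT the ActiveMQ classes from the stack trace. */
interface Ref {
    String id();
}

/** A message as it might come straight back from the store during recovery. */
final class StoredMessage implements Ref {
    private final String id;

    StoredMessage(String id) {
        this.id = id;
    }

    public String id() {
        return id;
    }
}

/** A queue-side wrapper that additionally knows how to drop itself. */
final class QueueRef implements Ref {
    private final Ref target;
    private boolean dropped;

    QueueRef(Ref target) {
        this.target = target;
    }

    public String id() {
        return target.id();
    }

    void drop() {
        dropped = true;
    }

    boolean isDropped() {
        return dropped;
    }
}

final class ExpiryHandler {
    /** Assumes every reference is a QueueRef; throws ClassCastException otherwise. */
    static void expireUnchecked(Ref ref) {
        ((QueueRef) ref).drop();
    }

    /** Checks the runtime type first and wraps bare messages before dropping them. */
    static QueueRef expireChecked(Ref ref) {
        QueueRef queueRef = (ref instanceof QueueRef) ? (QueueRef) ref : new QueueRef(ref);
        queueRef.drop();
        return queueRef;
    }
}
```

Whether the broker's expiry path has this exact shape is an assumption; a JUnit test against a real broker is still needed to confirm it.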



--
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Re: ClassCastException having relation to expired messages

by sic

The problem still occurs in the latest 5.3-SNAPSHOT.

I based the test case on the sample at this URL:
http://www.nabble.com/ActiveMQ-message-delivery-td18355245.html

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import junit.framework.TestCase;

/**
 * @author Marjan Sterjev
 *
 */
public class ProducerTester extends TestCase {

        public void testSendQueue() throws JMSException, InterruptedException {
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
                Connection connection = connectionFactory.createConnection();
                try {
                        Session session = connection.createSession(false,
                                Session.AUTO_ACKNOWLEDGE);
                        Destination destination = session.createQueue("test.queue");
                        MessageProducer producer = session.createProducer(destination);
                        // Persistent messages are written to the store and recovered on restart.
                        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                        // Each message expires 10 seconds after it is sent.
                        producer.setTimeToLive(10000);
                        int msgCount = 20;
                        int sleep = 100;
                        String payload = "Test Message";
                        for (int i = 0; i < msgCount; i++) {
                                ObjectMessage message = session.createObjectMessage();
                                String p = String.format("%s:%d", payload, (i + 1));
                                message.setObject(p);
                                producer.send(message);
                                Thread.sleep(sleep);
                        }
                        producer.close();
                        session.close();
                } finally {
                        // Always release the connection, even if sending fails.
                        connection.close();
                }
        }
}

I already know the workaround you refer to, a periodic message expiry task.

However, the unusual case I mentioned can still occur.

The test steps are as follows:
1. Start ActiveMQ configured with a VM queue cursor.
2. Set up a MessageConsumer that runs every 5 minutes and consumes only messages that have expired.
3. Send messages (with TimeToLive set to 10 seconds) using the JUnit test case (the ProducerTester class).
4. Between 10 seconds and 5 minutes later (so that the messages have expired but the MessageConsumer has not yet run), shut the server down abruptly so that the expired messages remain on the queue.
5. Restart ActiveMQ.
In this state, ActiveMQ cannot start because of the ClassCastException.
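
The timing in these steps can be checked with a little arithmetic, sketched below in plain Java. `ExpiryWindow` and its methods are hypothetical helpers written only for this illustration, not part of ActiveMQ or JMS. The sketch assumes that the expiration is the send time plus the time to live (with 0 meaning no expiry) and that the next cleanup run happens at a known time.

```java
/** Hypothetical helper modelling the timing of the test steps; not an ActiveMQ or JMS class. */
final class ExpiryWindow {
    /** Expiration is the send time plus the time to live; a TTL of 0 means no expiry. */
    static long expirationAt(long sentAtMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sentAtMillis + timeToLiveMillis;
    }

    /** A message is expired once the current time has passed a non-zero expiration. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }

    /**
     * True if a crash at crashAtMillis happens after the message expired
     * but before the next cleanup run, so the expired message stays in the store.
     */
    static boolean crashLeavesExpiredMessage(long sentAtMillis, long timeToLiveMillis,
                                             long nextCleanupAtMillis, long crashAtMillis) {
        long expiration = expirationAt(sentAtMillis, timeToLiveMillis);
        return isExpired(expiration, crashAtMillis) && crashAtMillis < nextCleanupAtMillis;
    }
}
```

With a 10-second TTL and a cleanup every 5 minutes, a crash one minute after sending falls inside the window, while a crash after 5 seconds does not, because the message has not yet expired.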


Gary Tully wrote:
would it be possible to provide a junit test case that demonstrates this
behavior? I wonder does the problem persist with the latest 5.3-SNAPSHOT?

Trunk has a resolution to
https://issues.apache.org/activemq/browse/AMQ-1112that provides a
periodic message expiry task. That should help but it may be
that there still remains some assumptions about the type of message.
A junit test case would help lots.




--
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Re: ClassCastException having relation to expired messages

by Gary Tully

Can you create a Jira issue [1] for this (include as much of this
detail as possible)? I will try to get it into 5.3.

[1] https://issues.apache.org/activemq/secure/CreateIssue!default.jspa


Re: ClassCastException having relation to expired messages

by Gary Tully

Forget that, I just created the issue. Please add to it if needed:
https://issues.apache.org/activemq/browse/AMQ-2322

2009/7/9 Gary Tully <gary.tully@...>:

> Can you create a jira issue [1] for this (include as much of this
> detail as possible) and I will try and get it into 5.3
>
> [1] https://issues.apache.org/activemq/secure/CreateIssue!default.jspa
>
> 2009/7/9 sic <sic_1234@...>:
>>
>> The problem still occured in the latest 5.3-SNAPSHOT.
>>
>> I make reference to sample of this URL
>> http://www.nabble.com/ActiveMQ-message-delivery-td18355245.html
>>
>> import javax.jms.Connection;
>> import javax.jms.ConnectionFactory;
>> import javax.jms.DeliveryMode;
>> import javax.jms.Destination;
>> import javax.jms.JMSException;
>> import javax.jms.MessageProducer;
>> import javax.jms.ObjectMessage;
>> import javax.jms.Session;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>>
>> import junit.framework.TestCase;
>>
>> /**
>>  * @author Marjan Sterjev
>>  *
>>  */
>> public class ProducerTester extends TestCase {
>>
>>        public void testSendQueue() throws JMSException, InterruptedException {
>>                ConnectionFactory connectionFactory = new
>> ActiveMQConnectionFactory("tcp://localhost:61616");
>>                Connection connection = connectionFactory.createConnection();
>>                Session session = connection.createSession(false,
>>                        Session.AUTO_ACKNOWLEDGE);
>>                Destination destination = session.createQueue("test.queue");
>>                MessageProducer producer = session.createProducer(destination);
>>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>>                producer.setTimeToLive(10000);
>>                int msgCount = 20;
>>                int sleep = 100;
>>                String payload = "Test Message";
>>                for (int i = 0; i < msgCount; i++) {
>>                        ObjectMessage message = session.createObjectMessage();
>>                        String p = String.format("%s:%d", payload, (i + 1));
>>                        message.setObject(p);
>>                        producer.send(message);
>>                        Thread.sleep(sleep);
>>                }
>>                producer.close();
>>                session.close();
>>                connection.close();
>>        }
>> }
>>
>> I have already known a workaround which works a periodic message expiry task
>> as you refer.
>>
>> However unusual case can occur as I mentioned.
>>
>> Test steps are followed :
>> start an ActiveMQ setting a vm queue cursor.
>> set a MessageConsumer to consume only those messages that had expired in
>> every 5 minutes.
>> send a message(set TimeToLive to 10 seconds) by junit test case
>> (ProducerTester class)
>> After between 10 seconds and 5 minutes(hope that message is expired while
>> MessageConsumer doesn't work yet), make server shutdown abruptly so that
>> messages which has been expired leaves on queue.
>> restart an ActiveMQ.
>> In this states, an ActiveMQ cannot be stated by ClassCastException.
>>
>>
>>
>> Gary Tully wrote:
>>>
>>> would it be possible to provide a junit test case that demonstrates this
>>> behavior? I wonder does the problem persist with the latest 5.3-SNAPSHOT?
>>>
>>> Trunk has a resolution to
>>> https://issues.apache.org/activemq/browse/AMQ-1112that provides a
>>> periodic message expiry task. That should help but it may be
>>> that there still remains some assumptions about the type of message.
>>> A junit test case would help lots.
>>>
>>>
>>>
>>> 2009/7/7 sic <sic_1234@...>
>>>
>>>>
>>>> While expired messages remain on queue, server cannot be re-started with
>>>> ClassCastException until I delete persistent file(e.g. file :
>>>> data\kr-store\data\hash-index-queue-data_queue#3a#2f#2fTEST.QUEUE,
>>>> QueueName
>>>> : TEST.QUEUE) of the queue which has expired messages
>>>>
>>>> Yet, this way has disadvantage that all of messages on specified queue
>>>> have
>>>> been purged regardless of expiration.
>>>>
>>>> Does anyone have other ways?
>>>>
>>>>
>>>> sic wrote:
>>>> >
>>>> > While testing ActiveMQ5.2.0, I have a question about expired message.
>>>> >
>>>> > Note that testing messages are Persistent and using a queue, set
>>>> message
>>>> > expiration time to 10 seconds. configurations are same except
>>>> destination
>>>> > policy(using a vm queue cursor) as below
>>>> > <destinationPolicy>
>>>> >       <policyMap>
>>>> >               <policyEntries>
>>>> >                       <policyEntry queue=">" memoryLimit="5mb">
>>>> >                               <dispatchPolicy>
>>>> >                                       <strictOrderDispatchPolicy />
>>>> >                               </dispatchPolicy>
>>>> >                               <deadLetterStrategy>
>>>> >                                       <individualDeadLetterStrategy
>>>> queuePrefix="DLQ."/>
>>>> >                               </deadLetterStrategy>
>>>> >                               <pendingQueuePolicy>
>>>> >                                       <vmQueueCursor />
>>>> >                               </pendingQueuePolicy>
>>>> >                       </policyEntry>
>>>> >               </policyEntries>
>>>> >       </policyMap>
>>>> > </destinationPolicy>
>>>> > Additionally, it turns out that the activemq broker actually does not
>>>> > proactively purge expired messages from queues. so we set
>>>> > a thread that periodically cleared my queues of expired messages by
>>>> help
>>>> > of this forum's advisor.
>>>> > We suppose that a server is stopped unexpectedly when expired message
>>>> > leaves on queue without disposed by the thread
>>>> > After recovering a server, we will expect that remained messages
>>>> including
>>>> > both normal and expired message are loaded normally
>>>> > However server stopped abnormally and some error occured while starting
>>>> > ActiveMQ. Errors are like that
>>>> >
>>>> > 2009-07-01 17:13:45,125 [main           ] INFO  BrokerService
>>>> > - For help or more information please see: http://activemq.apache.org/
>>>> > 2009-07-01 17:13:45,421 [main           ] INFO  KahaStore
>>>> > - Kaha Store using data directory
>>>> > D:\apache\apache-activemq-5.2.0\binary\bin\..\data\kr-store\data
>>>> > 2009-07-01 17:13:45,796 [main           ] ERROR BrokerService
>>>> > - Failed to start ActiveMQ JMS Message Broker. Reason:
>>>> > java.lang.ClassCastException:
>>>> > org.apache.activemq.command.ActiveMQObjectMessage
>>>> > java.lang.ClassCastException:
>>>> > org.apache.activemq.command.ActiveMQObjectMessage
>>>> > 2009-07-01 17:13:45,812 [main           ] INFO  BrokerService
>>>> > - ActiveMQ Message Broker (localhost, null) is shutting down
>>>> > 2009-07-01 17:13:45,828 [main           ] INFO  NetworkConnector
>>>> > - Network Connector default-nc Stopped
>>>> >
>>>> > It's in Windows XP.
>>>> >
>>>> > Regards
>>>> >



--
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Re: ClassCastException having relation to expired messages

by sic

That's enough to demonstrate this situation, I think.

Thanks for your concern.

Gary Tully wrote:
Forget that; I just created the issue. Please add to it if needed:
https://issues.apache.org/activemq/browse/AMQ-2322

2009/7/9 Gary Tully <gary.tully@gmail.com>:
> Can you create a jira issue [1] for this (include as much of this
> detail as possible) and I will try and get it into 5.3
>
> [1] https://issues.apache.org/activemq/secure/CreateIssue!default.jspa
>
> 2009/7/9 sic <sic_1234@naver.com>:
>>
>> The problem still occurs in the latest 5.3-SNAPSHOT.
>>
>> I based the test on the sample at this URL:
>> http://www.nabble.com/ActiveMQ-message-delivery-td18355245.html
>>
>> import javax.jms.Connection;
>> import javax.jms.ConnectionFactory;
>> import javax.jms.DeliveryMode;
>> import javax.jms.Destination;
>> import javax.jms.JMSException;
>> import javax.jms.MessageProducer;
>> import javax.jms.ObjectMessage;
>> import javax.jms.Session;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>>
>> import junit.framework.TestCase;
>>
>> /**
>>  * @author Marjan Sterjev
>>  *
>>  */
>> public class ProducerTester extends TestCase {
>>
>>        public void testSendQueue() throws JMSException, InterruptedException {
>>                ConnectionFactory connectionFactory =
>>                        new ActiveMQConnectionFactory("tcp://localhost:61616");
>>                Connection connection = connectionFactory.createConnection();
>>                Session session = connection.createSession(false,
>>                        Session.AUTO_ACKNOWLEDGE);
>>                Destination destination = session.createQueue("test.queue");
>>                MessageProducer producer = session.createProducer(destination);
>>                // Persistent messages that expire 10 seconds after being sent.
>>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>>                producer.setTimeToLive(10000);
>>                int msgCount = 20;
>>                int sleep = 100;
>>                String payload = "Test Message";
>>                // Send 20 ObjectMessages, pausing 100 ms between sends.
>>                for (int i = 0; i < msgCount; i++) {
>>                        ObjectMessage message = session.createObjectMessage();
>>                        String p = String.format("%s:%d", payload, (i + 1));
>>                        message.setObject(p);
>>                        producer.send(message);
>>                        Thread.sleep(sleep);
>>                }
>>                producer.close();
>>                session.close();
>>                connection.close();
>>        }
>> }
>>
>> I already know the workaround you refer to, a periodic message expiry
>> task.
>>
>> However, an unusual case can still occur, as I mentioned.
>>
>> The test steps are as follows:
>> 1. Start ActiveMQ configured with a VM queue cursor.
>> 2. Set up a MessageConsumer that runs every 5 minutes and consumes only
>>    messages that have expired.
>> 3. Send a message (TimeToLive set to 10 seconds) with the JUnit test case
>>    (ProducerTester class).
>> 4. Between 10 seconds and 5 minutes later (so that the message has expired
>>    but the MessageConsumer has not yet run), shut the server down abruptly
>>    so that the expired messages remain on the queue.
>> 5. Restart ActiveMQ.
>> In this state, ActiveMQ cannot start because of the ClassCastException.
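
The timing window in the steps above follows from how JMS expiry is computed: a message's expiration timestamp is its send time plus its time-to-live, and the message counts as expired once the clock passes that timestamp. Below is a minimal, self-contained sketch of that arithmetic. The class and method names are hypothetical and are not ActiveMQ code, and the 60-second crash time is an assumed value chosen to fall inside the window.

```java
/** Hypothetical helper illustrating the expiry arithmetic; not part of ActiveMQ. */
public class ExpiryWindow {

    /** Expiration timestamp: send time plus TTL; a TTL of 0 means "never expires". */
    public static long expirationTime(long sendTimeMillis, long ttlMillis) {
        return ttlMillis == 0 ? 0 : sendTimeMillis + ttlMillis;
    }

    /** A message is expired once the clock has passed a non-zero expiration. */
    public static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sendTime = 0L;               // message sent at t = 0
        long ttl = 10_000L;               // matches producer.setTimeToLive(10000)
        long cleanupInterval = 300_000L;  // expiry consumer runs every 5 minutes
        long crashTime = 60_000L;         // assumption: broker killed after 1 minute

        long expiration = expirationTime(sendTime, ttl);
        boolean expiredAtCrash = isExpired(expiration, crashTime);
        boolean cleanupHasRun = crashTime >= cleanupInterval;

        System.out.println("expired at crash: " + expiredAtCrash);
        System.out.println("cleanup has run: " + cleanupHasRun);
        System.out.println("expired message left in store: "
                + (expiredAtCrash && !cleanupHasRun));
    }
}
```

With these values the message has already expired when the broker dies but has not yet been removed, so the broker meets it again while recovering the queue on restart. That matches the stack trace, where Queue.messageExpired is reached from Queue$5.recoverMessage.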
>>
>>
>>
>> Gary Tully wrote:
>>>
>>> Would it be possible to provide a JUnit test case that demonstrates this
>>> behavior? I wonder whether the problem persists with the latest
>>> 5.3-SNAPSHOT.
>>>
>>> Trunk has a resolution to
>>> https://issues.apache.org/activemq/browse/AMQ-1112 that provides a
>>> periodic message expiry task. That should help, but it may be
>>> that there still remain some assumptions about the type of message.
>>> A JUnit test case would help a lot.
>>>
>>>
>>>
>>> 2009/7/7 sic <sic_1234@naver.com>
>>>
>>>>
>>>> While expired messages remain on the queue, the server cannot be
>>>> restarted (it fails with the ClassCastException) until I delete the
>>>> persistent file of the queue that holds the expired messages (e.g. file:
>>>> data\kr-store\data\hash-index-queue-data_queue#3a#2f#2fTEST.QUEUE,
>>>> queue name: TEST.QUEUE).
>>>>
>>>> However, this approach has the disadvantage that all messages on the
>>>> specified queue are purged, regardless of expiration.
>>>>
>>>> Does anyone know of another way?