Consumer Listener stop receving message until ActiveMQ restart

View: New views
1 Messages — Rating Filter:   Alert me  

Consumer Listener stop receving message until ActiveMQ restart

by Edward Ye :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Hi,All.
I am facing a simliar problem like this thread(http://www.nabble.com/Pending-Messages-are-shown-in-ActiveMQ-to20241332.html) but in a different environment.

My PC:
ActiveMQ 5.1.0
Java SE 6 Update 10
Windows 2003 SP2
client api: activemq-cpp-2.2.1

Development tool:
Visual Studio 2005

I have a simple test:
A producer sent  thousands of  persistent messages (each 1k bytes )into a queue in async way. and then stopped.
A consumer received message with listener start and tried to run up of the pending messages.

Sometimes the consumer stopped to received even there were still many messages pending in the queue. I have tried to start the client and reconnect to the queue successfully. But the messages still remained in the queue (from webconsole I can see)and the onMessage handler was never called until the ActiveMQ was restarted.

Just like the above thread saying, sometimes the Number Of Pending Message in webconsole is negative and not proper. If I restart ActiveMQ , the number become correct.

here is my code of producer:
ActiveMQMessageProducer is a producer wrapper class.
int ActiveMQMessageProducer::connect(char* cbrokerURI,char* cqueueName)
{
        ConnectionFactory* connectionFactory = NULL;

        try
        {
                string brokerURI=cbrokerURI;
                string queueName=cqueueName;
               
                // Create a ConnectionFactory
                connectionFactory = ConnectionFactory::createCMSConnectionFactory( brokerURI);//brokerURI must contain UseAsyncSend=true

                // Create a Connection
                         connection = connectionFactory->createConnection();
                         ((Connection*)connection)->start();

                // free the factory, we are done with it.
                         delete connectionFactory;
                         connectionFactory = NULL;

                // Create an Auto_Acknowledge Session
                session = ((Connection*)connection)->createSession( Session::AUTO_ACKNOWLEDGE );

                destination =((Session*) session)->createQueue( queueName );

                // Create a MessageProducer from the Session to the Topic or Queue
                         producer = ((Session*)session)->createProducer( (Destination*)destination );
                         ((MessageProducer*)producer)->setDeliveryMode( DeliveryMode::PERSISTENT );
        }
        catch( CMSException& e )
        {
                         if (connectionFactory)
                        delete connectionFactory;
                         connectionFactory = NULL;

               
                if (logCallbackFunc)//a function pointer
                {
                        logCallbackFunc((char*)e.getMessage().c_str());
                }

                return  -1;
        }
        return 0;

}

int ActiveMQMessageProducer::sendMessage(char *message,size_t sizeOfMessage)
{
        BytesMessage* bytesMessage=NULL;
        try
        {
                bytesMessage=((Session*)session)->createBytesMessage((unsigned char*)message,sizeOfMessage);
                ((MessageProducer*)producer)->send(bytesMessage);

        }
        catch(CMSException& e)
        {
               
                if (logCallbackFunc)
                {
                        logCallbackFunc((char*)(e.getMessage().c_str()));
                }
                if (bytesMessage)
                        delete bytesMessage;

                bytesMessage=NULL;

                return  -1;
        }

        if (bytesMessage)
                delete bytesMessage;

        bytesMessage=NULL;

        return 0;
}

And below is code for consumer:
ActiveMQConsumer is a wrapper class for consumer

int ActiveMQMessageConsumer::connect(char *cbrokerURI,char *cqueueName)
{
        ConnectionFactory* connectionFactory = NULL;

        try
        {
                string brokerURI=cbrokerURI;
                string queueName=cqueueName;
               
                // Create a ConnectionFactory
                connectionFactory = ConnectionFactory::createCMSConnectionFactory( brokerURI);

                // Create a Connection
                         connection = connectionFactory->createConnection();
                        ((Connection*)connection)->start();

                ((Connection*)connection)->setExceptionListener(((ConsumerListener*)listener));

                // free the factory, we are done with it.
                        delete connectionFactory;
                        connectionFactory = NULL;

                // Create an Auto_Acknowledge Session
                session = ((Connection*)connection)->createSession( Session::AUTO_ACKNOWLEDGE );

                destination =((Session*) session)->createQueue( queueName );

                // Create a MessageProducer from the Session to the Topic or Queue        
                         consumer = ((Session*) session)->createConsumer( ((Destination*)destination ));
                         ((MessageConsumer*)consumer)->setMessageListener(((ConsumerListener*)listener));
        }
        catch( CMSException& e )
        {
                        if (connectionFactory)
                        delete connectionFactory;
                        connectionFactory = NULL;

               
                if (logCallbackFunc)
                {
                        logCallbackFunc((char*)e.getMessage().c_str());
                }

                return  -1;
        }
        return 0;

}

ConsumerListener is implementation for MessageListener and ExceptionListener

class ConsumerListener: public ExceptionListener,
                public MessageListener
{
private:
        ActiveMQMessageConsumer* consumer;

public:
        ConsumerListener(ActiveMQMessageConsumer* messageConsumer)
        {
                consumer=messageConsumer;
        }

        virtual ~ConsumerListener()
        {
                consumer=NULL;
        }

        // Called from the consumer since this class is a registered MessageListener.
    virtual void onMessage( const Message* message )
        {
               
                try
                {
                        const BytesMessage* bytesMessage =
                dynamic_cast< const BytesMessage* >( message );
                       
                        char* tempCharArray=new char(bytesMessage->getBodyLength());
                        memcpy(tempCharArray,(char*)(bytesMessage->getBodyBytes()),bytesMessage->getBodyLength());
                                       delete []tempCharArray;
                                       tempCharArray=NULL;
                               
                       
                }
                catch(CMSException& e)
                {
                        // do some stuff
                }

                return ;
        }

        // If something bad happens you see it here as this class is also been
    // registered as an ExceptionListener with the connection.
    virtual void onException( const CMSException& ex )
        {
                try
                {
                        //do some error handler stuff
                       
                }
                catch(CMSException& e)
                {
                        //
                }
                return ;

    }
};

Anyone could help ?
Thanks a lot.