package foo; import java.util.Enumeration; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.resource.spi.work.Work; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.mule.impl.MuleMessage; import org.mule.providers.PollingMessageReceiver; import org.mule.providers.jms.JmsConnector; import org.mule.providers.jms.JmsSupport; import org.mule.transaction.TransactionCallback; import org.mule.transaction.TransactionCoordination; import org.mule.transaction.TransactionTemplate; import org.mule.umo.UMOComponent; import org.mule.umo.UMOTransaction; import org.mule.umo.endpoint.UMOEndpoint; import org.mule.umo.lifecycle.InitialisationException; import org.mule.umo.provider.UMOConnector; import org.mule.umo.provider.UMOMessageAdapter; public class BrowsingMessageReceiver extends PollingMessageReceiver { static Log log = LogFactory.getLog(BrowsingMessageReceiver.class); protected ThreadContextLocal context = new ThreadContextLocal(); protected JmsConnector connector; private static long magicNumber = 23; public BrowsingMessageReceiver( UMOConnector connector, UMOComponent component, UMOEndpoint endpoint) throws InitialisationException { super(connector, component, endpoint, magicNumber); this.connector = (JmsConnector) connector; } /** * Holder receiving the session and consumer for this thread. */ protected static class JmsThreadContext { public QueueBrowser browser; public Object session; } /** * Strongly typed ThreadLocal for ThreadContext. */ protected static class ThreadContextLocal extends ThreadLocal { public JmsThreadContext getContext() { return (JmsThreadContext) get(); } protected Object initialValue() { return new JmsThreadContext(); } } @Override public void run() { try { Thread.sleep(STARTUP_DELAY); while (true) { try { if(connected.get()) { poll(); } else { synchronized(this) { if (log.isDebugEnabled()) log.debug("Waiting to be notified about a reconnection........"); wait(); if (log.isDebugEnabled()) log.debug("Received notification about a reconnection"); } } } catch (InterruptedException e) { return; } catch (Exception e) { handleException(e); } // Sleep for a while, spin around and rebrowse Thread.sleep(100); } } catch (InterruptedException e) { } } @Override public void poll() throws Exception { JmsThreadContext ctx = context.getContext(); UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); if (tx != null) { tx.bindResource(connector.getConnection(), ctx.session); } if (ctx.browser == null) createBrowser(); Enumeration e = ctx.browser.getEnumeration(); if (null != e) { while (e.hasMoreElements()) { log.debug("Queue browsing in loop"); // Non-blocking submission to the WorkManager to get the worker thread // to try to consume a message from the queue in a non-blocking fashion getWorkManager().scheduleWork(new NonBlockingQueueMessageConsumer()); // This is a NOOP e.nextElement(); } } } private void createBrowser() throws Exception { JmsThreadContext ctx = context.getContext(); Session session = connector.getSession(endpoint); JmsSupport jmsSupport = connector.getJmsSupport(); Queue q = (Queue) jmsSupport.createDestination(session, endpoint.getEndpointURI().getAddress(), false); ctx.browser = session.createBrowser(q); } protected class NonBlockingQueueMessageConsumer implements Work { public void release() { // NOOP } public void run() { try { dequeue(); } catch (Exception e) { // TODO Auto-generated catch block log.error(e); } } public void dequeue() throws Exception { TransactionTemplate tt = new TransactionTemplate(endpoint.getTransactionConfig(), connector.getExceptionListener()); TransactionCallback cb = new TransactionCallback() { public Object doInTransaction() throws Exception { Session session = connector.getSession(endpoint); JmsSupport jmsSupport = connector.getJmsSupport(); Queue q = (Queue) jmsSupport.createDestination(session, endpoint.getEndpointURI().getAddress(), false); MessageConsumer consumer = jmsSupport.createConsumer(session, q, false); Message message = consumer.receiveNoWait(); if (message != null) { // Do something with the message UMOMessageAdapter adapter = connector.getMessageAdapter(message); routeMessage(new MuleMessage(adapter)); } else { UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); tx.setRollbackOnly(); log.debug("This thread consumed an empty message, rolling transaction back"); } return null; } }; tt.execute(cb); } } /** * Connects the consumer. */ @Override public void connect() throws Exception { doConnect(); } /** * Disconnects the consumer. */ @Override public synchronized void doConnect() throws Exception { if (!component.isStarted()) { log.info("Connect called for a component that wasn't started"); return; } if (!connected.get()) { connected.set(true); log.debug("Notifying waiting threads about reconnection"); notify(); } else { connected.set(false); } } /** * Connects the consumer. */ @Override public void disconnect() throws Exception { doDisconnect(); } /** * Disconnects the consumer. */ @Override public synchronized void doDisconnect() throws Exception { if (connected.get()) { connected.set(false); } } }