explicitly setting replyTo doesn't scale with .threads() and JMS cache.

View: New views
3 Messages — Rating Filter:   Alert me  

explicitly setting replyTo doesn't scale with .threads() and JMS cache.

by Eric Bouer :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Hello.
In order to optimize my InOut throughput I added explicit replyTo  on my JMS endpoint.
This doesn't seems to be well behaving with camel threads and spring JMS caching.
The following test case shows the problem.
remove the explicit replyTo and everything works fine.
your comments are welcome.

================

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
import javax.jms.TextMessage;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.CamelContext;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.builder.RouteBuilder;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.ExchangePattern;
import org.apache.camel.test.CamelTestSupport;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;

/**
 * Unit test using a fixed replyTo specified on the JMS endpoint
 *
 * @version $Revision: 791824 $
 */
public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport {

    private ActiveMQComponent amq;
    private static String MQURI = "failover:(tcp://localhost:61616)";
   // private static String MQURI = "vm://localhost?broker.persistent=false&broker.useJmx=false";

    public class Replier implements Callable {

        @Override
                public Object call() throws Exception {
                log.info("replier started");
                JmsTemplate jms = new JmsTemplate(amq.getConfiguration().getConnectionFactory());

                final TextMessage msg = (TextMessage) jms.receive("nameRequestor");
                assertEquals("What's your name", msg.getText());

                // there should be a JMSReplyTo so we know where to send the reply
                final Destination replyTo = msg.getJMSReplyTo();

                // send reply
               // Thread.sleep(10000);
                jms.send(replyTo, new MessageCreator() {
                    public Message createMessage(Session session) throws JMSException {
                        TextMessage replyMsg = session.createTextMessage();
                        replyMsg.setText("My name is Arnio");
                        replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
                        return replyMsg;
                    }
                });
                return null;
            }
    };
    public void testCustomJMSReplyToInOut() throws Exception {

        MockEndpoint mock = getMockEndpoint("mock:result");
        mock.expectedBodiesReceived("My name is Arnio");
        mock.expectedMessageCount(10);
        // do not use Camel to send and receive to simulate a non Camel client
        // use another thread to listen and send the reply
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(20);

        for (int i = 0 ; i < 10 ; i++) {
          newFixedThreadPool.submit(new Replier());
        }
       
        // now get started and send the first message that gets the ball rolling
        JmsTemplate jms = new JmsTemplate(amq.getConfiguration().getConnectionFactory());
        for (int i = 0 ; i < 10 ; i++) {
        jms.send("hello", new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                msg.setText("Hello, I'm here");
                return msg;
            }

        });

        log.info("Hello sent");
        Thread.sleep(10);
        }

        Thread.sleep(5000);
        assertMockEndpointsSatisfied();
    }

    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {

            public void configure() throws Exception {
                from("activemq:queue:hello")
                   .threads()
                    .process(new Processor() {
                        public void process(Exchange exchange) throws Exception {

                            exchange.getOut().setBody("What's your name");
                        }
                    })
                    // use in out to get a reply as well
                   
                    .to(ExchangePattern.InOut, "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the replyTo and eveything works well
                    .to("direct:replyprocessor");

                    from("direct:replyprocessor").process(new Processor() {
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("Here I am processing the reply " + exchange.getIn().getBody());
                            }
                    }).to("mock:result");
            }
        };
    }

    protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
        amq = activeMQComponent(MQURI);
        ActiveMQConnectionFactory targetFactory = new ActiveMQConnectionFactory(MQURI);
         log.info("Using MQ CachingConnectionFactory");
            CachingConnectionFactory cachedAMQConnectionFactory = new CachingConnectionFactory(targetFactory);

        camelContext.addComponent("activemq", ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory));

        return camelContext;
    }


}

===========

Re: explicitly setting replyTo doesn't scale with .threads() and JMS cache.

by Claus Ibsen-2 :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Hi

Maybe this ticket can give some hints
https://issues.apache.org/activemq/browse/CAMEL-490

On Wed, Oct 21, 2009 at 2:15 PM, Eric Bouer <ericbouer@...> wrote:

>
> Hello.
> In order to optimize my InOut throughput I added explicit replyTo  on my JMS
> endpoint.
> This doesn't seems to be well behaving with camel threads and spring JMS
> caching.
> The following test case shows the problem.
> remove the explicit replyTo and everything works fine.
> your comments are welcome.
>
> ================
>
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.Callable;
> import javax.jms.TextMessage;
> import javax.jms.Destination;
> import javax.jms.Message;
> import javax.jms.Session;
> import javax.jms.JMSException;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.CamelContext;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.camel.ExchangePattern;
> import org.apache.camel.test.CamelTestSupport;
> import org.springframework.jms.connection.CachingConnectionFactory;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.MessageCreator;
> import static
> org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
>
> /**
>  * Unit test using a fixed replyTo specified on the JMS endpoint
>  *
>  * @version $Revision: 791824 $
>  */
> public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport {
>
>    private ActiveMQComponent amq;
>    private static String MQURI = "failover:(tcp://localhost:61616)";
>   // private static String MQURI =
> "vm://localhost?broker.persistent=false&broker.useJmx=false";
>
>    public class Replier implements Callable {
>
>        @Override
>                public Object call() throws Exception {
>                log.info("replier started");
>                JmsTemplate jms = new
> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>
>                final TextMessage msg = (TextMessage)
> jms.receive("nameRequestor");
>                assertEquals("What's your name", msg.getText());
>
>                // there should be a JMSReplyTo so we know where to send the
> reply
>                final Destination replyTo = msg.getJMSReplyTo();
>
>                // send reply
>               // Thread.sleep(10000);
>                jms.send(replyTo, new MessageCreator() {
>                    public Message createMessage(Session session) throws
> JMSException {
>                        TextMessage replyMsg = session.createTextMessage();
>                        replyMsg.setText("My name is Arnio");
>
> replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
>                        return replyMsg;
>                    }
>                });
>                return null;
>            }
>    };
>    public void testCustomJMSReplyToInOut() throws Exception {
>
>        MockEndpoint mock = getMockEndpoint("mock:result");
>        mock.expectedBodiesReceived("My name is Arnio");
>        mock.expectedMessageCount(10);
>        // do not use Camel to send and receive to simulate a non Camel
> client
>        // use another thread to listen and send the reply
>        ExecutorService newFixedThreadPool =
> Executors.newFixedThreadPool(20);
>
>        for (int i = 0 ; i < 10 ; i++) {
>          newFixedThreadPool.submit(new Replier());
>        }
>
>        // now get started and send the first message that gets the ball
> rolling
>        JmsTemplate jms = new
> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>        for (int i = 0 ; i < 10 ; i++) {
>        jms.send("hello", new MessageCreator() {
>            public Message createMessage(Session session) throws
> JMSException {
>                TextMessage msg = session.createTextMessage();
>                msg.setText("Hello, I'm here");
>                return msg;
>            }
>
>        });
>
>        log.info("Hello sent");
>        Thread.sleep(10);
>        }
>
>        Thread.sleep(5000);
>        assertMockEndpointsSatisfied();
>    }
>
>    protected RouteBuilder createRouteBuilder() throws Exception {
>        return new RouteBuilder() {
>
>            public void configure() throws Exception {
>                from("activemq:queue:hello")
>                   .threads()
>                    .process(new Processor() {
>                        public void process(Exchange exchange) throws
> Exception {
>
>                            exchange.getOut().setBody("What's your name");
>                        }
>                    })
>                    // use in out to get a reply as well
>
>                    .to(ExchangePattern.InOut,
> "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the
> replyTo and eveything works well
>                    .to("direct:replyprocessor");
>
>                    from("direct:replyprocessor").process(new Processor() {
>                            public void process(Exchange exchange) throws
> Exception {
>                                System.out.println("Here I am processing the
> reply " + exchange.getIn().getBody());
>                            }
>                    }).to("mock:result");
>            }
>        };
>    }
>
>    protected CamelContext createCamelContext() throws Exception {
>         CamelContext camelContext = super.createCamelContext();
>        amq = activeMQComponent(MQURI);
>        ActiveMQConnectionFactory targetFactory = new
> ActiveMQConnectionFactory(MQURI);
>         log.info("Using MQ CachingConnectionFactory");
>            CachingConnectionFactory cachedAMQConnectionFactory = new
> CachingConnectionFactory(targetFactory);
>
>        camelContext.addComponent("activemq",
> ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory));
>
>        return camelContext;
>    }
>
>
> }
>
> ===========
> --
> View this message in context: http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html
> Sent from the Camel - Users (activemq) mailing list archive at Nabble.com.
>
>



--
Claus Ibsen
Apache Camel Committer

Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: explicitly setting replyTo doesn't scale with .threads() and JMS cache.

by Claus Ibsen-2 :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Hi

Also use option concurrentConsumers on the JMS endpoint to support
concurrency, instead of threads.


On Wed, Oct 21, 2009 at 2:58 PM, Claus Ibsen <claus.ibsen@...> wrote:

> Hi
>
> Maybe this ticket can give some hints
> https://issues.apache.org/activemq/browse/CAMEL-490
>
> On Wed, Oct 21, 2009 at 2:15 PM, Eric Bouer <ericbouer@...> wrote:
>>
>> Hello.
>> In order to optimize my InOut throughput I added explicit replyTo  on my JMS
>> endpoint.
>> This doesn't seems to be well behaving with camel threads and spring JMS
>> caching.
>> The following test case shows the problem.
>> remove the explicit replyTo and everything works fine.
>> your comments are welcome.
>>
>> ================
>>
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors;
>> import java.util.concurrent.Callable;
>> import javax.jms.TextMessage;
>> import javax.jms.Destination;
>> import javax.jms.Message;
>> import javax.jms.Session;
>> import javax.jms.JMSException;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Processor;
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.component.mock.MockEndpoint;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.activemq.camel.component.ActiveMQComponent;
>> import org.apache.camel.ExchangePattern;
>> import org.apache.camel.test.CamelTestSupport;
>> import org.springframework.jms.connection.CachingConnectionFactory;
>> import org.springframework.jms.core.JmsTemplate;
>> import org.springframework.jms.core.MessageCreator;
>> import static
>> org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
>>
>> /**
>>  * Unit test using a fixed replyTo specified on the JMS endpoint
>>  *
>>  * @version $Revision: 791824 $
>>  */
>> public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport {
>>
>>    private ActiveMQComponent amq;
>>    private static String MQURI = "failover:(tcp://localhost:61616)";
>>   // private static String MQURI =
>> "vm://localhost?broker.persistent=false&broker.useJmx=false";
>>
>>    public class Replier implements Callable {
>>
>>        @Override
>>                public Object call() throws Exception {
>>                log.info("replier started");
>>                JmsTemplate jms = new
>> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>>
>>                final TextMessage msg = (TextMessage)
>> jms.receive("nameRequestor");
>>                assertEquals("What's your name", msg.getText());
>>
>>                // there should be a JMSReplyTo so we know where to send the
>> reply
>>                final Destination replyTo = msg.getJMSReplyTo();
>>
>>                // send reply
>>               // Thread.sleep(10000);
>>                jms.send(replyTo, new MessageCreator() {
>>                    public Message createMessage(Session session) throws
>> JMSException {
>>                        TextMessage replyMsg = session.createTextMessage();
>>                        replyMsg.setText("My name is Arnio");
>>
>> replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
>>                        return replyMsg;
>>                    }
>>                });
>>                return null;
>>            }
>>    };
>>    public void testCustomJMSReplyToInOut() throws Exception {
>>
>>        MockEndpoint mock = getMockEndpoint("mock:result");
>>        mock.expectedBodiesReceived("My name is Arnio");
>>        mock.expectedMessageCount(10);
>>        // do not use Camel to send and receive to simulate a non Camel
>> client
>>        // use another thread to listen and send the reply
>>        ExecutorService newFixedThreadPool =
>> Executors.newFixedThreadPool(20);
>>
>>        for (int i = 0 ; i < 10 ; i++) {
>>          newFixedThreadPool.submit(new Replier());
>>        }
>>
>>        // now get started and send the first message that gets the ball
>> rolling
>>        JmsTemplate jms = new
>> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>>        for (int i = 0 ; i < 10 ; i++) {
>>        jms.send("hello", new MessageCreator() {
>>            public Message createMessage(Session session) throws
>> JMSException {
>>                TextMessage msg = session.createTextMessage();
>>                msg.setText("Hello, I'm here");
>>                return msg;
>>            }
>>
>>        });
>>
>>        log.info("Hello sent");
>>        Thread.sleep(10);
>>        }
>>
>>        Thread.sleep(5000);
>>        assertMockEndpointsSatisfied();
>>    }
>>
>>    protected RouteBuilder createRouteBuilder() throws Exception {
>>        return new RouteBuilder() {
>>
>>            public void configure() throws Exception {
>>                from("activemq:queue:hello")
>>                   .threads()
>>                    .process(new Processor() {
>>                        public void process(Exchange exchange) throws
>> Exception {
>>
>>                            exchange.getOut().setBody("What's your name");
>>                        }
>>                    })
>>                    // use in out to get a reply as well
>>
>>                    .to(ExchangePattern.InOut,
>> "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the
>> replyTo and eveything works well
>>                    .to("direct:replyprocessor");
>>
>>                    from("direct:replyprocessor").process(new Processor() {
>>                            public void process(Exchange exchange) throws
>> Exception {
>>                                System.out.println("Here I am processing the
>> reply " + exchange.getIn().getBody());
>>                            }
>>                    }).to("mock:result");
>>            }
>>        };
>>    }
>>
>>    protected CamelContext createCamelContext() throws Exception {
>>         CamelContext camelContext = super.createCamelContext();
>>        amq = activeMQComponent(MQURI);
>>        ActiveMQConnectionFactory targetFactory = new
>> ActiveMQConnectionFactory(MQURI);
>>         log.info("Using MQ CachingConnectionFactory");
>>            CachingConnectionFactory cachedAMQConnectionFactory = new
>> CachingConnectionFactory(targetFactory);
>>
>>        camelContext.addComponent("activemq",
>> ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory));
>>
>>        return camelContext;
>>    }
>>
>>
>> }
>>
>> ===========
>> --
>> View this message in context: http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html
>> Sent from the Camel - Users (activemq) mailing list archive at Nabble.com.
>>
>>
>
>
>
> --
> Claus Ibsen
> Apache Camel Committer
>
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
>



--
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus