|
View:
New views
3 Messages
—
Rating Filter:
Alert me
|
|
|
explicitly setting replyTo doesn't scale with .threads() and JMS cache.
Hello.
In order to optimize my InOut throughput I added an explicit replyTo on my JMS endpoint. This doesn't seem to behave well with Camel threads and Spring JMS caching. The following test case shows the problem. Remove the explicit replyTo and everything works fine. Your comments are welcome.
================
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
import javax.jms.TextMessage;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.CamelContext;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.builder.RouteBuilder;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.ExchangePattern;
import org.apache.camel.test.CamelTestSupport;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;

/**
 * Unit test using a fixed replyTo specified on the JMS endpoint.
 *
 * <p>The Camel route consumes from {@code queue:hello}, hands the exchange to
 * {@code .threads()}, and performs an InOut call to {@code queue:nameRequestor}
 * with an explicit {@code replyTo=queue:namedReplyQueue}. Plain {@link JmsTemplate}
 * clients (no Camel) play both the initial sender and the repliers.
 *
 * <p>Requires a broker reachable at {@link #MQURI}.
 *
 * @version $Revision: 791824 $
 */
public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport {

    /** Broker URI used by both the plain JMS clients and the Camel "activemq" component. */
    private static final String MQURI = "failover:(tcp://localhost:61616)";
    // Alternative broker URI, kept from the original post:
    // private static final String MQURI = "vm://localhost?broker.persistent=false&broker.useJmx=false";

    /** Number of requests sent to queue:hello, and number of repliers started to answer them. */
    private static final int REQUEST_COUNT = 10;

    /**
     * Component used only to obtain a connection factory for the plain JmsTemplate clients.
     * It is not the component registered in the Camel context (see createCamelContext()).
     */
    private ActiveMQComponent amq;

    /**
     * Non-Camel replier: receives one message from {@code nameRequestor} and answers on the
     * destination given in its JMSReplyTo header, echoing the JMSCorrelationID.
     */
    public class Replier implements Callable<Object> {

        @Override
        public Object call() throws Exception {
            log.info("replier started");
            JmsTemplate jms = new JmsTemplate(amq.getConfiguration().getConnectionFactory());

            final TextMessage msg = (TextMessage) jms.receive("nameRequestor");
            assertEquals("What's your name", msg.getText());

            // there should be a JMSReplyTo so we know where to send the reply
            final Destination replyTo = msg.getJMSReplyTo();

            // send reply
            // Thread.sleep(10000);
            jms.send(replyTo, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    TextMessage replyMsg = session.createTextMessage();
                    replyMsg.setText("My name is Arnio");
                    // the requester matches the reply to its request by correlation id
                    replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
                    return replyMsg;
                }
            });
            return null;
        }
    }

    /**
     * Sends {@code REQUEST_COUNT} messages to {@code queue:hello} and expects the same number
     * of replies to arrive at {@code mock:result}.
     */
    public void testCustomJMSReplyToInOut() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:result");
        mock.expectedBodiesReceived("My name is Arnio");
        mock.expectedMessageCount(REQUEST_COUNT);

        // do not use Camel to send and receive to simulate a non Camel client
        // use another thread to listen and send the reply
        ExecutorService replierPool = Executors.newFixedThreadPool(20);
        try {
            for (int i = 0; i < REQUEST_COUNT; i++) {
                replierPool.submit(new Replier());
            }

            // now get started and send the first message that gets the ball rolling
            JmsTemplate jms = new JmsTemplate(amq.getConfiguration().getConnectionFactory());
            for (int i = 0; i < REQUEST_COUNT; i++) {
                jms.send("hello", new MessageCreator() {
                    public Message createMessage(Session session) throws JMSException {
                        TextMessage msg = session.createTextMessage();
                        msg.setText("Hello, I'm here");
                        return msg;
                    }
                });
                log.info("Hello sent");
                Thread.sleep(10);
            }

            Thread.sleep(5000);
            assertMockEndpointsSatisfied();
        } finally {
            // Best-effort stop of the replier threads so they do not outlive the test,
            // whether it passed or failed.
            replierPool.shutdownNow();
        }
    }

    /**
     * Builds the route under test: hello -> threads() -> InOut to nameRequestor with a
     * fixed reply queue -> direct:replyprocessor -> mock:result.
     */
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            public void configure() throws Exception {
                from("activemq:queue:hello")
                    .threads()
                    .process(new Processor() {
                        public void process(Exchange exchange) throws Exception {
                            exchange.getOut().setBody("What's your name");
                        }
                    })
                    // use in out to get a reply as well
                    // Remove the replyTo and everything works well
                    .to(ExchangePattern.InOut, "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue")
                    .to("direct:replyprocessor");

                from("direct:replyprocessor").process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Here I am processing the reply " + exchange.getIn().getBody());
                    }
                }).to("mock:result");
            }
        };
    }

    /**
     * Registers the "activemq" component backed by a Spring CachingConnectionFactory, and
     * separately creates the {@code amq} component used by the plain JMS clients.
     */
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();
        amq = activeMQComponent(MQURI);

        ActiveMQConnectionFactory targetFactory = new ActiveMQConnectionFactory(MQURI);
        log.info("Using MQ CachingConnectionFactory");
        CachingConnectionFactory cachedAMQConnectionFactory = new CachingConnectionFactory(targetFactory);

        camelContext.addComponent("activemq", ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory));

        return camelContext;
    }
}
=========== |
|
|
Re: explicitly setting replyTo doesn't scale with .threads() and JMS cache.
Hi
Maybe this ticket can give some hints https://issues.apache.org/activemq/browse/CAMEL-490 On Wed, Oct 21, 2009 at 2:15 PM, Eric Bouer <ericbouer@...> wrote: > > Hello. > In order to optimize my InOut throughput I added explicit replyTo on my JMS > endpoint. > This doesn't seems to be well behaving with camel threads and spring JMS > caching. > The following test case shows the problem. > remove the explicit replyTo and everything works fine. > your comments are welcome. > > ================ > > import java.util.concurrent.ExecutorService; > import java.util.concurrent.Executors; > import java.util.concurrent.Callable; > import javax.jms.TextMessage; > import javax.jms.Destination; > import javax.jms.Message; > import javax.jms.Session; > import javax.jms.JMSException; > > import org.apache.activemq.ActiveMQConnectionFactory; > import org.apache.camel.Exchange; > import org.apache.camel.Processor; > import org.apache.camel.CamelContext; > import org.apache.camel.component.mock.MockEndpoint; > import org.apache.camel.builder.RouteBuilder; > import org.apache.activemq.camel.component.ActiveMQComponent; > import org.apache.camel.ExchangePattern; > import org.apache.camel.test.CamelTestSupport; > import org.springframework.jms.connection.CachingConnectionFactory; > import org.springframework.jms.core.JmsTemplate; > import org.springframework.jms.core.MessageCreator; > import static > org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent; > > /** > * Unit test using a fixed replyTo specified on the JMS endpoint > * > * @version $Revision: 791824 $ > */ > public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport { > > private ActiveMQComponent amq; > private static String MQURI = "failover:(tcp://localhost:61616)"; > // private static String MQURI = > "vm://localhost?broker.persistent=false&broker.useJmx=false"; > > public class Replier implements Callable { > > @Override > public Object call() throws Exception { > log.info("replier 
started"); > JmsTemplate jms = new > JmsTemplate(amq.getConfiguration().getConnectionFactory()); > > final TextMessage msg = (TextMessage) > jms.receive("nameRequestor"); > assertEquals("What's your name", msg.getText()); > > // there should be a JMSReplyTo so we know where to send the > reply > final Destination replyTo = msg.getJMSReplyTo(); > > // send reply > // Thread.sleep(10000); > jms.send(replyTo, new MessageCreator() { > public Message createMessage(Session session) throws > JMSException { > TextMessage replyMsg = session.createTextMessage(); > replyMsg.setText("My name is Arnio"); > > replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID()); > return replyMsg; > } > }); > return null; > } > }; > public void testCustomJMSReplyToInOut() throws Exception { > > MockEndpoint mock = getMockEndpoint("mock:result"); > mock.expectedBodiesReceived("My name is Arnio"); > mock.expectedMessageCount(10); > // do not use Camel to send and receive to simulate a non Camel > client > // use another thread to listen and send the reply > ExecutorService newFixedThreadPool = > Executors.newFixedThreadPool(20); > > for (int i = 0 ; i < 10 ; i++) { > newFixedThreadPool.submit(new Replier()); > } > > // now get started and send the first message that gets the ball > rolling > JmsTemplate jms = new > JmsTemplate(amq.getConfiguration().getConnectionFactory()); > for (int i = 0 ; i < 10 ; i++) { > jms.send("hello", new MessageCreator() { > public Message createMessage(Session session) throws > JMSException { > TextMessage msg = session.createTextMessage(); > msg.setText("Hello, I'm here"); > return msg; > } > > }); > > log.info("Hello sent"); > Thread.sleep(10); > } > > Thread.sleep(5000); > assertMockEndpointsSatisfied(); > } > > protected RouteBuilder createRouteBuilder() throws Exception { > return new RouteBuilder() { > > public void configure() throws Exception { > from("activemq:queue:hello") > .threads() > .process(new Processor() { > public void process(Exchange exchange) 
throws > Exception { > > exchange.getOut().setBody("What's your name"); > } > }) > // use in out to get a reply as well > > .to(ExchangePattern.InOut, > "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the > replyTo and eveything works well > .to("direct:replyprocessor"); > > from("direct:replyprocessor").process(new Processor() { > public void process(Exchange exchange) throws > Exception { > System.out.println("Here I am processing the > reply " + exchange.getIn().getBody()); > } > }).to("mock:result"); > } > }; > } > > protected CamelContext createCamelContext() throws Exception { > CamelContext camelContext = super.createCamelContext(); > amq = activeMQComponent(MQURI); > ActiveMQConnectionFactory targetFactory = new > ActiveMQConnectionFactory(MQURI); > log.info("Using MQ CachingConnectionFactory"); > CachingConnectionFactory cachedAMQConnectionFactory = new > CachingConnectionFactory(targetFactory); > > camelContext.addComponent("activemq", > ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory)); > > return camelContext; > } > > > } > > =========== > -- > View this message in context: http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html > Sent from the Camel - Users (activemq) mailing list archive at Nabble.com. > > -- Claus Ibsen Apache Camel Committer Open Source Integration: http://fusesource.com Blog: http://davsclaus.blogspot.com/ Twitter: http://twitter.com/davsclaus |
|
|
Re: explicitly setting replyTo doesn't scale with .threads() and JMS cache.
Hi
Also use option concurrentConsumers on the JMS endpoint to support concurrency, instead of threads. On Wed, Oct 21, 2009 at 2:58 PM, Claus Ibsen <claus.ibsen@...> wrote: > Hi > > Maybe this ticket can give some hints > https://issues.apache.org/activemq/browse/CAMEL-490 > > On Wed, Oct 21, 2009 at 2:15 PM, Eric Bouer <ericbouer@...> wrote: >> >> Hello. >> In order to optimize my InOut throughput I added explicit replyTo on my JMS >> endpoint. >> This doesn't seems to be well behaving with camel threads and spring JMS >> caching. >> The following test case shows the problem. >> remove the explicit replyTo and everything works fine. >> your comments are welcome. >> >> ================ >> >> import java.util.concurrent.ExecutorService; >> import java.util.concurrent.Executors; >> import java.util.concurrent.Callable; >> import javax.jms.TextMessage; >> import javax.jms.Destination; >> import javax.jms.Message; >> import javax.jms.Session; >> import javax.jms.JMSException; >> >> import org.apache.activemq.ActiveMQConnectionFactory; >> import org.apache.camel.Exchange; >> import org.apache.camel.Processor; >> import org.apache.camel.CamelContext; >> import org.apache.camel.component.mock.MockEndpoint; >> import org.apache.camel.builder.RouteBuilder; >> import org.apache.activemq.camel.component.ActiveMQComponent; >> import org.apache.camel.ExchangePattern; >> import org.apache.camel.test.CamelTestSupport; >> import org.springframework.jms.connection.CachingConnectionFactory; >> import org.springframework.jms.core.JmsTemplate; >> import org.springframework.jms.core.MessageCreator; >> import static >> org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent; >> >> /** >> * Unit test using a fixed replyTo specified on the JMS endpoint >> * >> * @version $Revision: 791824 $ >> */ >> public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport { >> >> private ActiveMQComponent amq; >> private static String MQURI = 
"failover:(tcp://localhost:61616)"; >> // private static String MQURI = >> "vm://localhost?broker.persistent=false&broker.useJmx=false"; >> >> public class Replier implements Callable { >> >> @Override >> public Object call() throws Exception { >> log.info("replier started"); >> JmsTemplate jms = new >> JmsTemplate(amq.getConfiguration().getConnectionFactory()); >> >> final TextMessage msg = (TextMessage) >> jms.receive("nameRequestor"); >> assertEquals("What's your name", msg.getText()); >> >> // there should be a JMSReplyTo so we know where to send the >> reply >> final Destination replyTo = msg.getJMSReplyTo(); >> >> // send reply >> // Thread.sleep(10000); >> jms.send(replyTo, new MessageCreator() { >> public Message createMessage(Session session) throws >> JMSException { >> TextMessage replyMsg = session.createTextMessage(); >> replyMsg.setText("My name is Arnio"); >> >> replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID()); >> return replyMsg; >> } >> }); >> return null; >> } >> }; >> public void testCustomJMSReplyToInOut() throws Exception { >> >> MockEndpoint mock = getMockEndpoint("mock:result"); >> mock.expectedBodiesReceived("My name is Arnio"); >> mock.expectedMessageCount(10); >> // do not use Camel to send and receive to simulate a non Camel >> client >> // use another thread to listen and send the reply >> ExecutorService newFixedThreadPool = >> Executors.newFixedThreadPool(20); >> >> for (int i = 0 ; i < 10 ; i++) { >> newFixedThreadPool.submit(new Replier()); >> } >> >> // now get started and send the first message that gets the ball >> rolling >> JmsTemplate jms = new >> JmsTemplate(amq.getConfiguration().getConnectionFactory()); >> for (int i = 0 ; i < 10 ; i++) { >> jms.send("hello", new MessageCreator() { >> public Message createMessage(Session session) throws >> JMSException { >> TextMessage msg = session.createTextMessage(); >> msg.setText("Hello, I'm here"); >> return msg; >> } >> >> }); >> >> log.info("Hello sent"); >> Thread.sleep(10); 
>> } >> >> Thread.sleep(5000); >> assertMockEndpointsSatisfied(); >> } >> >> protected RouteBuilder createRouteBuilder() throws Exception { >> return new RouteBuilder() { >> >> public void configure() throws Exception { >> from("activemq:queue:hello") >> .threads() >> .process(new Processor() { >> public void process(Exchange exchange) throws >> Exception { >> >> exchange.getOut().setBody("What's your name"); >> } >> }) >> // use in out to get a reply as well >> >> .to(ExchangePattern.InOut, >> "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the >> replyTo and eveything works well >> .to("direct:replyprocessor"); >> >> from("direct:replyprocessor").process(new Processor() { >> public void process(Exchange exchange) throws >> Exception { >> System.out.println("Here I am processing the >> reply " + exchange.getIn().getBody()); >> } >> }).to("mock:result"); >> } >> }; >> } >> >> protected CamelContext createCamelContext() throws Exception { >> CamelContext camelContext = super.createCamelContext(); >> amq = activeMQComponent(MQURI); >> ActiveMQConnectionFactory targetFactory = new >> ActiveMQConnectionFactory(MQURI); >> log.info("Using MQ CachingConnectionFactory"); >> CachingConnectionFactory cachedAMQConnectionFactory = new >> CachingConnectionFactory(targetFactory); >> >> camelContext.addComponent("activemq", >> ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory)); >> >> return camelContext; >> } >> >> >> } >> >> =========== >> -- >> View this message in context: http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html >> Sent from the Camel - Users (activemq) mailing list archive at Nabble.com. 
>> >> > > > > -- > Claus Ibsen > Apache Camel Committer > > Open Source Integration: http://fusesource.com > Blog: http://davsclaus.blogspot.com/ > Twitter: http://twitter.com/davsclaus > -- Claus Ibsen Apache Camel Committer Author of Camel in Action: http://www.manning.com/ibsen/ Open Source Integration: http://fusesource.com Blog: http://davsclaus.blogspot.com/ Twitter: http://twitter.com/davsclaus |
| Free embeddable forum powered by Nabble | Forum Help |