|
View:
New views
12 Messages
—
Rating Filter:
Alert me
|
|
|
request reply with sql queryHi,
This is my first attempt at coding a request reply pattern with Camel (please be gentle ). This is what I want to do: I have a Master and a Slave class. The Master class sends an sql query to the Slave class. The latter executes the query, fills the result in a List and then returns that List to the Master. These classes will be in different hosts in the future. This is the code of what I have so far:
(does anyone know how to make the <code> tags work?) The Master class: <code> CamelContext context = new DefaultCamelContext(); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); context.addComponent("activemq", ActiveMQComponent.jmsComponentAutoAcknowledge(connectionFactory)); ProducerTemplate template = context.createProducerTemplate(); context.start(); Object response = template.requestBody("activemq:queue:test.queue", "select * from clients"); List result = (List) response; // do something with the List Thread.sleep(1000); context.stop(); </code> The Slave class: <code> CamelContext context = new DefaultCamelContext(); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); context.addComponent("activemq", ActiveMQComponent.jmsComponentAutoAcknowledge(connectionFactory)); context.addRoutes(new RouteBuilder() { public void configure() { from("activemq:queue:test.queue").process(new Processor() { public void process(Exchange e) { try { DefaultMessage msg = (DefaultMessage) e.getIn(); String clientsQuery = (String) msg.getBody(); List result = Db.getDbResultList(clientsQuery); e.getOut(true).setBody(result); } catch(Exception ex) { logger.debug("slave process exception", ex); } } }); } }); context.start(); </code> And that's all I've got. I realize that the query is received, executed successfully, the List is filled and returned. But I don't know if this is the correct way to achieve it. Do you have any suggestion? Thanks in advance. |
|
|
Re: request reply with sql queryOn Mon, Oct 19, 2009 at 3:55 PM, linuca <linuca@...> wrote:
> > Hi, > > This is my first attempt at coding a request reply pattern with Camel > (please be gentle :blush:). This is what I want to do: I have a Master and a > Slave class. The Master class sends an sql query to the Slave class. The > latter executes the query, fills the result in a List and then returns that > List to the Master. These classes will be in different hosts in the future. > This is the code of what I have so far: > > (does anyone know how to make the <code> tags work?) > > The Master class: > > <code> > > CamelContext context = new DefaultCamelContext(); > ConnectionFactory connectionFactory = new > ActiveMQConnectionFactory("tcp://localhost:61616"); > context.addComponent("activemq", > ActiveMQComponent.jmsComponentAutoAcknowledge(connectionFactory)); > ProducerTemplate template = context.createProducerTemplate(); > context.start(); > template.requestBody("activemq:queue:test.queue", "select * from clients"); > Thread.sleep(1000); > context.stop(); > > </code> > > The Slave class: > > <code> > > CamelContext context = new DefaultCamelContext(); > ConnectionFactory connectionFactory = new > ActiveMQConnectionFactory("tcp://localhost:61616"); > context.addComponent("activemq", > ActiveMQComponent.jmsComponentAutoAcknowledge(connectionFactory)); > context.addRoutes(new RouteBuilder() { > public void configure() { > from("activemq:queue:test.queue").process(new > Processor() { > public void process(Exchange e) { > try { > DefaultMessage msg = (DefaultMessage) > e.getIn(); > String clientsQuery = (String) > msg.getBody(); > List result = > Db.getDbResultList(clientsQuery); > e.getOut(true).setBody(result); > } catch(Exception ex) { > logger.debug("slave process exception", ex); > } > } > }); > } > }); > > context.start(); > > </code> > > And that's all I've got. > I realize that the query is received and executed successfully. But I don't > know if the List is returned and I don't know how to read it from the > master. > > Thanks in advance. > -- > View this message in context: http://www.nabble.com/request-reply-with-sql-query-tp25958657p25958657.html > Sent from the Camel - Users (activemq) mailing list archive at Nabble.com. > > Hi Welcome on the Camel ride. You actually does it right. What you just need is to grab the result where you send from the Slave to the Master. So this > template.requestBody("activemq:queue:test.queue", "select * from clients"); Should be Object reply = template.requestBody("activemq:queue:test.queue", "select * from clients"); Then you can check that the reply is that List. And you can take advantage of Camels type system to avoid the ugly type casts and verbose coding > DefaultMessage msg = (DefaultMessage) > e.getIn(); > String clientsQuery = (String) Could become String clientQuery = exchange.getIn().getBody(String.class); -- Claus Ibsen Apache Camel Committer Open Source Integration: http://fusesource.com Blog: http://davsclaus.blogspot.com/ Twitter: http://twitter.com/davsclaus |
|
|
Re: request reply with sql queryThanks Claus,
It works like charm. For now, I just send one message. I find weird that every second I get this message in the log: DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer - Consumer [ActiveMQMessageConsume r { value=ID:mendoza04-43584-1255964003207-0:0:1:1, started=true }] of session [ActiveMQSession {id=ID:mendoza04-43584-1255964003207-0:0:1,started=true}] did not receive a message Why is that? |
|
|
Re: request reply with sql queryOn Mon, Oct 19, 2009 at 4:56 PM, linuca <linuca@...> wrote:
> > Thanks Claus, > > It works like charm. > > For now, I just send one message. > > I find weird that every second I get this message in the log: > > DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer - > Consumer [ActiveMQMessageConsume > r { value=ID:mendoza04-43584-1255964003207-0:0:1:1, started=true }] of > session [ActiveMQSession > {id=ID:mendoza04-43584-1255964003207-0:0:1,started=true}] > did not receive a message > > Why is that? Its actually Spring JMS that does the JMS sending / receiving. Camel is just on the top :) You may be able to get rid of it by setting the maxMessagesPerTas=-1 as documented here: http://camel.apache.org/jms.html > -- > View this message in context: http://www.nabble.com/request-reply-with-sql-query-tp25958657p25959791.html > Sent from the Camel - Users (activemq) mailing list archive at Nabble.com. > > -- Claus Ibsen Apache Camel Committer Open Source Integration: http://fusesource.com Blog: http://davsclaus.blogspot.com/ Twitter: http://twitter.com/davsclaus |
|
|
Re: request reply with sql queryBut I want ActiveMQ to do the actual JMS sending / receiving.
Anyway, I changed the code to: Object response = template.requestBody("activemq:queue:test.queue?maxMessagesPerTask=-1", "select * from clients"); And it does the same "did not receive a message " logging. Any ideas?
|
|
|
Re: request reply with sql queryThis is DEBUG level. If you set DEBUG you somehow agree to see a lot of junk
;) Just raise log4j log level for this logger (or simply for org.springframework). BTW this is perfectly valid behavior - Spring-JMS just logs those things on DEBUG level. Roman 2009/10/19 linuca <linuca@...> > > But I want ActiveMQ to do the actual JMS sending / receiving. > > Anyway, I changed the code to: > > Object response = > template.requestBody("activemq:queue:test.queue?maxMessagesPerTask=-1", > "select * from clients"); > > And it does the same "did not receive a message " logging. > > Any ideas? > > > Claus Ibsen-2 wrote: > > > > On Mon, Oct 19, 2009 at 4:56 PM, linuca <linuca@...> wrote: > >> > >> Thanks Claus, > >> > >> It works like charm. > >> > >> For now, I just send one message. > >> > >> I find weird that every second I get this message in the log: > >> > >> DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer - > >> Consumer [ActiveMQMessageConsume > >> r { value=ID:mendoza04-43584-1255964003207-0:0:1:1, started=true }] of > >> session [ActiveMQSession > >> {id=ID:mendoza04-43584-1255964003207-0:0:1,started=true}] > >> did not receive a message > >> > >> Why is that? > > > > Its actually Spring JMS that does the JMS sending / receiving. Camel > > is just on the top :) > > > > You may be able to get rid of it by setting the maxMessagesPerTas=-1 > > as documented here: > > http://camel.apache.org/jms.html > > > > > > > >> -- > >> View this message in context: > >> > http://www.nabble.com/request-reply-with-sql-query-tp25958657p25959791.html > >> Sent from the Camel - Users (activemq) mailing list archive at > >> Nabble.com. > >> > >> > > > > > > > > -- > > Claus Ibsen > > Apache Camel Committer > > > > Open Source Integration: http://fusesource.com > > Blog: http://davsclaus.blogspot.com/ > > Twitter: http://twitter.com/davsclaus > > > > > > -- > View this message in context: > http://www.nabble.com/request-reply-with-sql-query-tp25958657p25960255.html > Sent from the Camel - Users (activemq) mailing list archive at Nabble.com. > > |
|
|
Re: request reply with sql queryOn Mon, Oct 19, 2009 at 5:24 PM, linuca <linuca@...> wrote:
> > But I want ActiveMQ to do the actual JMS sending / receiving. It is also AMQ that does it. Camel + Spring is just a layer on top. In the bottom it is AMQ doing all the low level transportation. > > Anyway, I changed the code to: > > Object response = > template.requestBody("activemq:queue:test.queue?maxMessagesPerTask=-1", > "select * from clients"); > > And it does the same "did not receive a message " logging. > > Any ideas? > > > Claus Ibsen-2 wrote: >> >> On Mon, Oct 19, 2009 at 4:56 PM, linuca <linuca@...> wrote: >>> >>> Thanks Claus, >>> >>> It works like charm. >>> >>> For now, I just send one message. >>> >>> I find weird that every second I get this message in the log: >>> >>> DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer - >>> Consumer [ActiveMQMessageConsume >>> r { value=ID:mendoza04-43584-1255964003207-0:0:1:1, started=true }] of >>> session [ActiveMQSession >>> {id=ID:mendoza04-43584-1255964003207-0:0:1,started=true}] >>> did not receive a message >>> >>> Why is that? >> >> Its actually Spring JMS that does the JMS sending / receiving. Camel >> is just on the top :) >> >> You may be able to get rid of it by setting the maxMessagesPerTas=-1 >> as documented here: >> http://camel.apache.org/jms.html >> >> >> >>> -- >>> View this message in context: >>> http://www.nabble.com/request-reply-with-sql-query-tp25958657p25959791.html >>> Sent from the Camel - Users (activemq) mailing list archive at >>> Nabble.com. >>> >>> >> >> >> >> -- >> Claus Ibsen >> Apache Camel Committer >> >> Open Source Integration: http://fusesource.com >> Blog: http://davsclaus.blogspot.com/ >> Twitter: http://twitter.com/davsclaus >> >> > > -- > View this message in context: http://www.nabble.com/request-reply-with-sql-query-tp25958657p25960255.html > Sent from the Camel - Users (activemq) mailing list archive at Nabble.com. > > -- Claus Ibsen Apache Camel Committer Open Source Integration: http://fusesource.com Blog: http://davsclaus.blogspot.com/ Twitter: http://twitter.com/davsclaus |
|
|
Re: request reply with sql queryHi,
I run into another problem. If I run the standalone app of activemq in another host in my network, it just does no receive the messages. 1. I changed "localhost" to the real ip addres. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.1:61616"); 2. I made sure that activemq was running. 3. The tcp port is not blocked by a firewall. The error I see in the tomcat log is the following: org.apache.camel.RuntimeCamelException: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis on the exchange: Exchange[JmsMessage: select * from clients] at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:724) at org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:334) at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:98) at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:113) at org.apache.camel.impl.DefaultProducerTemplate.requestBody(DefaultProducerTemplate.java:176) It works if it is in the same localhost. Any ideas? |
|
|
Re: request reply with sql queryOn Tue, Oct 20, 2009 at 3:13 PM, linuca <linuca@...> wrote:
> > Hi, > > I run into another problem. If I run the standalone app of activemq in > another host in my network, it just does no receive the messages. > > 1. I changed "localhost" to the real ip addres. > > ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory("tcp://192.168.0.1:61616"); > > 2. I made sure that activemq was running. > 3. The tcp port is not blocked by a firewall. > > The error I see in the tomcat log is the following: > > org.apache.camel.RuntimeCamelException: > org.apache.camel.ExchangeTimedOutException: The OUT message was not received > within: 20000 millis on the exchange: Exchange[JmsMessage: select > * from clients] > at > org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:724) > at > org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:334) > at > org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:98) > at > org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:113) > at > org.apache.camel.impl.DefaultProducerTemplate.requestBody(DefaultProducerTemplate.java:176) > > It works if it is in the same localhost. > Any ideas? > -- Look at the logs on the server. It doesnt send a response back (on that temporary jms queue). Could be some exception happening on the server. > View this message in context: http://www.nabble.com/request-reply-with-sql-query-tp25958657p25974866.html > Sent from the Camel - Users (activemq) mailing list archive at Nabble.com. > > -- Claus Ibsen Apache Camel Committer Open Source Integration: http://fusesource.com Blog: http://davsclaus.blogspot.com/ Twitter: http://twitter.com/davsclaus |
|
|
Re: request reply with sql queryHi, thanks for the hint.
This is what I get in the activemq log. Is this because the queue expired before it got the message? <code> Sending: WireFormatInfo { version=3, properties={TightEncodingEnabled=true, CacheSize=1024, TcpNoDelayEnabled=true, SizePrefixDisabled=false, StackTraceEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} DEBUG WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=3, properties={TightEncodingEnabled=true, CacheSize=1024, TcpNoDelayEnabled=true, SizePrefixDisabled=false, StackTraceEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} DEBUG WireFormatNegotiator - tcp:///192.168.0.4:33508 before negotiation: OpenWireFormat{version=3, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false} DEBUG WireFormatNegotiator - tcp:///192.168.0.4:33508 after negotiation: OpenWireFormat{version=3, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false} DEBUG TransportConnection - Setting up new connection: /192.168.0.4:33508 DEBUG AbstractRegion - Adding consumer: ID:mendoza04-53640-1256046824338-0:0:-1:1 DEBUG AbstractRegion - Adding consumer: ID:mendoza04-53640-1256046824338-0:0:1:1 DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Consumer.Queue.test.queue DEBUG WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=3, properties={TightEncodingEnabled=true, CacheSize=1024, TcpNoDelayEnabled=true, SizePrefixDisabled=false, StackTraceEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} DEBUG WireFormatNegotiator - Sending: WireFormatInfo { version=3, properties={TightEncodingEnabled=true, CacheSize=1024, TcpNoDelayEnabled=true, SizePrefixDisabled=false, StackTraceEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} DEBUG WireFormatNegotiator - tcp:///192.168.0.4:33509 before negotiation: OpenWireFormat{version=3, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false} DEBUG WireFormatNegotiator - tcp:///192.168.0.4:33509 after negotiation: OpenWireFormat{version=3, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false} DEBUG TransportConnection - Setting up new connection: /192.168.0.4:33509 DEBUG AbstractRegion - Adding consumer: ID:mendoza04-53640-1256046824338-0:1:-1:1 DEBUG AbstractRegion - Adding destination: temp-queue://ID:mendoza04-53640-1256046824338-0:1:1 DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.TempQueue DEBUG AbstractRegion - Adding consumer: ID:mendoza04-53640-1256046824338-0:1:1:1 DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Consumer.Queue.ID:mendoza04-53640-1256046824338-0:1:1 DEBUG WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=3, properties={TightEncodingEnabled=true, CacheSize=1024, TcpNoDelayEnabled=true, SizePrefixDisabled=false, StackTraceEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} DEBUG WireFormatNegotiator - Sending: WireFormatInfo { version=3, properties={TightEncodingEnabled=true, CacheSize=1024, TcpNoDelayEnabled=true, SizePrefixDisabled=false, StackTraceEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} DEBUG WireFormatNegotiator - tcp:///192.168.0.4:33510 before negotiation: OpenWireFormat{version=3, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false} DEBUG WireFormatNegotiator - tcp:///192.168.0.4:33510 after negotiation: OpenWireFormat{version=3, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false} DEBUG TransportConnection - Setting up new connection: /192.168.0.4:33510 DEBUG AbstractRegion - Adding consumer: ID:mendoza04-53640-1256046824338-0:2:-1:1 DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Producer.Queue.test.queue DEBUG RegionBroker - Message expired ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:mendoza04-53640-1256046824338-0:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:mendoza04-53640-1256046824338-0:2:1:1, destination = queue://test.queue, transactionId = null, expiration = 1256046844866, timestamp = 1256046824866, arrival = 0, brokerInTime = 1256046883092, brokerOutTime = 0, correlationId = ID-mendoza04/44477-1256046824630/2-0, replyTo = temp-queue://ID:mendoza04-53640-1256046824338-0:1:1, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = select * from clients} DEBUG AMQMessageStore - Journalled message add for: ID:mendoza04-53640-1256046824338-0:2:1:1:1, at: offset = 3751, file = 1, size = 452, type = 1 DEBUG AMQMessageStore - Doing batch update... adding: 1 removing: 0 DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Expired.Queue.test.queue DEBUG AMQMessageStore - Batch update done. DEBUG AbstractRegion - Removing consumer: ID:mendoza04-53640-1256046824338-0:2:-1:1 DEBUG TransportConnection - Stopping connection: /192.168.0.4:33510 DEBUG TcpTransport - Stopping transport tcp:///192.168.0.4:33510 DEBUG TransportConnection - Stopped transport: /192.168.0.4:33510 DEBUG TransportConnection - Connection Stopped: /192.168.0.4:33510 DEBUG AMQPersistenceAdapter - Checkpoint started. DEBUG AMQPersistenceAdapter - Marking journal at: offset = 3751, file = 1, size = 452, type = 1 DEBUG AMQPersistenceAdapter - Checkpoint done. DEBUG AMQPersistenceAdapter - dataFilesInProgress.values: (0) [] DEBUG AMQPersistenceAdapter - lastDataFile: 1 DEBUG AsyncDataManager - lastFileId=0, purgeList: (0) [] DEBUG InactivityMonitor - 9999 ms elapsed since last write check. DEBUG InactivityMonitor - 10000 ms elapsed since last write check. </code> Cheers. |
|
|
Re: request reply with sql queryHi,
I tested the resquest/replay example from ActiveMQ (without using Camel) and it worked fine, so I don't think there is something wrong with the connection. So I assume there is something wrong with my Camel code? Can anyone tell me what I'm doing wrong? Thanks. |
|
|
Re: request reply with sql queryOn Wed, Oct 21, 2009 at 2:31 PM, linuca <linuca@...> wrote:
> > Hi, > > I tested the resquest/replay example from ActiveMQ (without using Camel) and > it worked fine, so I don't think there is something wrong with the > connection. > > So I assume there is something wrong with my Camel code? Can anyone tell me > what I'm doing wrong? > You can try the Camel JMS tutorial which does routing over JMS between a client and a server http://camel.apache.org/tutorial-jmsremoting.html > Thanks. > -- > View this message in context: http://www.nabble.com/request-reply-with-sql-query-tp25958657p25991585.html > Sent from the Camel - Users (activemq) mailing list archive at Nabble.com. > > -- Claus Ibsen Apache Camel Committer Author of Camel in Action: http://www.manning.com/ibsen/ Open Source Integration: http://fusesource.com Blog: http://davsclaus.blogspot.com/ Twitter: http://twitter.com/davsclaus |
| Free embeddable forum powered by Nabble | Forum Help |