|
View:
New views
3 Messages
—
Rating Filter:
Alert me
|
|
|
ack query on JDBC inbound query not part of transaction?Hi,
Scenario: I want to configure the same jdbc inbound endpoint on two mule instances for failover (active-active setup), but avoid that the same message is picked up twice. A nice solution would be if I could use a 'SELECT FOR UPDATE' on my inbound select query as in the following condif <jdbc:connector name="jdbcConnector" pollingFrequency="3000" dataSource-ref="oracleXADataSource"> <jdbc:query key="selectWithStatusPending" value="select * from CUSTOM_QUEUE_TABLE where STATUS='Pending' AND ROWNUM <= 2 for update of STATUS" /> <jdbc:query key="selectWithStatusPending.ack" value="update CUSTOM_QUEUE_TABLE set STATUS='Processing', date_updated=sysdate where MESSAGE_ID = #[map-payload:MESSAGE_ID]" /> </jdbc:connector> <jbossts:transaction-manager/> <model name="MyJdbcModel"> <service name="QueueSubscriberAndEcho"> <inbound> <jdbc:inbound-endpoint queryKey="selectWithStatusPending" transformer-refs="PayloadFromMap"> <xa-transaction action="ALWAYS_BEGIN" timeout="60000"/> </jdbc:inbound-endpoint> </inbound> <component class="my.org.EchoSleeper"/> <outbound> <pass-through-router> <jms:outbound-endpoint name="topic_queue_out" connector-ref="OracleAQConnector" topic="topic_queue"> <xa-transaction action="ALWAYS_JOIN"/> </jms:outbound-endpoint> </pass-through-router> </outbound> </service> </model> When I try to run this config I get the following error: ERROR 2009-08-12 07:24:54,176 [jdbcConnector.receiver.2] org.mule.DefaultExceptionStrategy: Caught exception in Exception Strategy: ORA-02049: timeout: distributed transaction waiting for lock Query: update CUSTOM_QUEUE_TABLE set STATUS='Processing', date_updated=sysdate where MESSAGE_ID = ? Parameters: [1] java.sql.SQLException: ORA-02049: timeout: distributed transaction waiting for lock Query: update CUSTOM_QUEUE_TABLE set STATUS='Processing', date_updated=sysdate where MESSAGE_ID = ? Parameters: [1] at org.apache.commons.dbutils.QueryRunner.rethrow(QueryRunner.java:359) at org.apache.commons.dbutils.QueryRunner.update(QueryRunner.java:428) at org.mule.transport.jdbc.JdbcMessageReceiver.processMessage(JdbcMessageReceiver.java:150) at org.mule.transport.TransactedPollingMessageReceiver$MessageProcessorWorker.doInTransaction(TransactedPollingMessageReceiver.java:194) at org.mule.transaction.TransactionTemplate.execute(TransactionTemplate.java:99) at org.mule.transport.TransactedPollingMessageReceiver$MessageProcessorWorker.run(TransactedPollingMessageReceiver.java:180) at org.mule.work.WorkerContext.run(WorkerContext.java:310) at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061) at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575) at java.lang.Thread.run(Unknown Source) Apparently the ack query is waiting for a lock set by the select query. My hope was that it would use the same transaction (and connection?) so that it would be allowed to update the rows. Any suggestions on how to resolve this? Btw. If I change the query to a normal select (without FOR UPDATE) everythings works as expected (but messages are processed twice). Regards, Richard --------------------------------------------------------------------- To unsubscribe from this list, please visit: http://xircles.codehaus.org/manage_email |
|
|
Re: ack query on JDBC inbound query not part of transaction?Hi,
Can you do this with a stored proc, perhaps? A Richard Swart wrote: > Hi, > > Scenario: I want to configure the same jdbc inbound endpoint on two mule instances for failover (active-active setup), but avoid that the same message is picked up twice. A nice solution would be if I could use a 'SELECT FOR UPDATE' on my inbound select query as in the following condif > > <jdbc:connector name="jdbcConnector" pollingFrequency="3000" dataSource-ref="oracleXADataSource"> > <jdbc:query key="selectWithStatusPending" > value="select * from CUSTOM_QUEUE_TABLE where STATUS='Pending' AND ROWNUM <= 2 for update of STATUS" /> > <jdbc:query key="selectWithStatusPending.ack" > value="update CUSTOM_QUEUE_TABLE set STATUS='Processing', date_updated=sysdate where MESSAGE_ID = #[map-payload:MESSAGE_ID]" /> > </jdbc:connector> > > <jbossts:transaction-manager/> > <model name="MyJdbcModel"> > <service name="QueueSubscriberAndEcho"> > <inbound> > <jdbc:inbound-endpoint queryKey="selectWithStatusPending" transformer-refs="PayloadFromMap"> > <xa-transaction action="ALWAYS_BEGIN" timeout="60000"/> > </jdbc:inbound-endpoint> > </inbound> > <component class="my.org.EchoSleeper"/> > <outbound> > <pass-through-router> > <jms:outbound-endpoint name="topic_queue_out" > connector-ref="OracleAQConnector" topic="topic_queue"> > <xa-transaction action="ALWAYS_JOIN"/> > </jms:outbound-endpoint> > </pass-through-router> > </outbound> > </service> > </model> > > When I try to run this config I get the following error: > > ERROR 2009-08-12 07:24:54,176 [jdbcConnector.receiver.2] org.mule.DefaultExceptionStrategy: Caught exception in Exception Strategy: ORA-02049: timeout: distributed transaction waiting for lock > Query: update CUSTOM_QUEUE_TABLE set STATUS='Processing', date_updated=sysdate where MESSAGE_ID = ? Parameters: [1] > java.sql.SQLException: ORA-02049: timeout: distributed transaction waiting for lock > Query: update CUSTOM_QUEUE_TABLE set STATUS='Processing', date_updated=sysdate where MESSAGE_ID = ? Parameters: [1] > at org.apache.commons.dbutils.QueryRunner.rethrow(QueryRunner.java:359) > at org.apache.commons.dbutils.QueryRunner.update(QueryRunner.java:428) > at org.mule.transport.jdbc.JdbcMessageReceiver.processMessage(JdbcMessageReceiver.java:150) > at org.mule.transport.TransactedPollingMessageReceiver$MessageProcessorWorker.doInTransaction(TransactedPollingMessageReceiver.java:194) > at org.mule.transaction.TransactionTemplate.execute(TransactionTemplate.java:99) > at org.mule.transport.TransactedPollingMessageReceiver$MessageProcessorWorker.run(TransactedPollingMessageReceiver.java:180) > at org.mule.work.WorkerContext.run(WorkerContext.java:310) > at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061) > at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575) > at java.lang.Thread.run(Unknown Source) > > Apparently the ack query is waiting for a lock set by the select query. My hope was that it would use the same transaction (and connection?) so that it would be allowed to update the rows. Any suggestions on how to resolve this? > > Btw. If I change the query to a normal select (without FOR UPDATE) everythings works as expected (but messages are processed twice). > > Regards, > > Richard > > --------------------------------------------------------------------- > To unsubscribe from this list, please visit: > > http://xircles.codehaus.org/manage_email > > > > -- Antoine Borg, Director of Technical Services | Tel: +32 28 504 696 ricston Ltd., BP 2, 1180 Uccle, Brussels, BELGIUM See our full schedule of Mule and Android courses: http://www.ricston.com/courses/schedules/ email: _antoine.borg_@... <mailto:antoine.borg@...> | blog: blog.ricston.com <http://blog.ricston.com> | web: ricston.com <http://www.ricston.com/> --------------------------------------------------------------------- To unsubscribe from this list, please visit: http://xircles.codehaus.org/manage_email |
|
|
Re: ack query on JDBC inbound query not part of transaction?Thanks to Puneets blog http://blogs.mulesoft.org/selecting-multiple-rows-using-a-jdbc-inbound-endpoint/ I learned about the properties you can set on an inbound jdbc endpoint:
<property key="receiveMessageInTransaction" value="false"/> <property key="receiveMessagesInXaTransaction" value="true"/> Putting it all together I could make things work in an active-active setup per config below. Downside is that I can only select one message per poll which is not optimal from a performance perspective. <jdbc:connector name="jdbcConnector" pollingFrequency="3000" dataSource-ref="oracleXADataSource" transactionPerMessage="false"> <jdbc:query key="selectWithStatusPending" value="select * from CUSTOM_QUEUE_TABLE where STATUS='Pending' AND ROWNUM <= 1 for update of STATUS" /> <jdbc:query key="selectWithStatusPending.ack" value="update CUSTOM_QUEUE_TABLE set STATUS='Processing', date_updated=sysdate where MESSAGE_ID = #[map-payload:MESSAGE_ID]" /> </jdbc:connector> <jbossts:transaction-manager/> <model name="MyJdbcModel"> <service name="QueueSubscriberAndEcho"> <inbound> <jdbc:inbound-endpoint queryKey="selectWithStatusPending" transformer-refs="PayloadFromMap"> <xa-transaction action="ALWAYS_BEGIN" timeout="60000"/> <property key="receiveMessageInTransaction" value="false"/> <property key="receiveMessagesInXaTransaction" value="true"/> </jdbc:inbound-endpoint> </inbound> <component class="org.rotterdam.gw.mule.component.EchoSleeper"/> <outbound> <pass-through-router> <jms:outbound-endpoint name="topic_queue_out" connector-ref="OracleAQConnector" topic="topic_queue"> <xa-transaction action="ALWAYS_JOIN"/> </jms:outbound-endpoint> </pass-through-router> </outbound> </service> --------------------------------------------------------------------- To unsubscribe from this list, please visit: http://xircles.codehaus.org/manage_email |
| Free embeddable forum powered by Nabble | Forum Help |