ack query on JDBC inbound query not part of transaction?

ack query on JDBC inbound query not part of transaction?

by Gerwin Postma

Hi,

Scenario: I want to configure the same JDBC inbound endpoint on two Mule instances for failover (active-active setup) while ensuring the same message is not picked up twice. A nice solution would be to use 'SELECT FOR UPDATE' in my inbound select query, as in the following config:

        <jdbc:connector name="jdbcConnector" pollingFrequency="3000" dataSource-ref="oracleXADataSource">
                <jdbc:query key="selectWithStatusPending"
                        value="select * from CUSTOM_QUEUE_TABLE where STATUS='Pending' AND ROWNUM &lt;= 2 for update of STATUS" />
                <jdbc:query key="selectWithStatusPending.ack"
                        value="update CUSTOM_QUEUE_TABLE set STATUS='Processing', date_updated=sysdate where MESSAGE_ID = #[map-payload:MESSAGE_ID]" />
        </jdbc:connector>

        <jbossts:transaction-manager/>
        <model name="MyJdbcModel">
                <service name="QueueSubscriberAndEcho">
                        <inbound>
                                <jdbc:inbound-endpoint queryKey="selectWithStatusPending" transformer-refs="PayloadFromMap">
                                     <xa-transaction action="ALWAYS_BEGIN" timeout="60000"/>
                                 </jdbc:inbound-endpoint>
                        </inbound>
                        <component class="my.org.EchoSleeper"/>
                        <outbound>
                                <pass-through-router>
                                        <jms:outbound-endpoint name="topic_queue_out"
                                                connector-ref="OracleAQConnector" topic="topic_queue">
                                                <xa-transaction action="ALWAYS_JOIN"/>
                                        </jms:outbound-endpoint>
                                </pass-through-router>
                        </outbound>
                </service>
        </model>

When I try to run this config I get the following error:

ERROR 2009-08-12 07:24:54,176 [jdbcConnector.receiver.2] org.mule.DefaultExceptionStrategy: Caught exception in Exception Strategy: ORA-02049: timeout: distributed transaction waiting for lock
 Query: update CUSTOM_QUEUE_TABLE set STATUS='Processing', date_updated=sysdate where MESSAGE_ID = ? Parameters: [1]
java.sql.SQLException: ORA-02049: timeout: distributed transaction waiting for lock
 Query: update CUSTOM_QUEUE_TABLE set STATUS='Processing', date_updated=sysdate where MESSAGE_ID = ? Parameters: [1]
        at org.apache.commons.dbutils.QueryRunner.rethrow(QueryRunner.java:359)
        at org.apache.commons.dbutils.QueryRunner.update(QueryRunner.java:428)
        at org.mule.transport.jdbc.JdbcMessageReceiver.processMessage(JdbcMessageReceiver.java:150)
        at org.mule.transport.TransactedPollingMessageReceiver$MessageProcessorWorker.doInTransaction(TransactedPollingMessageReceiver.java:194)
        at org.mule.transaction.TransactionTemplate.execute(TransactionTemplate.java:99)
        at org.mule.transport.TransactedPollingMessageReceiver$MessageProcessorWorker.run(TransactedPollingMessageReceiver.java:180)
        at org.mule.work.WorkerContext.run(WorkerContext.java:310)
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
        at java.lang.Thread.run(Unknown Source)

Apparently the ack query is waiting for a lock set by the select query. My hope was that it would use the same transaction (and connection?) so that it would be allowed to update the rows. Any suggestions on how to resolve this?

Btw, if I change the query to a normal select (without FOR UPDATE), everything works as expected (but messages are processed twice).
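
The lock wait described above can be reproduced outside Mule with two database sessions. This is only a minimal sketch, assuming the table and columns from the config; the session labels and the bind name :locked_id (standing for the MESSAGE_ID returned by the select) are made up for illustration:

```sql
-- Session A: lock one pending row and keep the transaction open.
SELECT MESSAGE_ID
  FROM CUSTOM_QUEUE_TABLE
 WHERE STATUS = 'Pending' AND ROWNUM <= 1
   FOR UPDATE OF STATUS;

-- Session B (separate connection, separate transaction): this statement
-- waits for session A's row lock. Inside a distributed (XA) transaction the
-- wait is capped by DISTRIBUTED_LOCK_TIMEOUT and then fails with ORA-02049.
UPDATE CUSTOM_QUEUE_TABLE
   SET STATUS = 'Processing', date_updated = SYSDATE
 WHERE MESSAGE_ID = :locked_id;

-- Session A: the same update, issued by the transaction that owns the lock,
-- does not wait.
UPDATE CUSTOM_QUEUE_TABLE
   SET STATUS = 'Processing', date_updated = SYSDATE
 WHERE MESSAGE_ID = :locked_id;
COMMIT;
```

If the ack query runs on a different connection or transaction than the select, it would play the role of session B, which would be consistent with the ORA-02049 in the log.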

Regards,

Richard

---------------------------------------------------------------------
To unsubscribe from this list, please visit:

    http://xircles.codehaus.org/manage_email



Re: ack query on JDBC inbound query not part of transaction?

by antoine.borg

Hi,

Can you do this with a stored proc, perhaps?

A

Richard Swart wrote:

> Apparently the ack query is waiting for a lock set by the select query. My hope was that it would use the same transaction (and connection?) so that it would be allowed to update the rows. Any suggestions on how to resolve this?

--

Antoine Borg, Director of Technical Services | Tel: +32 28 504 696
ricston Ltd., BP 2, 1180 Uccle, Brussels, BELGIUM




Re: ack query on JDBC inbound query not part of transaction?

by Gerwin Postma

Thanks to Puneet's blog post (http://blogs.mulesoft.org/selecting-multiple-rows-using-a-jdbc-inbound-endpoint/) I learned about the properties you can set on an inbound JDBC endpoint:

             <property key="receiveMessageInTransaction" value="false"/>
             <property key="receiveMessagesInXaTransaction" value="true"/>


Putting it all together, I got things working in an active-active setup with the config below. The downside is that I can only select one message per poll, which is not optimal from a performance perspective.

        <jdbc:connector name="jdbcConnector" pollingFrequency="3000" dataSource-ref="oracleXADataSource" transactionPerMessage="false">
                <jdbc:query key="selectWithStatusPending"
                        value="select * from CUSTOM_QUEUE_TABLE where STATUS='Pending' AND ROWNUM &lt;= 1 for update of STATUS" />
                <jdbc:query key="selectWithStatusPending.ack"
                        value="update CUSTOM_QUEUE_TABLE set STATUS='Processing', date_updated=sysdate where MESSAGE_ID = #[map-payload:MESSAGE_ID]" />
        </jdbc:connector>

        <jbossts:transaction-manager/>
        <model name="MyJdbcModel">
                <service name="QueueSubscriberAndEcho">
                        <inbound>
                                <jdbc:inbound-endpoint queryKey="selectWithStatusPending" transformer-refs="PayloadFromMap">
                                     <xa-transaction action="ALWAYS_BEGIN" timeout="60000"/>
                                     <property key="receiveMessageInTransaction" value="false"/>
                                     <property key="receiveMessagesInXaTransaction" value="true"/>
                                 </jdbc:inbound-endpoint>
                        </inbound>
                        <component class="org.rotterdam.gw.mule.component.EchoSleeper"/>
                        <outbound>
                                <pass-through-router>
                                        <jms:outbound-endpoint name="topic_queue_out"
                                                connector-ref="OracleAQConnector" topic="topic_queue">
                                                <xa-transaction action="ALWAYS_JOIN"/>
                                        </jms:outbound-endpoint>
                                </pass-through-router>
                        </outbound>
                </service>
        </model>
