Discussion - Route Policy feature in Camel 2.1

Hi

A bit of background is CAMEL-1048:
https://issues.apache.org/activemq/browse/CAMEL-1048

I am looking into how to implement this nicely in Camel.

CAMEL-1048 is maybe a bit too much compared to the current requirements from Camel users. What they are looking for is to be able to dynamically throttle consumers.

Apache CXF and ServiceMix have such a feature specially built into their JMS components. It can stop the JMS listener when there are too many messages in flight.

On the other hand, we have also had Camel users wanting to dynamically start/stop consumers depending on a flag of some sort. So there are also grounds to make this a general solution that can cater for both use cases.

I do wonder how to move forward. In CAMEL-1048 there is an example using "requires" to associate a route with a predicate that must be true for the route to be running. I do not like the naming "requires" or "require". And having it in the fluent builder / DSL makes it fixed.

I wonder if we should name such a configuration a RoutePolicy, so you can configure it as:

    <route routePolicy="myRoutePolicy">
      <from .../>
      ...
    </route>

    <bean id="myRoutePolicy" class=...>
      <property name="maxInflightExchanges" value="500"/>
    </bean>

This is quite flexible, as we can just offer an SPI interface for the RoutePolicy and have people implement it however they like. For example, as above, something that is controlled by the number of current in flight exchanges.

Others can control it by CPU utilization, a switch, a timer, etc.

In the Java DSL it is:

    from("xxx").routePolicy(myRoutePolicy).to(yyyy);

--
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
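
To make the SPI idea above a bit more concrete, here is a minimal sketch of a policy controlled by a switch. Everything in it is an assumption for illustration: SketchConsumer, SketchRoutePolicy, SwitchRoutePolicy and InMemoryConsumer are hypothetical names, not Camel types, and the real interface may look quite different.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a route consumer; not a Camel type.
interface SketchConsumer {
    void start();
    void stop();
    boolean isRunning();
}

// Hypothetical SPI: the policy decides whether the consumer should run.
interface SketchRoutePolicy {
    void apply(SketchConsumer consumer);
}

// Policy driven by a simple on/off switch.
final class SwitchRoutePolicy implements SketchRoutePolicy {
    private final AtomicBoolean enabled = new AtomicBoolean(true);

    void setEnabled(boolean value) {
        enabled.set(value);
    }

    @Override
    public void apply(SketchConsumer consumer) {
        boolean shouldRun = enabled.get();
        if (shouldRun && !consumer.isRunning()) {
            consumer.start();
        } else if (!shouldRun && consumer.isRunning()) {
            consumer.stop();
        }
    }
}

// Trivial in-memory consumer used only to exercise the policy.
final class InMemoryConsumer implements SketchConsumer {
    private boolean running;

    @Override
    public void start() {
        running = true;
    }

    @Override
    public void stop() {
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}
```

A policy based on in flight exchanges, CPU utilization or a timer would implement the same single method and only differ in how it decides whether the consumer should run.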
Re: Discussion - Route Policy feature in Camel 2.1

Hi

I have done some experiments and I am settling on something like this, where we can configure a route policy to use. In this case we use a throttling policy that is based on the number of in flight exchanges.

In this example the aim is at most 16 in flight messages. The count can go higher, but when the policy is evaluated it will react if the current in flight count > max. If so, it will suspend/stop the consumer. And when the count goes down to 25% of 16 (= 4), it will resume/start the consumer again.

Apache CXF already has this feature, so it is inspired by that project.

    <bean id="myPolicy" class="org.apache.camel.impl.ThrottelingRoutePolicy">
      <property name="maxInflightExchanges" value="16"/>
      <property name="reconnectPercentOfMax" value="25"/>
    </bean>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
      <route routePolicyRef="myPolicy">
        <from uri="activemq:queue:foo?concurrentConsumers=20"/>
        <transacted/>
        <delay><constant>100</constant></delay>
        <to uri="log:foo?groupSize=10"/>
        <to uri="mock:result"/>
      </route>
    </camelContext>

You can of course implement your own route policy and do whatever you like.

The route policy is the interface shown below. I may want to adjust the naming of that method and maybe change it to be more "routish". It is based on a callback that is invoked when an Exchange is done being routed. So maybe something like:

    void onRouteDone(Route route, Exchange exchange);

is better, as it is clearer and more to the point. And you can get the Consumer from the Route, so you can grab it and suspend/stop it on the fly.

    public interface RoutePolicy {

        /**
         * Callback invoked when a {@link Consumer} has consumed and created an {@link Exchange}
         *
         * @param consumer the consumer
         * @param exchange the created exchange
         */
        void onConsume(Consumer consumer, Exchange exchange);
    }

--
Claus Ibsen
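
The suspend/resume rule described above (suspend when in flight > max, resume at 25% of 16 = 4) can be sketched as a small state machine. This is only an illustration under stated assumptions: InflightThrottleSketch and its methods are hypothetical names, and it is not the code of the actual policy class.

```java
// Hypothetical sketch of the threshold logic; names are not Camel's.
final class InflightThrottleSketch {
    private final int maxInflight;
    private final int resumeAt;
    private boolean suspended;

    InflightThrottleSketch(int maxInflight, int percentOfMax) {
        this.maxInflight = maxInflight;
        // Integer arithmetic: 16 * 25 / 100 = 4.
        this.resumeAt = maxInflight * percentOfMax / 100;
    }

    int resumeThreshold() {
        return resumeAt;
    }

    synchronized boolean isSuspended() {
        return suspended;
    }

    // Called whenever the policy is evaluated, for example after an exchange completes.
    synchronized void evaluate(int currentInflight) {
        if (!suspended && currentInflight > maxInflight) {
            // Too many exchanges in flight: the consumer would be suspended here.
            suspended = true;
        } else if (suspended && currentInflight <= resumeAt) {
            // Drained to the resume threshold: the consumer would be resumed here.
            suspended = false;
        }
    }
}
```

The gap between the two thresholds is what keeps the consumer from flapping: between 5 and 16 in flight exchanges the sketch leaves the consumer in whatever state it is already in.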
Re: Discussion - Route Policy feature in Camel 2.1

Hi

I have committed a first cut of this feature in rev 830861. Please feel free to take a look.

Feedback on the locking in the ThrottlingRoutePolicy class is welcome. I did consider using try locks in case there was a race, as after all we just want at least one of the threads to acquire the lock and invoke the suspension/resume. There may be clever ways to achieve this.

For example, throttling this route:

    <bean id="myPolicy" class="org.apache.camel.impl.ThrottlingRoutePolicy">
      <property name="maxInflightExchanges" value="16"/>
      <property name="resumePercentOfMax" value="25"/>
    </bean>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
      <route routePolicyRef="myPolicy">
        <from uri="activemq:queue:foo?concurrentConsumers=20"/>
        <transacted/>
        <delay><constant>100</constant></delay>
        <to uri="log:foo?groupSize=10"/>
        <to uri="mock:result"/>
      </route>
    </camelContext>

yields the output below. Yes, it is throttling very often, but this is due to the low number of concurrent consumers and the fact that the exchanges are routed at a constant pace.

2009-10-29 10:02:12,788 [main ] INFO DefaultCamelContext - Apache Camel 2.1-SNAPSHOT (CamelContext:camelContext) started
2009-10-29 10:02:13,179 [enerContainer-6] INFO foo - Received: 10 messages so far. Last group took: 4 millis which is: 2,500 messages per second. average: 2,500
2009-10-29 10:02:13,184 [nerContainer-19] INFO ThrottlingRoutePolicy - Throtteling consumer: 20 > 16 inflight exchange by suspending consumer.
2009-10-29 10:02:13,193 [nerContainer-13] INFO ThrottlingRoutePolicy - Throtteling consumer: 4 <= 4 inflight exchange by resuming consumer.
2009-10-29 10:02:13,203 [nerContainer-16] INFO foo - Received: 20 messages so far. Last group took: 25 millis which is: 400 messages per second. average: 689.655
2009-10-29 10:02:13,302 [nerContainer-10] INFO ThrottlingRoutePolicy - Throtteling consumer: 20 > 16 inflight exchange by suspending consumer.
2009-10-29 10:02:13,320 [enerContainer-9] INFO foo - Received: 30 messages so far. Last group took: 117 millis which is: 85.47 messages per second. average: 205.479
2009-10-29 10:02:13,333 [nerContainer-18] INFO foo - Received: 40 messages so far. Last group took: 13 millis which is: 769.231 messages per second. average: 251.572
2009-10-29 10:02:13,344 [enerContainer-3] INFO ThrottlingRoutePolicy - Throtteling consumer: 4 <= 4 inflight exchange by resuming consumer.
2009-10-29 10:02:13,444 [nerContainer-10] INFO ThrottlingRoutePolicy - Throtteling consumer: 20 > 16 inflight exchange by suspending consumer.
2009-10-29 10:02:13,446 [nerContainer-15] INFO foo - Received: 50 messages so far. Last group took: 113 millis which is: 88.496 messages per second. average: 183.824
2009-10-29 10:02:13,455 [enerContainer-3] INFO ThrottlingRoutePolicy - Throtteling consumer: 4 <= 4 inflight exchange by resuming consumer.
2009-10-29 10:02:13,457 [nerContainer-11] INFO foo - Received: 60 messages so far. Last group took: 11 millis which is: 909.091 messages per second. average: 212.014
2009-10-29 10:02:13,560 [enerContainer-2] INFO ThrottlingRoutePolicy - Throtteling consumer: 20 > 16 inflight exchange by suspending consumer.
2009-10-29 10:02:13,570 [nerContainer-13] INFO foo - Received: 70 messages so far. Last group took: 113 millis which is: 88.496 messages per second. average: 176.768
2009-10-29 10:02:13,574 [nerContainer-11] INFO foo - Received: 80 messages so far. Last group took: 4 millis which is: 2,500 messages per second. average: 200
2009-10-29 10:02:13,574 [nerContainer-16] INFO ThrottlingRoutePolicy - Throtteling consumer: 4 <= 4 inflight exchange by resuming consumer.
2009-10-29 10:02:13,660 [main ] INFO MockEndpoint - Asserting: Endpoint[mock://result] is satisfied
2009-10-29 10:02:13,681 [nerContainer-20] INFO ThrottlingRoutePolicy - Throtteling consumer: 20 > 16 inflight exchange by suspending consumer.
2009-10-29 10:02:13,693 [nerContainer-13] INFO foo - Received: 90 messages so far. Last group took: 119 millis which is: 84.034 messages per second. average: 173.41
2009-10-29 10:02:13,701 [enerContainer-5] INFO foo - Received: 100 messages so far. Last group took: 8 millis which is: 1,250 messages per second. average: 189.753
2009-10-29 10:02:13,725 [nerContainer-16] INFO ThrottlingRoutePolicy - Throtteling consumer: 4 <= 4 inflight exchange by resuming consumer.
2009-10-29 10:02:13,809 [enerContainer-2] INFO ThrottlingRoutePolicy - Throtteling consumer: 20 > 16 inflight exchange by suspending consumer.
2009-10-29 10:02:13,826 [enerContainer-4] INFO foo - Received: 110 messages so far. Last group took: 125 millis which is: 80 messages per second. average: 168.712
2009-10-29 10:02:13,837 [enerContainer-5] INFO foo - Received: 120 messages so far. Last group took: 11 millis which is: 909.091 messages per second. average: 180.995
2009-10-29 10:02:13,851 [enerContainer-3] INFO ThrottlingRoutePolicy - Throtteling consumer: 4 <= 4 inflight exchange by resuming consumer.
2009-10-29 10:02:13,941 [nerContainer-15] INFO ThrottlingRoutePolicy - Throtteling consumer: 20 > 16 inflight exchange by suspending consumer.
2009-10-29 10:02:13,952 [nerContainer-19] INFO foo - Received: 130 messages so far. Last group took: 115 millis which is: 86.957 messages per second. average: 167.095
2009-10-29 10:02:13,957 [enerContainer-9] INFO ThrottlingRoutePolicy - Throtteling consumer: 4 <= 4 inflight exchange by resuming consumer.
2009-10-29 10:02:13,963 [nerContainer-16] INFO foo - Received: 140 messages so far. Last group took: 11 millis which is: 909.091 messages per second. average: 177.44
2009-10-29 10:02:14,071 [nerContainer-18] INFO foo - Received: 150 messages so far. Last group took: 108 millis which is: 92.593 messages per second. average: 167.224
2009-10-29 10:02:14,171 [enerContainer-6] INFO ThrottlingRoutePolicy - Throtteling consumer: 20 > 16 inflight exchange by suspending consumer.
2009-10-29 10:02:14,177 [nerContainer-14] INFO foo - Received: 160 messages so far. Last group took: 106 millis which is: 94.34 messages per second. average: 159.521
2009-10-29 10:02:14,198 [enerContainer-9] INFO foo - Received: 170 messages so far. Last group took: 21 millis which is: 476.19 messages per second. average: 166.016
2009-10-29 10:02:14,199 [nerContainer-13] INFO ThrottlingRoutePolicy - Throtteling consumer: 4 <= 4 inflight exchange by resuming consumer.
2009-10-29 10:02:14,306 [nerContainer-14] INFO ThrottlingRoutePolicy - Throtteling consumer: 20 > 16 inflight exchange by suspending consumer.
2009-10-29 10:02:14,314 [nerContainer-12] INFO foo - Received: 180 messages so far. Last group took: 116 millis which is: 86.207 messages per second. average: 157.895
2009-10-29 10:02:14,317 [nerContainer-18] INFO foo - Received: 190 messages so far. Last group took: 3 millis which is: 3,333.333 messages per second. average: 166.229
2009-10-29 10:02:14,317 [nerContainer-10] INFO ThrottlingRoutePolicy - Throtteling consumer: 4 <= 4 inflight exchange by resuming consumer.
2009-10-29 10:02:14,432 [enerContainer-3] INFO foo - Received: 200 messages so far. Last group took: 115 millis which is: 86.957 messages per second. average: 158.983
2009-10-29 10:02:14,434 [main ] INFO DefaultCamelContext - Apache Camel 2.1-SNAPSHOT (CamelContext:camelContext) is stopping

--
Claus Ibsen
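
For the try lock idea mentioned in this message, here is a rough sketch of what it could look like with java.util.concurrent.locks.ReentrantLock. It is not the locking used in the committed ThrottlingRoutePolicy, which is not reproduced here; TryLockThrottleSketch and its members are hypothetical names, and the thresholds are passed in directly as assumptions.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: only one thread at a time evaluates the thresholds,
// and threads that lose the race skip the check instead of blocking.
final class TryLockThrottleSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final int maxInflight;
    private final int resumeAt;
    private volatile boolean suspended;

    TryLockThrottleSketch(int maxInflight, int resumeAt) {
        this.maxInflight = maxInflight;
        this.resumeAt = resumeAt;
    }

    boolean isSuspended() {
        return suspended;
    }

    // Returns true if this caller performed the check, false if it was skipped
    // because another thread currently holds the lock.
    boolean check(int currentInflight) {
        if (!lock.tryLock()) {
            return false;
        }
        try {
            if (!suspended && currentInflight > maxInflight) {
                suspended = true;  // the consumer would be suspended here
            } else if (suspended && currentInflight <= resumeAt) {
                suspended = false; // the consumer would be resumed here
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

The trade-off is that a skipped check is only harmless if some later exchange triggers another check. If the skipped call happened to be the last one, the consumer could stay suspended, so a real implementation would need to handle that case.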
Re: Discussion - Route Policy feature in Camel 2.1

Hi

I recently added an example as well, based on throttling a JMS route:

    camel-example-route-throttling

There is a README.txt with instructions on how to run it.

--
Claus Ibsen
Re: Discussion - Route Policy feature in Camel 2.1

Hi

I have now implemented this feature in trunk. You can check out the documentation at
http://camel.apache.org/routepolicy.html

I have also added an example you can play with:

    ./examples/camel-example-route-throttling

It also has a bit of documentation at
http://cwiki.apache.org/confluence/display/CAMEL/Route+Throttling+Example

--
Claus Ibsen