Indefinite Message Retention on Virtual Topic

We are currently experiencing indefinite message retention on a virtual topic with durable subscribers spanning multiple nodes in a broker network. We have two applications, AppA and AppB, each with one durable subscriber (implemented as message-driven EJBs with clientIDs and subscription names defined). Each application is connected to its own broker cluster: AppA to Broker1 and AppB to Broker2. Broker1 and Broker2 are connected via a broker network and have fixed, reliable names. When a message is sent to the virtual topic, we expect to see the following acknowledgements occur:
1. MDB in AppA acknowledges in Broker1
2. Broker1 acknowledges the Broker1<>Broker2 network
3. MDB in AppB acknowledges in Broker2
4. Broker2 acknowledges the Broker2<>Broker1 network

All acknowledgements occur except the last one. To further complicate the scenario, Broker1 is running ActiveMQ 5.2.0 in a standalone master/slave cluster using JDBC persistence to an Oracle 10g R2 RAC. Broker2 is running ActiveMQ 5.1.0 embedded in JBoss 4.0.5 using JDBC persistence to PostgreSQL 8.3.4 (we will be migrating Broker2 in the near future, but production stability requirements demand that it stay in its current deployment model and revision).

I have also attached sanitized versions of our broker configuration files. Are one or both of the brokers improperly configured? Is the virtual topic properly defined?

Thank you in advance for your assistance,
-John Burwell

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements. See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

  <!-- Allows us to use system properties as variables in this configuration file -->
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
      <value>file:///${activemq.base}/conf/activemq.properties</value>
    </property>
  </bean>

  <broker xmlns="http://activemq.apache.org/schema/core"
          brokerName="${broker.name}"
          advisorySupport="true"
          useJmx="true"
          persistent="true"
          dataDirectory="${activemq.base}/data"
          dataDirectoryFile="${activemq.base}/data"
          tmpDataDirectory="${activemq.base}/data">

    <!-- Destination specific policies using destination names or wildcards -->
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue=">" memoryLimit="5mb"/>
          <policyEntry topic=">" memoryLimit="5mb"/>
        </policyEntries>
      </policyMap>
    </destinationPolicy>

    <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
    <!--
    <managementContext>
      <managementContext connectorPath="service:jmx:rmi://amq-jmx.int.brivo.net:1099/jmxrmi" />
    </managementContext>
    -->

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <virtualTopic name="VirtualTopic.camera.status" prefix="VirtualTopicConsumers.*." />
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>

    <destinations>
      <queue name="activemq/queue/camera/command" physicalName="queue.camera.command" />
      <topic name="activemq/topic/camera/status" physicalName="VirtualTopic.camera.status" />
    </destinations>

    <!-- The store and forward broker networks ActiveMQ will listen to -->
    <networkConnectors>
      <networkConnector name="message-fabric"
                        uri="static:(${broker.network.uri})"
                        networkTTL="1"
                        conduitSubscriptions="false"
                        dynamicOnly="true">
        <excludedDestinations>
          <queue physicalName="Consumer.*.VirtualTopic.>" />
        </excludedDestinations>
      </networkConnector>
    </networkConnectors>

    <persistenceAdapter>
      <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data"
                              dataDirectoryFile="${activemq.base}/data"
                              dataSource="#oracle-ds" />
      <statements>
        <statements lockCreateStatement="select * from activemq_lock where id=1 for update nowait" />
      </statements>
    </persistenceAdapter>

    <!-- The maximum amount of space the broker will use before slowing down producers -->
    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage limit="20 mb"/>
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="1 gb" name="foo"/>
        </storeUsage>
        <tempUsage>
          <tempUsage limit="100 mb"/>
        </tempUsage>
      </systemUsage>
    </systemUsage>

    <!-- The transport connectors ActiveMQ will listen to -->
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://amq:61616" />
      <transportConnector name="video-bridge" uri="tcp://amqvideo:61630" />
    </transportConnectors>
  </broker>

  <!-- An embedded servlet engine for serving up the Admin console -->
  <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
    <connectors>
      <nioConnector host="amq-jmx" port="8161"/>
    </connectors>
    <handlers>
      <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>
    </handlers>
  </jetty>

  <bean id="oracle-ds" class="oracle.jdbc.pool.OracleDataSource" destroy-method="close">
    <property name="URL" value="${db.url}" />
    <property name="user" value="${db.username}" />
    <property name="password" value="${db.password}" />
    <property name="connectionCacheName" value="amq_cache" />
    <property name="connectionCachingEnabled" value="true" />
    <property name="fastConnectionFailoverEnabled" value="${db.fast_connection_failover}" />
    <property name="connectionCacheProperties">
      <props>
        <prop key="MinLimit">${db.pool.min_connections}</prop>
        <prop key="MaxLimit">${db.pool.max_connections}</prop>
      </props>
    </property>
  </bean>
</beans>

<?xml version="1.0" encoding="UTF-8"?>
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schma/core/activemq-core.xsd
  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
      <value>${jboss.server.config.url}/activemq.properties</value>
    </property>
  </bean>

  <broker xmlns="http://activemq.apache.org/schema/core"
          advisorySupport="true"
          brokerName="${broker.name}"
          useJmx="true"
          persistent="true"
          dataDirectory="${jboss.server.data.dir}/activemq"
          dataDirectoryFile="${jboss.server.data.dir}/activemq"
          tmpDataDirectory="${jboss.server.temp.dir}/activemq">

    <managementContext>
      <managementContext createConnector="false"/>
    </managementContext>

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <virtualTopic name="VirtualTopic.camera.status" prefix="VirtualTopicConsumers.*." />
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>

    <destinations>
      <queue name="activemq/queue/camera/command" physicalName="queue.camera.command" />
      <queue name="activemq/queue/camera/connection" physicalName="queue.camera.connection" />
      <topic name="activemq/topic/camera/status" physicalName="VirtualTopic.camera.status" />
    </destinations>

    <networkConnectors>
      <networkConnector name="application-video-network"
                        uri="static:(failover:(${activemq.broker1.broker-cluster}))"
                        networkTTL="1"
                        conduitSubscriptions="false"
                        dynamicOnly="true">
        <excludedDestinations>
          <queue physicalName="Consumer.*.VirtualTopic.>"/>
        </excludedDestinations>
      </networkConnector>
    </networkConnectors>

    <persistenceAdapter>
      <jdbcPersistenceAdapter createTablesOnStartup="true"
                              dataDirectory="${jboss.server.data.dir}/activemq"
                              dataDirectoryFile="${jboss.server.data.dir}/activemq"
                              dataSource="#postgres-ds"/>
    </persistenceAdapter>

    <transportConnectors>
      <transportConnector name="application-video-consumer" uri="tcp://${jboss.bind.address}:61616" />
      <transportConnector name="camera-status-consumer" uri="stomp://${jboss.bind.address}:61617" />
    </transportConnectors>

    <!-- The maximum amount of space the broker will use before slowing down producers -->
    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage limit="20 mb"/>
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="1 gb" name="foo"/>
        </storeUsage>
        <tempUsage>
          <tempUsage limit="100 mb"/>
        </tempUsage>
      </systemUsage>
    </systemUsage>
  </broker>

  <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource">
    <property name="serverName" value="${activemq.db.server.name}"/>
    <property name="databaseName" value="amq"/>
    <property name="portNumber" value="5432"/>
    <property name="user" value="${activemq.db.uid}"/>
    <property name="password" value="${activemq.db.pwd}"/>
    <property name="dataSourceName" value="postgres"/>
    <property name="initialConnections" value="${activemq.db.pool.min_connections}"/>
    <property name="maxConnections" value="${activemq.db.pool.max_connections}"/>
  </bean>
</beans>
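
One detail we are unsure about: both brokers define the virtual topic with the custom prefix VirtualTopicConsumers.*., while the network connectors exclude queues matching Consumer.*.VirtualTopic.>, which corresponds to ActiveMQ's default prefix (Consumer.*.). As a sketch only, and assuming the exclusion is meant to cover the consumer queues behind this virtual topic, the Broker1 connector would look like the following if the pattern tracked the custom prefix. The physicalName value is our assumption and is untested. Since our consumers are durable topic subscribers rather than queue consumers, this may be unrelated to the missing acknowledgement.

```xml
<!-- Sketch under stated assumptions: same connector as in the Broker1 file,
     with only the excluded queue pattern changed to follow the custom
     virtual topic prefix "VirtualTopicConsumers.*." (untested). -->
<networkConnectors>
  <networkConnector name="message-fabric"
                    uri="static:(${broker.network.uri})"
                    networkTTL="1"
                    conduitSubscriptions="false"
                    dynamicOnly="true">
    <excludedDestinations>
      <!-- Assumed pattern: custom prefix followed by the virtual topic name space -->
      <queue physicalName="VirtualTopicConsumers.*.VirtualTopic.>" />
    </excludedDestinations>
  </networkConnector>
</networkConnectors>
```

The Broker2 connector (application-video-network) would need the same change to its excludedDestinations entry for the two sides to stay symmetric.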