Author: dejanb
Date: Wed Nov 11 15:50:45 2009
New Revision: 834922
URL:
http://svn.apache.org/viewvc?rev=834922&view=revLog:
http://issues.apache.org/activemq/browse/AMQ-2042 - shutdown if kahadb cannot access disk
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=834922&r1=834921&r2=834922&view=diff==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Wed Nov 11 15:50:45 2009
@@ -732,7 +732,7 @@
try {
brokerService.stop();
} catch (Exception e) {
- LOG.warn("Failure occured while stopping broker");
+ LOG.warn("Failure occured while stopping broker", e);
}
}
}.start();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=834922&r1=834921&r2=834922&view=diff==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Wed Nov 11 15:50:45 2009
@@ -17,6 +17,8 @@
package org.apache.activemq.store.kahadb;
import org.apache.activeio.journal.Journal;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -37,7 +39,7 @@
* @org.apache.xbean.XBean element="kahaDB"
* @version $Revision: 1.17 $
*/
-public class KahaDBPersistenceAdapter implements PersistenceAdapter {
+public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
private KahaDBStore letter = new KahaDBStore();
@@ -364,4 +366,8 @@
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
}
+
+ public void setBrokerService(BrokerService brokerService) {
+ letter.setBrokerService(brokerService);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=834922&r1=834921&r2=834922&view=diff==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Nov 11 15:50:45 2009
@@ -36,6 +36,8 @@
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.SubscriptionInfo;
@@ -75,8 +77,11 @@
import org.apache.kahadb.util.SequenceSet;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
+import org.springframework.core.enums.LetterCodedLabeledEnum;
-public class MessageDatabase {
+public class MessageDatabase implements BrokerServiceAware {
+
+ private BrokerService brokerService;
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500"));
@@ -259,6 +264,9 @@
}
} catch (InterruptedException e) {
// Looks like someone really wants us to exit this thread...
+ } catch (IOException ioe) {
+ LOG.error("Checkpoint failed", ioe);
+ stopBroker();
}
}
};
@@ -575,26 +583,22 @@
return journal.getNextLocation(null);
}
- protected void checkpointCleanup(final boolean cleanup) {
- try {
- long start = System.currentTimeMillis();
- synchronized (indexMutex) {
- if( !opened.get() ) {
- return;
- }
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- checkpointUpdate(tx, cleanup);
- }
- });
- }
- long end = System.currentTimeMillis();
- if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
- LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+ protected void checkpointCleanup(final boolean cleanup) throws IOException {
+ long start = System.currentTimeMillis();
+ synchronized (indexMutex) {
+ if( !opened.get() ) {
+ return;
}
- } catch (IOException e) {
- e.printStackTrace();
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ checkpointUpdate(tx, cleanup);
+ }
+ });
}
+ long end = System.currentTimeMillis();
+ if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+ LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+ }
}
@@ -623,7 +627,6 @@
* durring a recovery process.
*/
public Location store(JournalCommand data, boolean sync) throws IOException {
-
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@@ -1530,4 +1533,20 @@
public void setChecksumJournalFiles(boolean checksumJournalFiles) {
this.checksumJournalFiles = checksumJournalFiles;
}
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
+
+ protected void stopBroker() {
+ new Thread() {
+ public void run() {
+ try {
+ brokerService.stop();
+ } catch (Exception e) {
+ LOG.warn("Failure occured while stopping broker", e);
+ }
+ }
+ }.start();
+ }
}
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=834922&r1=834921&r2=834922&view=diff==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Wed Nov 11 15:50:45 2009
@@ -158,7 +158,7 @@
* @throws
*/
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
-
+
// Write the packet our internal buffer.
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
@@ -298,6 +298,7 @@
protected void processQueue() {
DataFile dataFile = null;
RandomAccessFile file = null;
+ WriteBatch wb = null;
try {
DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
@@ -321,7 +322,7 @@
enqueueMutex.notify();
}
- WriteBatch wb = (WriteBatch)o;
+ wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
file.setLength(dataFile.getLength());
@@ -403,6 +404,9 @@
wb.latch.countDown();
}
} catch (IOException e) {
+ if (wb != null) {
+ wb.latch.countDown();
+ }
synchronized (enqueueMutex) {
firstAsyncException = e;
}