svn commit: r834922 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/journal/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ kahadb/src/main/java/org/apache/kahadb/journal/

View: New views
1 Message — Rating Filter: Alert me

svn commit: r834922 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/journal/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ kahadb/src/main/java/org/apache/kahadb/journal/

by dejanb-2 :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

Author: dejanb
Date: Wed Nov 11 15:50:45 2009
New Revision: 834922

URL: http://svn.apache.org/viewvc?rev=834922&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-2042 - shut down the broker if KahaDB cannot access the disk

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=834922&r1=834921&r2=834922&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Wed Nov 11 15:50:45 2009
@@ -732,7 +732,7 @@
            try {
                 brokerService.stop();
             } catch (Exception e) {
-                LOG.warn("Failure occured while stopping broker");
+                LOG.warn("Failure occured while stopping broker", e);
             }    
      }
      }.start();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=834922&r1=834921&r2=834922&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Wed Nov 11 15:50:45 2009
@@ -17,6 +17,8 @@
 package org.apache.activemq.store.kahadb;
 
 import org.apache.activeio.journal.Journal;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -37,7 +39,7 @@
  * @org.apache.xbean.XBean element="kahaDB"
  * @version $Revision: 1.17 $
  */
-public class KahaDBPersistenceAdapter implements PersistenceAdapter {
+public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
     private KahaDBStore letter = new KahaDBStore();
     
 
@@ -364,4 +366,8 @@
     public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
         letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
     }
+
+ public void setBrokerService(BrokerService brokerService) {
+ letter.setBrokerService(brokerService);
+ }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=834922&r1=834921&r2=834922&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Nov 11 15:50:45 2009
@@ -36,6 +36,8 @@
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -75,8 +77,11 @@
 import org.apache.kahadb.util.SequenceSet;
 import org.apache.kahadb.util.StringMarshaller;
 import org.apache.kahadb.util.VariableMarshaller;
+import org.springframework.core.enums.LetterCodedLabeledEnum;
 
-public class MessageDatabase {
+public class MessageDatabase implements BrokerServiceAware {
+
+ private BrokerService brokerService;
 
     public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
     public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500"));
@@ -259,6 +264,9 @@
                     }
                 } catch (InterruptedException e) {
                     // Looks like someone really wants us to exit this thread...
+                } catch (IOException ioe) {
+                 LOG.error("Checkpoint failed", ioe);
+                 stopBroker();
                 }
             }
         };
@@ -575,26 +583,22 @@
         return journal.getNextLocation(null);
  }
 
-    protected void checkpointCleanup(final boolean cleanup) {
-        try {
-         long start = System.currentTimeMillis();
-            synchronized (indexMutex) {
-             if( !opened.get() ) {
-             return;
-             }
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    public void execute(Transaction tx) throws IOException {
-                        checkpointUpdate(tx, cleanup);
-                    }
-                });
-            }
-         long end = System.currentTimeMillis();
-         if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-         LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+    protected void checkpointCleanup(final boolean cleanup) throws IOException {
+     long start = System.currentTimeMillis();
+        synchronized (indexMutex) {
+         if( !opened.get() ) {
+         return;
          }
-        } catch (IOException e) {
-         e.printStackTrace();
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    checkpointUpdate(tx, cleanup);
+                }
+            });
         }
+     long end = System.currentTimeMillis();
+     if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+     LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+     }
     }
 
     
@@ -623,7 +627,6 @@
      * durring a recovery process.
      */
     public Location store(JournalCommand data, boolean sync) throws IOException {
-
     
         int size = data.serializedSizeFramed();
         DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@@ -1530,4 +1533,20 @@
     public void setChecksumJournalFiles(boolean checksumJournalFiles) {
         this.checksumJournalFiles = checksumJournalFiles;
     }
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
+
+    protected void stopBroker() {
+        new Thread() {
+           public void run() {
+           try {
+                brokerService.stop();
+            } catch (Exception e) {
+                LOG.warn("Failure occured while stopping broker", e);
+            }    
+     }
+     }.start();
+    }
 }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=834922&r1=834921&r2=834922&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Wed Nov 11 15:50:45 2009
@@ -158,7 +158,7 @@
      * @throws
      */
     public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
-
+    
         // Write the packet our internal buffer.
         int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
 
@@ -298,6 +298,7 @@
     protected void processQueue() {
         DataFile dataFile = null;
         RandomAccessFile file = null;
+        WriteBatch wb = null;
         try {
 
             DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
@@ -321,7 +322,7 @@
                     enqueueMutex.notify();
                 }
 
-                WriteBatch wb = (WriteBatch)o;
+                wb = (WriteBatch)o;
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
                         file.setLength(dataFile.getLength());
@@ -403,6 +404,9 @@
                 wb.latch.countDown();
             }
         } catch (IOException e) {
+         if (wb != null) {
+         wb.latch.countDown();
+         }
             synchronized (enqueueMutex) {
                 firstAsyncException = e;
             }