summaryrefslogtreecommitdiff
path: root/java/bdbstore/src/main/java/org/apache/qpid/server
diff options
context:
space:
mode:
Diffstat (limited to 'java/bdbstore/src/main/java/org/apache/qpid/server')
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java169
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java4
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java41
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java4
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java41
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java31
6 files changed, 127 insertions, 163 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 9323111fdd..6e64ea5597 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -54,13 +54,10 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.*;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
@@ -73,7 +70,6 @@ import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.StringMapBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
@@ -423,8 +419,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
BindingRecoveryHandler brh = qrh.completeQueueRecovery();
_configuredObjectHelper.recoverBindings(brh, configuredObjects);
- BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
- recoverBrokerLinks(lrh);
+ brh.completeBindingRecovery();
}
catch (DatabaseException e)
{
@@ -466,66 +461,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _linkDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
- long createTime = LongBinding.entryToLong(value);
- Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value);
-
- ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
-
- recoverBridges(brh, id);
- }
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- }
-
- private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _bridgeDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-
- UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
- if(parentId.equals(linkId))
- {
-
- long createTime = LongBinding.entryToLong(value);
- Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value);
- brh.bridge(id,createTime,arguments);
- }
- }
- brh.completeBridgeRecoveryForLink();
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- }
-
private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
{
@@ -940,89 +875,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- public void createBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- LongBinding.longToEntry(link.getCreateTime(), value);
- StringMapBinding.getInstance().objectToEntry(link.getArguments(), value);
-
- try
- {
- _linkDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing Link " + link
- + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key);
- try
- {
- OperationStatus status = _linkDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Link " + link + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error deleting the Link " + link + " from database: " + e.getMessage(), e);
- }
- }
-
- public void createBridge(final Bridge bridge) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getQMFId(),value);
- LongBinding.longToEntry(bridge.getCreateTime(),value);
- StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value);
-
- try
- {
- _bridgeDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing Bridge " + bridge
- + " to database: " + e.getMessage(), e);
- }
-
- }
- }
-
- public void deleteBridge(final Bridge bridge) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key);
- try
- {
- OperationStatus status = _bridgeDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Bridge " + bridge + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error deleting the Bridge " + bridge + " from database: " + e.getMessage(), e);
- }
- }
/**
* Places a message onto a specified queue, in a given transaction.
@@ -1050,7 +902,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
LOGGER.debug("Enqueuing message " + messageId + " on queue "
+ (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
- + " [Transaction" + tx + "]");
+ + " in transaction " + tx);
}
_deliveryDb.put(tx, key, value);
}
@@ -1204,7 +1056,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("commitTranImpl completed for [Transaction:" + tx + "]");
+ String transactionType = syncCommit ? "synchronous" : "asynchronous";
+ LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx);
}
}
catch (DatabaseException e)
@@ -1226,7 +1079,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("abortTran called for [Transaction:" + tx + "]");
+ LOGGER.debug("abortTran called for transaction " + tx);
}
try
@@ -1338,7 +1191,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Storing content for message " + messageId + "[Transaction" + tx + "]");
+ LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx);
}
}
@@ -1363,8 +1216,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = "
- + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
+ LOGGER.debug("storeMetaData called for transaction " + tx
+ + ", messageId " + messageId
+ + ", messageMetaData " + messageMetaData);
}
DatabaseEntry key = new DatabaseEntry();
@@ -1378,7 +1232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_messageMetaDataDb.put(tx, key, value);
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]");
+ LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
}
}
catch (DatabaseException e)
@@ -1680,7 +1534,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
else
{
ByteBuffer buf = ByteBuffer.allocate(size);
- getContent(offsetInMessage, buf);
+ int length = getContent(offsetInMessage, buf);
+ buf.limit(length);
buf.position(0);
return buf;
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index c40f24dbc3..ba111e8091 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -105,7 +105,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
}});
- public static final String BDB_HA_STORE_TYPE = "BDB-HA";
+ public static final String TYPE = "BDB-HA";
private String _groupName;
private String _nodeName;
@@ -602,6 +602,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
@Override
public String getStoreType()
{
- return BDB_HA_STORE_TYPE;
+ return TYPE;
}
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java
new file mode 100644
index 0000000000..20dce2628d
--- /dev/null
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreFactory;
+
+public class BDBHAMessageStoreFactory implements MessageStoreFactory
+{
+
+ @Override
+ public String getType()
+ {
+ return BDBHAMessageStore.TYPE;
+ }
+
+ @Override
+ public MessageStore createMessageStore()
+ {
+ return new BDBHAMessageStore();
+ }
+
+}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 82bc3d8564..4028de4b80 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -42,7 +42,7 @@ import com.sleepycat.je.EnvironmentConfig;
public class BDBMessageStore extends AbstractBDBMessageStore
{
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
- private static final String BDB_STORE_TYPE = "BDB";
+ public static final String TYPE = "BDB";
private CommitThreadWrapper _commitThreadWrapper;
@Override
@@ -108,7 +108,7 @@ public class BDBMessageStore extends AbstractBDBMessageStore
@Override
public String getStoreType()
{
- return BDB_STORE_TYPE;
+ return TYPE;
}
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
new file mode 100644
index 0000000000..126bf1928a
--- /dev/null
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreFactory;
+
+public class BDBMessageStoreFactory implements MessageStoreFactory
+{
+
+ @Override
+ public String getType()
+ {
+ return BDBMessageStore.TYPE;
+ }
+
+ @Override
+ public MessageStore createMessageStore()
+ {
+ return new BDBMessageStore();
+ }
+
+}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
index fe1556b5a6..598d20146c 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
@@ -80,7 +80,7 @@ public class CommitThreadWrapper
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
+ LOGGER.debug("complete() called for transaction " + _tx);
}
_complete = true;
@@ -101,7 +101,10 @@ public class CommitThreadWrapper
if(!_syncCommit)
{
- LOGGER.debug("CommitAsync was requested, returning immediately.");
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("CommitAsync was requested, returning immediately.");
+ }
return;
}
@@ -121,6 +124,12 @@ public class CommitThreadWrapper
public synchronized void waitForCompletion()
{
+ long startTime = 0;
+ if(LOGGER.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ }
+
while (!isComplete())
{
_commitThread.explicitNotify();
@@ -133,6 +142,12 @@ public class CommitThreadWrapper
throw new RuntimeException(e);
}
}
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
}
}
@@ -198,8 +213,20 @@ public class CommitThreadWrapper
try
{
+ long startTime = 0;
+ if(LOGGER.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ }
+
_environment.flushLog(true);
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("flushLog completed in " + duration + " ms");
+ }
+
for(int i = 0; i < size; i++)
{
BDBCommitFuture commit = _jobQueue.poll();