summaryrefslogtreecommitdiff
path: root/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java')
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java169
1 files changed, 12 insertions, 157 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 9323111fdd..6e64ea5597 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -54,13 +54,10 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.*;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
@@ -73,7 +70,6 @@ import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.StringMapBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
@@ -423,8 +419,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
BindingRecoveryHandler brh = qrh.completeQueueRecovery();
_configuredObjectHelper.recoverBindings(brh, configuredObjects);
- BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
- recoverBrokerLinks(lrh);
+ brh.completeBindingRecovery();
}
catch (DatabaseException e)
{
@@ -466,66 +461,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _linkDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
- long createTime = LongBinding.entryToLong(value);
- Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value);
-
- ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
-
- recoverBridges(brh, id);
- }
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- }
-
- private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _bridgeDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-
- UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
- if(parentId.equals(linkId))
- {
-
- long createTime = LongBinding.entryToLong(value);
- Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value);
- brh.bridge(id,createTime,arguments);
- }
- }
- brh.completeBridgeRecoveryForLink();
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- }
-
private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
{
@@ -940,89 +875,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- public void createBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- LongBinding.longToEntry(link.getCreateTime(), value);
- StringMapBinding.getInstance().objectToEntry(link.getArguments(), value);
-
- try
- {
- _linkDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing Link " + link
- + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key);
- try
- {
- OperationStatus status = _linkDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Link " + link + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error deleting the Link " + link + " from database: " + e.getMessage(), e);
- }
- }
-
- public void createBridge(final Bridge bridge) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getQMFId(),value);
- LongBinding.longToEntry(bridge.getCreateTime(),value);
- StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value);
-
- try
- {
- _bridgeDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing Bridge " + bridge
- + " to database: " + e.getMessage(), e);
- }
-
- }
- }
-
- public void deleteBridge(final Bridge bridge) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key);
- try
- {
- OperationStatus status = _bridgeDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Bridge " + bridge + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error deleting the Bridge " + bridge + " from database: " + e.getMessage(), e);
- }
- }
/**
* Places a message onto a specified queue, in a given transaction.
@@ -1050,7 +902,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
LOGGER.debug("Enqueuing message " + messageId + " on queue "
+ (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
- + " [Transaction" + tx + "]");
+ + " in transaction " + tx);
}
_deliveryDb.put(tx, key, value);
}
@@ -1204,7 +1056,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("commitTranImpl completed for [Transaction:" + tx + "]");
+ String transactionType = syncCommit ? "synchronous" : "asynchronous";
+ LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx);
}
}
catch (DatabaseException e)
@@ -1226,7 +1079,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("abortTran called for [Transaction:" + tx + "]");
+ LOGGER.debug("abortTran called for transaction " + tx);
}
try
@@ -1338,7 +1191,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Storing content for message " + messageId + "[Transaction" + tx + "]");
+ LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx);
}
}
@@ -1363,8 +1216,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = "
- + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
+ LOGGER.debug("storeMetaData called for transaction " + tx
+ + ", messageId " + messageId
+ + ", messageMetaData " + messageMetaData);
}
DatabaseEntry key = new DatabaseEntry();
@@ -1378,7 +1232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_messageMetaDataDb.put(tx, key, value);
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]");
+ LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
}
}
catch (DatabaseException e)
@@ -1680,7 +1534,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
else
{
ByteBuffer buf = ByteBuffer.allocate(size);
- getContent(offsetInMessage, buf);
+ int length = getContent(offsetInMessage, buf);
+ buf.limit(length);
buf.position(0);
return buf;
}