diff options
Diffstat (limited to 'java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java')
-rw-r--r-- | java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java | 169 |
1 files changed, 12 insertions, 157 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 9323111fdd..6e64ea5597 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -54,13 +54,10 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.Bridge; -import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.*; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; @@ -73,7 +70,6 @@ import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.StringMapBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; @@ -423,8 +419,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore BindingRecoveryHandler brh = qrh.completeQueueRecovery(); _configuredObjectHelper.recoverBindings(brh, configuredObjects); - BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); - recoverBrokerLinks(lrh); + brh.completeBindingRecovery(); } catch (DatabaseException e) { @@ -466,66 +461,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh) - { - Cursor cursor = null; - - try - { - cursor = _linkDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - long createTime = LongBinding.entryToLong(value); - Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value); - - ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments); - - recoverBridges(brh, id); - } - } - finally - { - closeCursorSafely(cursor); - } - - } - - private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId) - { - Cursor cursor = null; - - try - { - cursor = _bridgeDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - - UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); - if(parentId.equals(linkId)) - { - - long createTime = LongBinding.entryToLong(value); - Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value); - brh.bridge(id,createTime,arguments); - } - } - brh.completeBridgeRecoveryForLink(); - } - finally - { - closeCursorSafely(cursor); - } - - } - private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException { @@ -940,89 +875,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - public void createBrokerLink(final BrokerLink link) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key); - - DatabaseEntry value = new DatabaseEntry(); - LongBinding.longToEntry(link.getCreateTime(), value); - StringMapBinding.getInstance().objectToEntry(link.getArguments(), value); - - try - { - _linkDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing Link " + link - + " to database: " + e.getMessage(), e); - } - } - } - - public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key); - try - { - OperationStatus status = _linkDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Link " + link + " not found"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error deleting the Link " + link + " from database: " + e.getMessage(), e); - } - } - - public void createBridge(final Bridge bridge) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key); - - DatabaseEntry value = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getQMFId(),value); - LongBinding.longToEntry(bridge.getCreateTime(),value); - StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value); - - try - { - _bridgeDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing Bridge " + bridge - + " to database: " + e.getMessage(), e); - } - - } - } - - public void deleteBridge(final Bridge bridge) throws AMQStoreException - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key); - try - { - OperationStatus status = _bridgeDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Bridge " + bridge + " not found"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error deleting the Bridge " + bridge + " from database: " + e.getMessage(), e); - } - } /** * Places a message onto a specified queue, in a given transaction. @@ -1050,7 +902,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { LOGGER.debug("Enqueuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " [Transaction" + tx + "]"); + + " in transaction " + tx); } _deliveryDb.put(tx, key, value); } @@ -1204,7 +1056,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore if (LOGGER.isDebugEnabled()) { - LOGGER.debug("commitTranImpl completed for [Transaction:" + tx + "]"); + String transactionType = syncCommit ? "synchronous" : "asynchronous"; + LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx); } } catch (DatabaseException e) @@ -1226,7 +1079,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("abortTran called for [Transaction:" + tx + "]"); + LOGGER.debug("abortTran called for transaction " + tx); } try @@ -1338,7 +1191,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Storing content for message " + messageId + "[Transaction" + tx + "]"); + LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx); } } @@ -1363,8 +1216,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = " - + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called"); + LOGGER.debug("storeMetaData called for transaction " + tx + + ", messageId " + messageId + + ", messageMetaData " + messageMetaData); } DatabaseEntry key = new DatabaseEntry(); @@ -1378,7 +1232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore _messageMetaDataDb.put(tx, key, value); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]"); + LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx); } } catch (DatabaseException e) @@ -1680,7 +1534,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore else { ByteBuffer buf = ByteBuffer.allocate(size); - getContent(offsetInMessage, buf); + int length = getContent(offsetInMessage, buf); + buf.limit(length); buf.position(0); return buf; } |