diff options
author | Robert Gemmell <robbie@apache.org> | 2012-04-17 09:01:44 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-04-17 09:01:44 +0000 |
commit | 3203eea7641e1b0f39de96d797db7c54423b7f02 (patch) | |
tree | f2563ba4a85ac54765d8f62663b60853846b3a89 | |
parent | deab61acfe5f4edaae121cf6b9fa5d4b9e42803f (diff) | |
download | qpid-python-3203eea7641e1b0f39de96d797db7c54423b7f02.tar.gz |
QPID-3923: Store queue, exchange and binding as configured objects in bdb store
Applied patch by Oleksandr Rudyy <orudyy@gmail.com>, Phil Harvey <phil@philharveyonline.com>, and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327003 13f79535-47bb-0310-9956-ffa450edef68
100 files changed, 3880 insertions, 1788 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 439ec8ac4f..fb1d7c5265 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -46,8 +46,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; @@ -57,6 +57,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; +import org.apache.qpid.server.store.ConfiguredObjectHelper; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; @@ -73,19 +74,14 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.berkeleydb.entry.BindingRecord; -import org.apache.qpid.server.store.berkeleydb.entry.ExchangeRecord; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; -import org.apache.qpid.server.store.berkeleydb.entry.QueueRecord; import org.apache.qpid.server.store.berkeleydb.entry.Xid; -import org.apache.qpid.server.store.berkeleydb.tuple.AMQShortStringBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.ExchangeBinding; import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.QueueBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.QueueBindingTupleBinding; import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; import org.apache.qpid.server.store.berkeleydb.tuple.StringMapBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; @@ -104,23 +100,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore private Environment _environment; + private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS"; private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA"; private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT"; - private String QUEUEBINDINGSDB_NAME = "QUEUE_BINDINGS"; - private String DELIVERYDB_NAME = "DELIVERIES"; - private String EXCHANGEDB_NAME = "EXCHANGES"; - private String QUEUEDB_NAME = "QUEUES"; + private String DELIVERYDB_NAME = "QUEUE_ENTRIES"; private String BRIDGEDB_NAME = "BRIDGES"; private String LINKDB_NAME = "LINKS"; private String XIDDB_NAME = "XIDS"; - + private Database _configuredObjectsDb; private Database _messageMetaDataDb; private Database _messageContentDb; - private Database _queueBindingsDb; private Database _deliveryDb; - private Database _exchangeDb; - private Database _queueDb; private Database _bridgeDb; private Database _linkDb; private Database _xidDb; @@ -165,6 +156,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore private final EventManager _eventManager = new EventManager(); private String _storeLocation; + private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper(); + public AbstractBDBMessageStore() { _stateManager = new StateManager(_eventManager); @@ -239,7 +232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore LOGGER.info("Configuring BDB message store"); - setupStore(environmentPath); + setupStore(environmentPath, name); } /** @@ -257,11 +250,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore _stateManager.attainState(State.ACTIVE); } - protected void setupStore(File storePath) throws DatabaseException, AMQStoreException + protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException { _environment = createEnvironment(storePath); - new Upgrader(_environment).upgradeIfNecessary(); + new Upgrader(_environment, name).upgradeIfNecessary(); openDatabases(); } @@ -326,10 +319,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore //This is required if we are wanting read only access. dbConfig.setReadOnly(false); + _configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig); _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig); - _queueDb = openDatabase(QUEUEDB_NAME, dbConfig); - _exchangeDb = openDatabase(EXCHANGEDB_NAME, dbConfig); - _queueBindingsDb = openDatabase(QUEUEBINDINGSDB_NAME, dbConfig); _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig); _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig); _linkDb = openDatabase(LINKDB_NAME, dbConfig); @@ -376,23 +367,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore _messageContentDb.close(); } - if (_exchangeDb != null) - { - LOGGER.info("Closing exchange database"); - _exchangeDb.close(); - } - - if (_queueBindingsDb != null) - { - LOGGER.info("Closing bindings database"); - _queueBindingsDb.close(); - } - - if (_queueDb != null) - { - LOGGER.info("Closing queue database"); - _queueDb.close(); - } + if (_configuredObjectsDb != null) + { + LOGGER.info("Closing configurable objects database"); + _configuredObjectsDb.close(); + } if (_deliveryDb != null) { @@ -440,14 +419,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore { try { + List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); QueueRecoveryHandler qrh = recoveryHandler.begin(this); - loadQueues(qrh); + _configuredObjectHelper.recoverQueues(qrh, configuredObjects); ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); - loadExchanges(erh); + _configuredObjectHelper.recoverExchanges(erh, configuredObjects); BindingRecoveryHandler brh = erh.completeExchangeRecovery(); - recoverBindings(brh); + _configuredObjectHelper.recoverBindings(brh, configuredObjects); ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); recoverBrokerLinks(lrh); @@ -459,29 +439,21 @@ public abstract class AbstractBDBMessageStore implements MessageStore } - private void loadQueues(QueueRecoveryHandler qrh) throws DatabaseException + private List<ConfiguredObjectRecord> loadConfiguredObjects() throws DatabaseException { Cursor cursor = null; - + List<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>(); try { - cursor = _queueDb.openCursor(null, null); + cursor = _configuredObjectsDb.openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); - QueueBinding binding = QueueBinding.getInstance(); while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { - QueueRecord queueRecord = binding.entryToObject(value); - - String queueName = queueRecord.getNameShortString() == null ? null : - queueRecord.getNameShortString().asString(); - String owner = queueRecord.getOwner() == null ? null : - queueRecord.getOwner().asString(); - boolean exclusive = queueRecord.isExclusive(); - - FieldTable arguments = queueRecord.getArguments(); - - qrh.queue(queueName, owner, exclusive, arguments); + ConfiguredObjectRecord configuredObject = ConfiguredObjectBinding.getInstance().entryToObject(value); + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + configuredObject.setId(id); + results.add(configuredObject); } } @@ -489,6 +461,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { closeCursorSafely(cursor); } + return results; } private void closeCursorSafely(Cursor cursor) @@ -499,74 +472,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - - private void loadExchanges(ExchangeRecoveryHandler erh) throws DatabaseException - { - Cursor cursor = null; - - try - { - cursor = _exchangeDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - ExchangeBinding binding = ExchangeBinding.getInstance(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - ExchangeRecord exchangeRec = binding.entryToObject(value); - - String exchangeName = exchangeRec.getNameShortString() == null ? null : - exchangeRec.getNameShortString().asString(); - String type = exchangeRec.getType() == null ? null : - exchangeRec.getType().asString(); - boolean autoDelete = exchangeRec.isAutoDelete(); - - erh.exchange(exchangeName, type, autoDelete); - } - } - finally - { - closeCursorSafely(cursor); - } - - } - - private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException - { - Cursor cursor = null; - try - { - cursor = _queueBindingsDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - QueueBindingTupleBinding binding = QueueBindingTupleBinding.getInstance(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - //yes, this is retrieving all the useful information from the key only. - //For table compatibility it shall currently be left as is - BindingRecord bindingRecord = binding.entryToObject(key); - - String exchangeName = bindingRecord.getExchangeName() == null ? null : - bindingRecord.getExchangeName().asString(); - String queueName = bindingRecord.getQueueName() == null ? null : - bindingRecord.getQueueName().asString(); - String routingKey = bindingRecord.getRoutingKey() == null ? null : - bindingRecord.getRoutingKey().asString(); - ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null : - java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes())); - - brh.binding(exchangeName, queueName, routingKey, argumentsBB); - } - } - finally - { - closeCursorSafely(cursor); - } - - } - - private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh) { Cursor cursor = null; @@ -681,7 +586,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { QueueEntryKey qek = keyBinding.entryToObject(key); @@ -700,10 +604,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore for(QueueEntryKey entry : entries) { - AMQShortString queueName = entry.getQueueName(); + UUID queueId = entry.getQueueId(); long messageId = entry.getMessageId(); - - qerh.queueEntry(queueName.asString(),messageId); + qerh.queueEntry(queueId, messageId); } } catch (DatabaseException e) @@ -892,25 +795,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if (_stateManager.isInState(State.ACTIVE)) { - ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(), - exchange.getTypeShortString(), exchange.isAutoDelete()); - - DatabaseEntry key = new DatabaseEntry(); - AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance(); - keyBinding.objectToEntry(exchange.getNameShortString(), key); - - DatabaseEntry value = new DatabaseEntry(); - ExchangeBinding exchangeBinding = ExchangeBinding.getInstance(); - exchangeBinding.objectToEntry(exchangeRec, value); - - try - { - _exchangeDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing Exchange with name " + exchange.getName() + " to database: " + e.getMessage(), e); - } + ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange); + storeConfiguredObjectEntry(configuredObject); } } @@ -919,82 +805,47 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ public void removeExchange(Exchange exchange) throws AMQStoreException { - DatabaseEntry key = new DatabaseEntry(); - AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance(); - keyBinding.objectToEntry(exchange.getNameShortString(), key); - try + UUID id = exchange.getId(); + if (LOGGER.isDebugEnabled()) { - OperationStatus status = _exchangeDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Exchange " + exchange.getName() + " not found"); - } + LOGGER.debug("public void removeExchange(String name = " + exchange.getName() + ", uuid = " + id + "): called"); } - catch (DatabaseException e) + OperationStatus status = removeConfiguredObject(id); + if (status == OperationStatus.NOTFOUND) { - throw new AMQStoreException("Error writing deleting with name " + exchange.getName() + " from database: " + e.getMessage(), e); + throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + id + " not found"); } } /** - * @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable) + * @see DurableConfigurationStore#bindQueue(Binding) */ - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException - { - bindQueue(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args)); - } - - protected void bindQueue(final BindingRecord bindingRecord) throws AMQStoreException + public void bindQueue(Binding binding) throws AMQStoreException { if (_stateManager.isInState(State.ACTIVE)) { - DatabaseEntry key = new DatabaseEntry(); - QueueBindingTupleBinding keyBinding = QueueBindingTupleBinding.getInstance(); - - keyBinding.objectToEntry(bindingRecord, key); - - //yes, this is writing out 0 as a value and putting all the - //useful info into the key, don't ask me why. For table - //compatibility it shall currently be left as is - DatabaseEntry value = new DatabaseEntry(); - ByteBinding.byteToEntry((byte) 0, value); - - try - { - _queueBindingsDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing binding for AMQQueue with name " + bindingRecord.getQueueName() + " to exchange " - + bindingRecord.getExchangeName() + " to database: " + e.getMessage(), e); - } + ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding); + storeConfiguredObjectEntry(configuredObject); } } /** - * @see DurableConfigurationStore#unbindQueue(Exchange, AMQShortString, AMQQueue, FieldTable) + * @see DurableConfigurationStore#unbindQueue(Binding) */ - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) + public void unbindQueue(Binding binding) throws AMQStoreException { - DatabaseEntry key = new DatabaseEntry(); - QueueBindingTupleBinding keyBinding = QueueBindingTupleBinding.getInstance(); - keyBinding.objectToEntry(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key); - - try + UUID id = binding.getId(); + if (LOGGER.isDebugEnabled()) { - OperationStatus status = _queueBindingsDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Queue binding for queue with name " + queue.getName() + " to exchange " - + exchange.getName() + " not found"); - } + LOGGER.debug("public void unbindQueue(Binding binding = " + binding + ", uuid = " + id + "): called"); } - catch (DatabaseException e) + + OperationStatus status = removeConfiguredObject(id); + if (status == OperationStatus.NOTFOUND) { - throw new AMQStoreException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange " - + exchange.getName() + " from database: " + e.getMessage(), e); + throw new AMQStoreException("Binding " + binding + " not found"); } } @@ -1011,47 +862,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called"); - } - - QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(), - queue.getOwner(), queue.isExclusive(), arguments); - - createQueue(queueRecord); - } - - /** - * Makes the specified queue persistent. - * - * Only intended for direct use during store upgrades. - * - * @param queueRecord Details of the queue to store. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - protected void createQueue(QueueRecord queueRecord) throws AMQStoreException - { if (_stateManager.isInState(State.ACTIVE)) { - DatabaseEntry key = new DatabaseEntry(); - AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance(); - keyBinding.objectToEntry(queueRecord.getNameShortString(), key); - - DatabaseEntry value = new DatabaseEntry(); - QueueBinding queueBinding = QueueBinding.getInstance(); - - queueBinding.objectToEntry(queueRecord, value); - try - { - _queueDb.put(null, key, value); - } - catch (DatabaseException e) + if (LOGGER.isDebugEnabled()) { - throw new AMQStoreException("Error writing AMQQueue with name " + queueRecord.getNameShortString().asString() - + " to database: " + e.getMessage(), e); + LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + "), queue id" + queue.getId() + + ", arguments=" + arguments + "): called"); } + ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments); + storeConfiguredObjectEntry(configuredObject); } } @@ -1059,8 +878,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore * Updates the specified queue in the persistent store, IF it is already present. If the queue * is not present in the store, it will not be added. * - * NOTE: Currently only updates the exclusivity. - * * @param queue The queue to update the entry for. * @throws AMQStoreException If the operation fails for any reason. */ @@ -1074,28 +891,30 @@ public abstract class AbstractBDBMessageStore implements MessageStore try { DatabaseEntry key = new DatabaseEntry(); - AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance(); - keyBinding.objectToEntry(queue.getNameShortString(), key); + UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); + keyBinding.objectToEntry(queue.getId(), key); DatabaseEntry value = new DatabaseEntry(); DatabaseEntry newValue = new DatabaseEntry(); - QueueBinding queueBinding = QueueBinding.getInstance(); + ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT); - if(status == OperationStatus.SUCCESS) + OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT); + if (status == OperationStatus.SUCCESS) { - //read the existing record and apply the new exclusivity setting - QueueRecord queueRecord = queueBinding.entryToObject(value); - queueRecord.setExclusive(queue.isExclusive()); - - //write the updated entry to the store - queueBinding.objectToEntry(queueRecord, newValue); + ConfiguredObjectRecord queueRecord = configuredObjectBinding.entryToObject(value); + ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueRecord); - _queueDb.put(null, key, newValue); + // write the updated entry to the store + configuredObjectBinding.objectToEntry(newQueueRecord, newValue); + status = _configuredObjectsDb.put(null, key, newValue); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Error updating queue details within the store: " + status); + } } - else if(status != OperationStatus.NOTFOUND) + else if (status != OperationStatus.NOTFOUND) { - throw new AMQStoreException("Error updating queue details within the store: " + status); + throw new AMQStoreException("Error finding queue details within the store: " + status); } } catch (DatabaseException e) @@ -1113,27 +932,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ public void removeQueue(final AMQQueue queue) throws AMQStoreException { - AMQShortString name = queue.getNameShortString(); - + UUID id = queue.getId(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("public void removeQueue(AMQShortString name = " + name + "): called"); + LOGGER.debug("public void removeQueue(AMQShortString name = " + queue.getName() + ", uuid = " + id + "): called"); } - DatabaseEntry key = new DatabaseEntry(); - AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance(); - keyBinding.objectToEntry(name, key); - try + OperationStatus status = removeConfiguredObject(id); + if (status == OperationStatus.NOTFOUND) { - OperationStatus status = _queueDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Queue " + name + " not found"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing deleting with name " + name + " from database: " + e.getMessage(), e); + throw new AMQStoreException("Queue " + queue.getName() + " with id " + id + " not found"); } } @@ -1233,11 +1041,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, long messageId) throws AMQStoreException { - AMQShortString name = AMQShortString.valueOf(queue.getResourceName()); DatabaseEntry key = new DatabaseEntry(); QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey dd = new QueueEntryKey(name, messageId); + QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId); keyBinding.objectToEntry(dd, key); DatabaseEntry value = new DatabaseEntry(); ByteBinding.byteToEntry((byte) 0, value); @@ -1246,15 +1053,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Enqueuing message " + messageId + " on queue " + name + " [Transaction" + tx + "]"); + LOGGER.debug("Enqueuing message " + messageId + " on queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() + + " [Transaction" + tx + "]"); } _deliveryDb.put(tx, key, value); } catch (DatabaseException e) { LOGGER.error("Failed to enqueue: " + e.getMessage(), e); - throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name - + " to database", e); + throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() + + " to database", e); } } @@ -1262,7 +1072,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore * Extracts a message from a specified queue, in a given transaction. * * @param tx The transaction for the operation. - * @param queue The name queue to take the message from. + * @param queue The queue to take the message from. * @param messageId The message to dequeue. * * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. @@ -1270,17 +1080,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, long messageId) throws AMQStoreException { - AMQShortString name = new AMQShortString(queue.getResourceName()); DatabaseEntry key = new DatabaseEntry(); QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey queueEntryKey = new QueueEntryKey(name, messageId); - + QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId); + UUID id = queue.getId(); keyBinding.objectToEntry(queueEntryKey, key); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Dequeue message id " + messageId); + LOGGER.debug("Dequeue message id " + messageId + " from queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); } try @@ -1289,16 +1098,20 @@ public abstract class AbstractBDBMessageStore implements MessageStore OperationStatus status = _deliveryDb.delete(tx, key); if (status == OperationStatus.NOTFOUND) { - throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name); + throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); } else if (status != OperationStatus.SUCCESS) { - throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue " + name); + throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue" + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); } if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Removed message " + messageId + ", " + name + " from delivery db"); + LOGGER.debug("Removed message " + messageId + " on queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id + + " from delivery db"); } } @@ -1438,7 +1251,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore * * @return a list of message ids for messages enqueued for a particular queue */ - List<Long> getEnqueuedMessages(AMQShortString queueName) throws AMQStoreException + List<Long> getEnqueuedMessages(UUID queueId) throws AMQStoreException { Cursor cursor = null; try @@ -1447,7 +1260,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore DatabaseEntry key = new DatabaseEntry(); - QueueEntryKey dd = new QueueEntryKey(queueName, 0); + QueueEntryKey dd = new QueueEntryKey(queueId, 0); QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); keyBinding.objectToEntry(dd, key); @@ -1459,7 +1272,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT); dd = keyBinding.entryToObject(key); - while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName)) + while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId)) { messageIds.add(dd.getMessageId()); @@ -1644,7 +1457,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset); } - Cursor cursor = null; try { @@ -1706,24 +1518,59 @@ public abstract class AbstractBDBMessageStore implements MessageStore return _messageContentDb; } - Database getQueuesDb() - { - return _queueDb; - } - Database getDeliveryDb() { return _deliveryDb; } - Database getExchangesDb() + /** + * Makes the specified configured object persistent. + * + * @param configuredObject Details of the configured object to store. + * @throws AMQStoreException If the operation fails for any reason. + */ + private void storeConfiguredObjectEntry(ConfiguredObjectRecord configuredObject) throws AMQStoreException { - return _exchangeDb; + if (_stateManager.isInState(State.ACTIVE)) + { + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); + keyBinding.objectToEntry(configuredObject.getId(), key); + + DatabaseEntry value = new DatabaseEntry(); + ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); + + queueBinding.objectToEntry(configuredObject, value); + try + { + OperationStatus status = _configuredObjectsDb.put(null, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Error writing configured object " + configuredObject + " to database: " + + status); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing configured object " + configuredObject + + " to database: " + e.getMessage(), e); + } + } } - Database getBindingsDb() + private OperationStatus removeConfiguredObject(UUID id) throws AMQStoreException { - return _queueBindingsDb; + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); + uuidBinding.objectToEntry(id, key); + try + { + return _configuredObjectsDb.delete(null, key); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error deleting of configured object with id " + id + " from database", e); + } } protected abstract StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index e5aeed57cd..9f7eb4bfd9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -50,9 +50,9 @@ public class BDBMessageStore extends AbstractBDBMessageStore private final CommitThread _commitThread = new CommitThread("Commit-Thread"); @Override - protected void setupStore(File storePath) throws DatabaseException, AMQStoreException + protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException { - super.setupStore(storePath); + super.setupStore(storePath, name); startCommitThread(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java deleted file mode 100644 index b9d868f909..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.entry; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public class BindingRecord extends Object -{ - private final AMQShortString _exchangeName; - private final AMQShortString _queueName; - private final AMQShortString _routingKey; - private final FieldTable _arguments; - - public BindingRecord(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments) - { - _exchangeName = exchangeName; - _queueName = queueName; - _routingKey = routingKey; - _arguments = arguments; - } - - - public AMQShortString getExchangeName() - { - return _exchangeName; - } - - public AMQShortString getQueueName() - { - return _queueName; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - public FieldTable getArguments() - { - return _arguments; - } - -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java index a716758da3..e7cf93ff7a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java @@ -20,22 +20,22 @@ */ package org.apache.qpid.server.store.berkeleydb.entry; -import org.apache.qpid.framing.AMQShortString; +import java.util.UUID; public class QueueEntryKey { - private AMQShortString _queueName; + private UUID _queueId; private long _messageId; - public QueueEntryKey(AMQShortString queueName, long messageId) + public QueueEntryKey(UUID queueId, long messageId) { - _queueName = queueName; + _queueId = queueId; _messageId = messageId; } - public AMQShortString getQueueName() + public UUID getQueueId() { - return _queueName; + return _queueId; } public long getMessageId() diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java deleted file mode 100644 index 5ea82427dc..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.entry; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public class QueueRecord extends Object -{ - private final AMQShortString _queueName; - private final AMQShortString _owner; - private final FieldTable _arguments; - private boolean _exclusive; - - public QueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments) - { - _queueName = queueName; - _owner = owner; - _exclusive = exclusive; - _arguments = arguments; - } - - public AMQShortString getNameShortString() - { - return _queueName; - } - - public AMQShortString getOwner() - { - return _owner; - } - - public boolean isExclusive() - { - return _exclusive; - } - - public void setExclusive(boolean exclusive) - { - _exclusive = exclusive; - } - - public FieldTable getArguments() - { - return _arguments; - } - -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java deleted file mode 100644 index b57ffb0169..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuple; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; - -public class AMQShortStringBinding extends TupleBinding<AMQShortString> -{ - private static final AMQShortStringBinding INSTANCE = new AMQShortStringBinding(); - - public static AMQShortStringBinding getInstance() - { - return INSTANCE; - } - - /** private constructor forces getInstance instead */ - private AMQShortStringBinding() { } - - public AMQShortString entryToObject(TupleInput tupleInput) - { - return AMQShortStringEncoding.readShortString(tupleInput); - } - - public void objectToEntry(AMQShortString object, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString(object, tupleOutput); - } -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java new file mode 100644 index 0000000000..8b84a4c9bb --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java @@ -0,0 +1,37 @@ +package org.apache.qpid.server.store.berkeleydb.tuple; + +import org.apache.qpid.server.store.ConfiguredObjectRecord; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord> +{ + private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding(); + + public static ConfiguredObjectBinding getInstance() + { + return INSTANCE; + } + + /** non-public constructor forces getInstance instead */ + private ConfiguredObjectBinding() + { + } + + public ConfiguredObjectRecord entryToObject(TupleInput tupleInput) + { + String type = tupleInput.readString(); + String json = tupleInput.readString(); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(null, type, json); + return configuredObject; + } + + public void objectToEntry(ConfiguredObjectRecord object, TupleOutput tupleOutput) + { + tupleOutput.writeString(object.getType()); + tupleOutput.writeString(object.getAttributes()); + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java deleted file mode 100644 index d4b1475ac7..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuple; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; -import org.apache.qpid.server.store.berkeleydb.entry.ExchangeRecord; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; - -public class ExchangeBinding extends TupleBinding<ExchangeRecord> -{ - private static final ExchangeBinding INSTANCE = new ExchangeBinding(); - - public static ExchangeBinding getInstance() - { - return INSTANCE; - } - - /** private constructor forces getInstance instead */ - private ExchangeBinding() { } - - public ExchangeRecord entryToObject(TupleInput tupleInput) - { - AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput); - - boolean autoDelete = tupleInput.readBoolean(); - - return new ExchangeRecord(name, typeName, autoDelete); - } - - public void objectToEntry(ExchangeRecord exchange, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput); - AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput); - - tupleOutput.writeBoolean(exchange.isAutoDelete()); - } -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index 33bf269880..09f2c50e2d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.store.berkeleydb.tuple; +import java.util.UUID; + import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; @@ -47,7 +49,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction Transaction.Record[] records = new Transaction.Record[input.readInt()]; for(int i = 0; i < records.length; i++) { - records[i] = new RecordImpl(input.readString(), input.readLong()); + records[i] = new RecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong()); } return records; } @@ -71,7 +73,9 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction output.writeInt(records.length); for(Transaction.Record record : records) { - output.writeString(record.getQueue().getResourceName()); + UUID id = record.getQueue().getId(); + output.writeLong(id.getMostSignificantBits()); + output.writeLong(id.getLeastSignificantBits()); output.writeLong(record.getMessage().getMessageNumber()); } } @@ -80,13 +84,13 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage { - private final String _queueName; private long _messageNumber; + private UUID _queueId; - public RecordImpl(String queueName, long messageNumber) + public RecordImpl(UUID queueId, long messageNumber) { - _queueName = queueName; _messageNumber = messageNumber; + _queueId = queueId; } public TransactionLogResource getQueue() @@ -114,9 +118,10 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction throw new UnsupportedOperationException(); } - public String getResourceName() + @Override + public UUID getId() { - return _queueName; + return _queueId; } } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java deleted file mode 100644 index 7e1c63cc28..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuple; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import com.sleepycat.je.DatabaseException; -import org.apache.log4j.Logger; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; -import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; -import org.apache.qpid.server.store.berkeleydb.entry.QueueRecord; - -public class QueueBinding extends TupleBinding<QueueRecord> -{ - private static final Logger _logger = Logger.getLogger(QueueBinding.class); - - private static final QueueBinding INSTANCE = new QueueBinding(); - - public static QueueBinding getInstance() - { - return INSTANCE; - } - - /** private constructor forces getInstance instead */ - private QueueBinding() { } - - public QueueRecord entryToObject(TupleInput tupleInput) - { - try - { - AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput); - // Addition for Version 2 of this table, read the queue arguments - FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput); - // Addition for Version 3 of this table, read the queue exclusivity - boolean exclusive = tupleInput.readBoolean(); - - return new QueueRecord(name, owner, exclusive, arguments); - } - catch (DatabaseException e) - { - _logger.error("Unable to create binding: " + e, e); - return null; - } - - } - - public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput); - AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput); - // Addition for Version 2 of this table, store the queue arguments - FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput); - // Addition for Version 3 of this table, store the queue exclusivity - tupleOutput.writeBoolean(queue.isExclusive()); - } -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java deleted file mode 100644 index 6ba929a541..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuple; - -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; -import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; -import org.apache.qpid.server.store.berkeleydb.entry.BindingRecord; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import com.sleepycat.je.DatabaseException; - -public class QueueBindingTupleBinding extends TupleBinding<BindingRecord> -{ - protected static final Logger _log = Logger.getLogger(QueueBindingTupleBinding.class); - - private static final QueueBindingTupleBinding INSTANCE = new QueueBindingTupleBinding(); - - public static QueueBindingTupleBinding getInstance() - { - return INSTANCE; - } - - /** private constructor forces getInstance instead */ - private QueueBindingTupleBinding() { } - - public BindingRecord entryToObject(TupleInput tupleInput) - { - AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput); - - FieldTable arguments; - - // Addition for Version 2 of this table - try - { - arguments = FieldTableEncoding.readFieldTable(tupleInput); - } - catch (DatabaseException e) - { - _log.error("Unable to create binding: " + e, e); - return null; - } - - return new BindingRecord(exchangeName, queueName, routingKey, arguments); - } - - public void objectToEntry(BindingRecord binding, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput); - AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput); - AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput); - - // Addition for Version 2 of this table - FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput); - } - -}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java index f65df23706..22d0ede31f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.server.store.berkeleydb.tuple; +import java.util.UUID; + import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; public class QueueEntryBinding extends TupleBinding<QueueEntryKey> @@ -43,15 +43,17 @@ public class QueueEntryBinding extends TupleBinding<QueueEntryKey> public QueueEntryKey entryToObject(TupleInput tupleInput) { - AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); + UUID queueId = new UUID(tupleInput.readLong(), tupleInput.readLong()); long messageId = tupleInput.readLong(); - return new QueueEntryKey(queueName, messageId); + return new QueueEntryKey(queueId, messageId); } public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput) { - AMQShortStringEncoding.writeShortString(mk.getQueueName(),tupleOutput); + UUID uuid = mk.getQueueId(); + tupleOutput.writeLong(uuid.getMostSignificantBits()); + tupleOutput.writeLong(uuid.getLeastSignificantBits()); tupleOutput.writeLong(mk.getMessageId()); } }
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java index c96c751694..43aa5aa2b4 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java @@ -31,39 +31,33 @@ import com.sleepycat.je.Transaction; public abstract class AbstractStoreUpgrade implements StoreUpgrade { private static final Logger _logger = Logger.getLogger(AbstractStoreUpgrade.class); - protected static final String[] USER_FRIENDLY_NAMES = new String[] { "Exchanges", "Queues", "Queue bindings", - "Message deliveries", "Message metadata", "Message content", "Bridges", "Links", "Distributed transactions" }; - protected void reportFinished(Environment environment, String[] databaseNames, String[] userFriendlyNames) + protected void reportFinished(Environment environment, int version) { - if (_logger.isInfoEnabled()) + _logger.info("Completed upgrade to version " + version); + if (_logger.isDebugEnabled()) { - _logger.info("Upgraded:"); - List<String> databases = environment.getDatabaseNames(); - for (int i = 0; i < databaseNames.length; i++) - { - if (databases.contains(databaseNames[i])) - { - _logger.info(" " + getRowCount(databaseNames[i], environment) + " rows in " + userFriendlyNames[i]); - } - } + _logger.debug("Upgraded:"); + reportDatabaseRowCount(environment); } } + private void reportDatabaseRowCount(Environment environment) + { + List<String> databases = environment.getDatabaseNames(); + for (String database : databases) + { + _logger.debug(" " + getRowCount(database, environment) + " rows in " + database); + } + } - protected void reportStarting(Environment environment, String[] databaseNames, String[] userFriendlyNames) + protected void reportStarting(Environment environment, int version) { - if (_logger.isInfoEnabled()) + _logger.info("Starting store upgrade from version " + version); + if (_logger.isDebugEnabled()) { - _logger.info("Upgrading:"); - List<String> databases = environment.getDatabaseNames(); - for (int i = 0; i < databaseNames.length; i++) - { - if (databases.contains(databaseNames[i])) - { - _logger.info(" " + getRowCount(databaseNames[i], environment) + " rows from " + userFriendlyNames[i]); - } - } + _logger.debug("Upgrading:"); + reportDatabaseRowCount(environment); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java index 42a3173e21..925e40ea93 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java @@ -48,7 +48,7 @@ public abstract class CursorOperation implements DatabaseRunnable CursorOperation.this.processEntry(database, targetDatabase, transaction, key, value); if (getProcessedCount() % 1000 == 0) { - _logger.info("Processed " + getProcessedCount() + " messages of " + getRowCount() + "."); + _logger.info("Processed " + getProcessedCount() + " records of " + getRowCount() + "."); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java index df09726c8e..f73e2e5d78 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java @@ -26,6 +26,6 @@ import org.apache.qpid.AMQStoreException; public interface StoreUpgrade { - void performUpgrade(Environment environment, UpgradeInteractionHandler handler) + void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName) throws DatabaseException, AMQStoreException; } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java index d3e4dfce12..49e5e700c4 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java @@ -71,24 +71,16 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade private static final String OLD_CONTENT_DB_NAME = "messageContentDb_v4"; private static final String NEW_CONTENT_DB_NAME = "messageContentDb_v5"; - private static final String[] OLD_DATABASE_NAMES = new String[] { EXCHANGE_DB_NAME, OLD_QUEUE_DB_NAME, - OLD_BINDINGS_DB_NAME, OLD_DELIVERY_DB, OLD_METADATA_DB_NAME, OLD_CONTENT_DB_NAME, "bridges_v4", "links_v4", - "xids_v4" }; - private static final String[] NEW_DATABASE_NAMES = new String[] { "exchangeDb_v5", NEW_QUEUE_DB_NAME, - NEW_BINDINGS_DB_NAME, NEW_DELIVERY_DB, NEW_METADATA_DB_NAME, NEW_CONTENT_DB_NAME, "bridges_v5", "links_v5", - "xids_v5" }; - private static final byte COLON = (byte) ':'; private static final Logger _logger = Logger.getLogger(UpgradeFrom4To5.class); - public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler) throws DatabaseException, AMQStoreException + public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, String virtualHostName) throws DatabaseException, AMQStoreException { - _logger.info("Starting store upgrade from version 4"); Transaction transaction = null; try { - reportStarting(environment, OLD_DATABASE_NAMES, USER_FRIENDLY_NAMES); + reportStarting(environment, 4); transaction = environment.beginTransaction(null, null); @@ -103,7 +95,7 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade renameRemainingDatabases(environment, handler, transaction); transaction.commit(); - reportFinished(environment, NEW_DATABASE_NAMES, USER_FRIENDLY_NAMES); + reportFinished(environment, 5); } catch (Exception e) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java index 54f6aa6f88..3265fb6823 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java @@ -24,13 +24,30 @@ import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteraction import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteractionResponse.NO; import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteractionResponse.YES; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; +import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; +import org.apache.qpid.server.util.MapJsonSerializer; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.bind.tuple.TupleBinding; @@ -51,26 +68,71 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade private static final Logger _logger = Logger.getLogger(UpgradeFrom5To6.class); - private static final String OLD_CONTENT_DB_NAME = "messageContentDb_v5"; - private static final String NEW_CONTENT_DB_NAME = "MESSAGE_CONTENT"; - private static final String META_DATA_DB_NAME = "messageMetaDataDb_v5"; + static final String OLD_CONTENT_DB_NAME = "messageContentDb_v5"; + static final String NEW_CONTENT_DB_NAME = "MESSAGE_CONTENT"; + static final String NEW_METADATA_DB_NAME = "MESSAGE_METADATA"; + static final String OLD_META_DATA_DB_NAME = "messageMetaDataDb_v5"; + static final String OLD_EXCHANGE_DB_NAME = "exchangeDb_v5"; + static final String OLD_QUEUE_DB_NAME = "queueDb_v5"; + static final String OLD_DELIVERY_DB_NAME = "deliveryDb_v5"; + static final String OLD_QUEUE_BINDINGS_DB_NAME = "queueBindingsDb_v5"; + static final String OLD_XID_DB_NAME = "xids_v5"; + static final String NEW_XID_DB_NAME = "XIDS"; + static final String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; + static final String NEW_DELIVERY_DB_NAME = "QUEUE_ENTRIES"; + static final String NEW_BRIDGES_DB_NAME = "BRIDGES"; + static final String NEW_LINKS_DB_NAME = "LINKS"; + static final String OLD_BRIDGES_DB_NAME = "bridges_v5"; + static final String OLD_LINKS_DB_NAME = "links_v5"; - private static final String NEW_DB_NAMES[] = { "EXCHANGES", "QUEUES", "QUEUE_BINDINGS", "DELIVERIES", - "MESSAGE_METADATA", NEW_CONTENT_DB_NAME, "BRIDGES", "LINKS", "XIDS" }; - private static final String OLD_DB_NAMES[] = { "exchangeDb_v5", "queueDb_v5", "queueBindingsDb_v5", "deliveryDb_v5", - META_DATA_DB_NAME, OLD_CONTENT_DB_NAME, "bridges_v5", "links_v5", "xids_v5" }; + static final String[] DEFAULT_EXCHANGES = { ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), + ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), ExchangeDefaults.FANOUT_EXCHANGE_NAME.asString(), + ExchangeDefaults.HEADERS_EXCHANGE_NAME.asString(), ExchangeDefaults.TOPIC_EXCHANGE_NAME.asString(), + ExchangeDefaults.DIRECT_EXCHANGE_NAME.asString() }; + private static final Set<String> DEFAULT_EXCHANGES_SET = new HashSet<String>(Arrays.asList(DEFAULT_EXCHANGES)); - public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler) throws DatabaseException, AMQStoreException + private MapJsonSerializer _serializer = new MapJsonSerializer(); + + /** + * Upgrades from a v5 database to a v6 database + * + * v6 is the first "new style" schema where we don't version every table, + * and the upgrade is re-runnable + * + * Change in this version: + * + * Message content is moved from the database messageContentDb_v5 to + * MESSAGE_CONTENT. The structure of the database changes from ( message-id: + * long, chunk-id: int ) -> ( size: int, byte[] data ) to ( message-id: + * long) -> ( byte[] data ) + * + * That is we keep only one record per message, which contains all the + * message content + * + * Queue, Exchange, Bindings entries are stored now as configurable objects + * in "CONFIGURED_OBJECTS" table. + */ + public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, String virtualHostName) + throws DatabaseException, AMQStoreException + { + reportStarting(environment, 5); + upgradeMessages(environment, handler); + upgradeConfiguredObjectsAndDependencies(environment, handler, virtualHostName); + renameDatabases(environment, null); + reportFinished(environment, 6); + } + + private void upgradeConfiguredObjectsAndDependencies(Environment environment, UpgradeInteractionHandler handler, String virtualHostName) + throws AMQStoreException { - _logger.info("Starting store upgrade from version 5"); Transaction transaction = null; try { - reportStarting(environment, OLD_DB_NAMES, USER_FRIENDLY_NAMES); transaction = environment.beginTransaction(null, null); - performUpgradeInternal(environment, handler, transaction); + upgradeConfiguredObjects(environment, handler, transaction, virtualHostName); + upgradeQueueEntries(environment, transaction, virtualHostName); + upgradeXidEntries(environment, transaction, virtualHostName); transaction.commit(); - reportFinished(environment, NEW_DB_NAMES, USER_FRIENDLY_NAMES); } catch (Exception e) { @@ -90,22 +152,53 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade } } - /** - * Upgrades from a v5 database to a v6 database - * - * v6 is the first "new style" schema where we don't version every table, and the upgrade is re-runnable - * - * Change in this version: - * - * Message content is moved from the database messageContentDb_v5 to MESSAGE_CONTENT. - * The structure of the database changes from - * ( message-id: long, chunk-id: int ) -> ( size: int, byte[] data ) - * to - * ( message-id: long) -> ( byte[] data ) - * - * That is we keep only one record per message, which contains all the message content - */ - public void performUpgradeInternal(final Environment environment, final UpgradeInteractionHandler handler, + private void upgradeMessages(final Environment environment, final UpgradeInteractionHandler handler) + throws AMQStoreException + { + Transaction transaction = null; + try + { + transaction = environment.beginTransaction(null, null); + upgradeMessages(environment, handler, transaction); + transaction.commit(); + } + catch (Exception e) + { + transaction.abort(); + if (e instanceof DatabaseException) + { + throw (DatabaseException) e; + } + else if (e instanceof AMQStoreException) + { + throw (AMQStoreException) e; + } + else + { + throw new AMQStoreException("Unexpected exception", e); + } + } + } + + private void renameDatabases(Environment environment, Transaction transaction) + { + List<String> databases = environment.getDatabaseNames(); + String[] oldDatabases = { OLD_META_DATA_DB_NAME, OLD_BRIDGES_DB_NAME, OLD_LINKS_DB_NAME }; + String[] newDatabases = { NEW_METADATA_DB_NAME, NEW_BRIDGES_DB_NAME, NEW_LINKS_DB_NAME }; + + for (int i = 0; i < oldDatabases.length; i++) + { + String oldName = oldDatabases[i]; + String newName = newDatabases[i]; + if (databases.contains(oldName)) + { + _logger.info("Renaming " + oldName + " into " + newName); + environment.renameDatabase(transaction, oldName, newName); + } + } + } + + private void upgradeMessages(final Environment environment, final UpgradeInteractionHandler handler, final Transaction transaction) throws AMQStoreException { _logger.info("Message Contents"); @@ -129,27 +222,13 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade metadataDatabase); } }; - new DatabaseTemplate(environment, META_DATA_DB_NAME, contentTransaction).run(metaDataDatabaseOperation); + new DatabaseTemplate(environment, OLD_META_DATA_DB_NAME, contentTransaction) + .run(metaDataDatabaseOperation); _logger.info(metaDataDatabaseOperation.getRowCount() + " Message Content Entries"); } }; new DatabaseTemplate(environment, OLD_CONTENT_DB_NAME, NEW_CONTENT_DB_NAME, transaction).run(contentOperation); - } - renameDatabases(environment, transaction); - } - - private void renameDatabases(Environment environment, Transaction transaction) - { - List<String> databases = environment.getDatabaseNames(); - for (int i = 0; i < OLD_DB_NAMES.length; i++) - { - String oldName = OLD_DB_NAMES[i]; - String newName = NEW_DB_NAMES[i]; - if (databases.contains(oldName)) - { - _logger.info("Renaming " + oldName + " into " + newName); - environment.renameDatabase(transaction, oldName, newName); - } + environment.removeDatabase(transaction, OLD_CONTENT_DB_NAME); } } @@ -221,7 +300,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade DatabaseEntry value = new DatabaseEntry(); dataBinding.objectToEntry(consolidatedData, value); - newDatabase.put(txn, key, value); + put(newDatabase, txn, key, value); } /** @@ -268,6 +347,264 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade return data; } + private void upgradeConfiguredObjects(Environment environment, UpgradeInteractionHandler handler, Transaction transaction, String virtualHostName) + throws AMQStoreException + { + upgradeQueues(environment, transaction, virtualHostName); + upgradeExchanges(environment, transaction, virtualHostName); + upgradeQueueBindings(environment, transaction, handler, virtualHostName); + } + + private void upgradeXidEntries(Environment environment, Transaction transaction, final String virtualHostName) + { + if (environment.getDatabaseNames().contains(OLD_XID_DB_NAME)) + { + _logger.info("Xid Records"); + final OldPreparedTransactionBinding oldTransactionBinding = new OldPreparedTransactionBinding(); + final NewPreparedTransactionBinding newTransactionBinding = new NewPreparedTransactionBinding(); + CursorOperation xidEntriesCursor = new CursorOperation() + { + @Override + public void processEntry(Database oldXidDatabase, Database newXidDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + OldPreparedTransaction oldPreparedTransaction = oldTransactionBinding.entryToObject(value); + OldRecordImpl[] oldDequeues = oldPreparedTransaction.getDequeues(); + OldRecordImpl[] oldEnqueues = oldPreparedTransaction.getEnqueues(); + + NewRecordImpl[] newEnqueues = null; + NewRecordImpl[] newDequeues = null; + if (oldDequeues != null) + { + newDequeues = new NewRecordImpl[oldDequeues.length]; + for (int i = 0; i < newDequeues.length; i++) + { + OldRecordImpl dequeue = oldDequeues[i]; + UUID id = UUIDGenerator.generateUUID(dequeue.getQueueName(), virtualHostName); + newDequeues[i] = new NewRecordImpl(id, dequeue.getMessageNumber()); + } + } + if (oldEnqueues != null) + { + newEnqueues = new NewRecordImpl[oldEnqueues.length]; + for (int i = 0; i < newEnqueues.length; i++) + { + OldRecordImpl enqueue = oldEnqueues[i]; + UUID id = UUIDGenerator.generateUUID(enqueue.getQueueName(), virtualHostName); + newEnqueues[i] = new NewRecordImpl(id, enqueue.getMessageNumber()); + } + } + NewPreparedTransaction newPreparedTransaction = new NewPreparedTransaction(newEnqueues, newDequeues); + DatabaseEntry newValue = new DatabaseEntry(); + newTransactionBinding.objectToEntry(newPreparedTransaction, newValue); + put(newXidDatabase, transaction, key, newValue); + } + }; + new DatabaseTemplate(environment, OLD_XID_DB_NAME, NEW_XID_DB_NAME, transaction).run(xidEntriesCursor); + environment.removeDatabase(transaction, OLD_XID_DB_NAME); + _logger.info(xidEntriesCursor.getRowCount() + " Xid Entries"); + } + } + + private void upgradeQueueEntries(Environment environment, Transaction transaction, final String virtualHostName) + { + _logger.info("Queue Delivery Records"); + if (environment.getDatabaseNames().contains(OLD_DELIVERY_DB_NAME)) + { + final OldQueueEntryBinding oldBinding = new OldQueueEntryBinding(); + final NewQueueEntryBinding newBinding = new NewQueueEntryBinding(); + CursorOperation queueEntriesCursor = new CursorOperation() + { + @Override + public void processEntry(Database oldDeliveryDatabase, Database newDeliveryDatabase, + Transaction transaction, DatabaseEntry key, DatabaseEntry value) + { + OldQueueEntryKey oldEntryRecord = oldBinding.entryToObject(key); + UUID queueId = UUIDGenerator.generateUUID(oldEntryRecord.getQueueName().asString(), virtualHostName); + + NewQueueEntryKey newEntryRecord = new NewQueueEntryKey(queueId, oldEntryRecord.getMessageId()); + DatabaseEntry newKey = new DatabaseEntry(); + newBinding.objectToEntry(newEntryRecord, newKey); + put(newDeliveryDatabase, transaction, newKey, value); + } + }; + new DatabaseTemplate(environment, OLD_DELIVERY_DB_NAME, NEW_DELIVERY_DB_NAME, transaction) + .run(queueEntriesCursor); + environment.removeDatabase(transaction, OLD_DELIVERY_DB_NAME); + _logger.info(queueEntriesCursor.getRowCount() + " Queue Delivery Record Entries"); + } + } + + private void upgradeQueueBindings(Environment environment, Transaction transaction, final UpgradeInteractionHandler handler, final String virtualHostName) + { + _logger.info("Queue Bindings"); + if (environment.getDatabaseNames().contains(OLD_QUEUE_BINDINGS_DB_NAME)) + { + final QueueBindingBinding binding = new QueueBindingBinding(); + CursorOperation bindingCursor = new CursorOperation() + { + @Override + public void processEntry(Database exchangeDatabase, Database configuredObjectsDatabase, + Transaction transaction, DatabaseEntry key, DatabaseEntry value) + { + // TODO: check and remove orphaned bindings + BindingRecord bindingRecord = binding.entryToObject(key); + String exchangeName = bindingRecord.getExchangeName() == null ? ExchangeDefaults.DEFAULT_EXCHANGE_NAME + .asString() : bindingRecord.getExchangeName().asString(); + String queueName = bindingRecord.getQueueName().asString(); + String routingKey = bindingRecord.getRoutingKey().asString(); + FieldTable arguments = bindingRecord.getArguments(); + + UUID bindingId = UUIDGenerator.generateUUID(); + UpgradeConfiguredObjectRecord configuredObject = createBindingConfiguredObjectRecord(exchangeName, queueName, + routingKey, arguments, virtualHostName); + storeConfiguredObjectEntry(configuredObjectsDatabase, bindingId, configuredObject, transaction); + } + + }; + new DatabaseTemplate(environment, OLD_QUEUE_BINDINGS_DB_NAME, CONFIGURED_OBJECTS_DB_NAME, transaction) + .run(bindingCursor); + environment.removeDatabase(transaction, OLD_QUEUE_BINDINGS_DB_NAME); + _logger.info(bindingCursor.getRowCount() + " Queue Binding Entries"); + } + } + + private List<String> upgradeExchanges(Environment environment, Transaction transaction, final String virtualHostName) + { + final List<String> exchangeNames = new ArrayList<String>(); + _logger.info("Exchanges"); + if (environment.getDatabaseNames().contains(OLD_EXCHANGE_DB_NAME)) + { + final ExchangeBinding exchangeBinding = new ExchangeBinding(); + CursorOperation exchangeCursor = new CursorOperation() + { + @Override + public void processEntry(Database exchangeDatabase, Database configuredObjectsDatabase, + Transaction transaction, DatabaseEntry key, DatabaseEntry value) + { + ExchangeRecord exchangeRecord = exchangeBinding.entryToObject(value); + String exchangeName = exchangeRecord.getNameShortString().asString(); + if (!DEFAULT_EXCHANGES_SET.contains(exchangeName)) + { + String exchangeType = exchangeRecord.getType().asString(); + boolean autoDelete = exchangeRecord.isAutoDelete(); + + UUID exchangeId = UUIDGenerator.generateUUID(exchangeName, virtualHostName); + + UpgradeConfiguredObjectRecord configuredObject = createExchangeConfiguredObjectRecord(exchangeName, + exchangeType, autoDelete); + storeConfiguredObjectEntry(configuredObjectsDatabase, exchangeId, configuredObject, transaction); + exchangeNames.add(exchangeName); + } + } + }; + new DatabaseTemplate(environment, OLD_EXCHANGE_DB_NAME, CONFIGURED_OBJECTS_DB_NAME, transaction) + .run(exchangeCursor); + environment.removeDatabase(transaction, OLD_EXCHANGE_DB_NAME); + _logger.info(exchangeCursor.getRowCount() + " Exchange Entries"); + } + return exchangeNames; + } + + private List<String> upgradeQueues(Environment environment, Transaction transaction, final String virtualHostName) + { + final List<String> queueNames = new ArrayList<String>(); + _logger.info("Queues"); + if (environment.getDatabaseNames().contains(OLD_QUEUE_DB_NAME)) + { + final UpgradeQueueBinding queueBinding = new UpgradeQueueBinding(); + CursorOperation queueCursor = new CursorOperation() + { + @Override + public void processEntry(Database queueDatabase, Database configuredObjectsDatabase, + Transaction transaction, DatabaseEntry key, DatabaseEntry value) + { + OldQueueRecord queueRecord = queueBinding.entryToObject(value); + String queueName = queueRecord.getNameShortString().asString(); + queueNames.add(queueName); + String owner = queueRecord.getOwner() == null ? null : queueRecord.getOwner().asString(); + boolean exclusive = queueRecord.isExclusive(); + FieldTable arguments = queueRecord.getArguments(); + + UUID queueId = UUIDGenerator.generateUUID(queueName, virtualHostName); + UpgradeConfiguredObjectRecord configuredObject = createQueueConfiguredObjectRecord(queueName, owner, exclusive, + arguments); + storeConfiguredObjectEntry(configuredObjectsDatabase, queueId, configuredObject, transaction); + } + }; + new DatabaseTemplate(environment, OLD_QUEUE_DB_NAME, CONFIGURED_OBJECTS_DB_NAME, transaction).run(queueCursor); + environment.removeDatabase(transaction, OLD_QUEUE_DB_NAME); + _logger.info(queueCursor.getRowCount() + " Queue Entries"); + } + return queueNames; + } + + private void storeConfiguredObjectEntry(Database configuredObjectsDatabase, UUID id, + UpgradeConfiguredObjectRecord configuredObject, Transaction transaction) + { + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + UpgradeUUIDBinding uuidBinding = new UpgradeUUIDBinding(); + uuidBinding.objectToEntry(id, key); + ConfiguredObjectBinding configuredBinding = new ConfiguredObjectBinding(); + configuredBinding.objectToEntry(configuredObject, value); + put(configuredObjectsDatabase, transaction, key, value); + } + + private UpgradeConfiguredObjectRecord createQueueConfiguredObjectRecord(String queueName, String owner, boolean exclusive, + FieldTable arguments) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); + attributesMap.put(Queue.NAME, queueName); + attributesMap.put(Queue.OWNER, owner); + attributesMap.put(Queue.EXCLUSIVE, exclusive); + if (arguments != null) + { + attributesMap.put("ARGUMENTS", FieldTable.convertToMap(arguments)); + } + String json = _serializer.serialize(attributesMap); + UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json); + return configuredObject; + } + + private UpgradeConfiguredObjectRecord createExchangeConfiguredObjectRecord(String exchangeName, String exchangeType, + boolean autoDelete) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); + attributesMap.put(Exchange.NAME, exchangeName); + attributesMap.put(Exchange.TYPE, exchangeType); + attributesMap.put(Exchange.LIFETIME_POLICY, autoDelete ? LifetimePolicy.AUTO_DELETE.name() + : LifetimePolicy.PERMANENT.name()); + String json = _serializer.serialize(attributesMap); + UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Exchange.class.getName(), json); + return configuredObject; + } + + private UpgradeConfiguredObjectRecord createBindingConfiguredObjectRecord(String exchangeName, String queueName, + String routingKey, FieldTable arguments, String virtualHostName) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); + attributesMap.put(Binding.NAME, routingKey); + attributesMap.put(Binding.EXCHANGE, UUIDGenerator.generateUUID(exchangeName, virtualHostName)); + attributesMap.put(Binding.QUEUE, UUIDGenerator.generateUUID(queueName, virtualHostName)); + if (arguments != null) + { + attributesMap.put(Binding.ARGUMENTS, FieldTable.convertToMap(arguments)); + } + String json = _serializer.serialize(attributesMap); + UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Binding.class.getName(), json); + return configuredObject; + } + + private void put(final Database database, Transaction txn, DatabaseEntry key, DatabaseEntry value) + { + OperationStatus status = database.put(txn, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new RuntimeException("Cannot add record into " + database.getDatabaseName() + ":" + status); + } + } + static final class CompoundKey { public final long _messageId; @@ -367,4 +704,504 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade } } + static class OldQueueRecord extends Object + { + private final AMQShortString _queueName; + private final AMQShortString _owner; + private final FieldTable _arguments; + private boolean _exclusive; + + public OldQueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments) + { + _queueName = queueName; + _owner = owner; + _exclusive = exclusive; + _arguments = arguments; + } + + public AMQShortString getNameShortString() + { + return _queueName; + } + + public AMQShortString getOwner() + { + return _owner; + } + + public boolean isExclusive() + { + return _exclusive; + } + + public void setExclusive(boolean exclusive) + { + _exclusive = exclusive; + } + + public FieldTable getArguments() + { + return _arguments; + } + + } + + static class UpgradeConfiguredObjectRecord + { + private String _attributes; + private String _type; + + public UpgradeConfiguredObjectRecord(String type, String attributes) + { + super(); + _attributes = attributes; + _type = type; + } + + public String getAttributes() + { + return _attributes; + } + + public String getType() + { + return _type; + } + + } + + static class UpgradeQueueBinding extends TupleBinding<OldQueueRecord> + { + public OldQueueRecord entryToObject(TupleInput tupleInput) + { + AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput); + FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput); + boolean exclusive = tupleInput.readBoolean(); + return new OldQueueRecord(name, owner, exclusive, arguments); + } + + public void objectToEntry(OldQueueRecord queue, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput); + AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput); + FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput); + tupleOutput.writeBoolean(queue.isExclusive()); + } + } + + static class UpgradeUUIDBinding extends TupleBinding<UUID> + { + public UUID entryToObject(final TupleInput tupleInput) + { + return new UUID(tupleInput.readLong(), tupleInput.readLong()); + } + + public void objectToEntry(final UUID uuid, final TupleOutput tupleOutput) + { + tupleOutput.writeLong(uuid.getMostSignificantBits()); + tupleOutput.writeLong(uuid.getLeastSignificantBits()); + } + } + + static class ConfiguredObjectBinding extends TupleBinding<UpgradeConfiguredObjectRecord> + { + + public UpgradeConfiguredObjectRecord entryToObject(TupleInput tupleInput) + { + String type = tupleInput.readString(); + String json = tupleInput.readString(); + UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(type, json); + return configuredObject; + } + + public void objectToEntry(UpgradeConfiguredObjectRecord object, TupleOutput tupleOutput) + { + tupleOutput.writeString(object.getType()); + tupleOutput.writeString(object.getAttributes()); + } + + } + + static class ExchangeRecord extends Object + { + private final AMQShortString _exchangeName; + private final AMQShortString _exchangeType; + private final boolean _autoDelete; + + public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType, boolean autoDelete) + { + _exchangeName = exchangeName; + _exchangeType = exchangeType; + _autoDelete = autoDelete; + } + + public AMQShortString getNameShortString() + { + return _exchangeName; + } + + public AMQShortString getType() + { + return _exchangeType; + } + + public boolean isAutoDelete() + { + return _autoDelete; + } + + } + + static class ExchangeBinding extends TupleBinding<ExchangeRecord> + { + + public ExchangeRecord entryToObject(TupleInput tupleInput) + { + AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput); + + boolean autoDelete = tupleInput.readBoolean(); + + return new ExchangeRecord(name, typeName, autoDelete); + } + + public void objectToEntry(ExchangeRecord exchange, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput); + AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput); + + tupleOutput.writeBoolean(exchange.isAutoDelete()); + } + } + + static class BindingRecord extends Object + { + private final AMQShortString _exchangeName; + private final AMQShortString _queueName; + private final AMQShortString _routingKey; + private final FieldTable _arguments; + + public BindingRecord(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, + FieldTable arguments) + { + _exchangeName = exchangeName; + _queueName = queueName; + _routingKey = routingKey; + _arguments = arguments; + } + + public AMQShortString getExchangeName() + { + return _exchangeName; + } + + public AMQShortString getQueueName() + { + return _queueName; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + + public FieldTable getArguments() + { + return _arguments; + } + + } + + static class QueueBindingBinding extends TupleBinding<BindingRecord> + { + + public BindingRecord entryToObject(TupleInput tupleInput) + { + AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput); + + FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput); + + return new BindingRecord(exchangeName, queueName, routingKey, arguments); + } + + public void objectToEntry(BindingRecord binding, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput); + AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput); + AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput); + + FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput); + } + } + + static class OldQueueEntryKey + { + private AMQShortString _queueName; + private long _messageId; + + public OldQueueEntryKey(AMQShortString queueName, long messageId) + { + _queueName = queueName; + _messageId = messageId; + } + + public AMQShortString getQueueName() + { + return _queueName; + } + + public long getMessageId() + { + return _messageId; + } + } + + static class OldQueueEntryBinding extends TupleBinding<OldQueueEntryKey> + { + + public OldQueueEntryKey entryToObject(TupleInput tupleInput) + { + AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); + long messageId = tupleInput.readLong(); + + return new OldQueueEntryKey(queueName, messageId); + } + + public void objectToEntry(OldQueueEntryKey mk, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(mk.getQueueName(), tupleOutput); + tupleOutput.writeLong(mk.getMessageId()); + } + } + + static class NewQueueEntryKey + { + private UUID _queueId; + private long _messageId; + + public NewQueueEntryKey(UUID queueId, long messageId) + { + _queueId = queueId; + _messageId = messageId; + } + + public UUID getQueueId() + { + return _queueId; + } + + public long getMessageId() + { + return _messageId; + } + } + + static class NewQueueEntryBinding extends TupleBinding<NewQueueEntryKey> + { + + public NewQueueEntryKey entryToObject(TupleInput tupleInput) + { + UUID queueId = new UUID(tupleInput.readLong(), tupleInput.readLong()); + long messageId = tupleInput.readLong(); + + return new NewQueueEntryKey(queueId, messageId); + } + + public void objectToEntry(NewQueueEntryKey mk, TupleOutput tupleOutput) + { + UUID uuid = mk.getQueueId(); + tupleOutput.writeLong(uuid.getMostSignificantBits()); + tupleOutput.writeLong(uuid.getLeastSignificantBits()); + tupleOutput.writeLong(mk.getMessageId()); + } + } + + static class NewPreparedTransaction + { + private final NewRecordImpl[] _enqueues; + private final NewRecordImpl[] _dequeues; + + public NewPreparedTransaction(NewRecordImpl[] enqueues, NewRecordImpl[] dequeues) + { + _enqueues = enqueues; + _dequeues = dequeues; + } + + public NewRecordImpl[] getEnqueues() + { + return _enqueues; + } + + public NewRecordImpl[] getDequeues() + { + return _dequeues; + } + } + + static class NewRecordImpl + { + + private long _messageNumber; + private UUID _queueId; + + public NewRecordImpl(UUID queueId, long messageNumber) + { + _messageNumber = messageNumber; + _queueId = queueId; + } + + public long getMessageNumber() + { + return _messageNumber; + } + + public UUID getId() + { + return _queueId; + } + } + + static class NewPreparedTransactionBinding extends TupleBinding<NewPreparedTransaction> + { + @Override + public NewPreparedTransaction entryToObject(TupleInput input) + { + NewRecordImpl[] enqueues = readRecords(input); + + NewRecordImpl[] dequeues = readRecords(input); + + return new NewPreparedTransaction(enqueues, dequeues); + } + + private NewRecordImpl[] readRecords(TupleInput input) + { + NewRecordImpl[] records = new NewRecordImpl[input.readInt()]; + for (int i = 0; i < records.length; i++) + { + records[i] = new NewRecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong()); + } + return records; + } + + @Override + public void objectToEntry(NewPreparedTransaction preparedTransaction, TupleOutput output) + { + writeRecords(preparedTransaction.getEnqueues(), output); + writeRecords(preparedTransaction.getDequeues(), output); + } + + private void writeRecords(NewRecordImpl[] records, TupleOutput output) + { + if (records == null) + { + output.writeInt(0); + } + else + { + output.writeInt(records.length); + for (NewRecordImpl record : records) + { + UUID id = record.getId(); + output.writeLong(id.getMostSignificantBits()); + output.writeLong(id.getLeastSignificantBits()); + output.writeLong(record.getMessageNumber()); + } + } + } + } + + static class OldRecordImpl + { + + private long _messageNumber; + private String _queueName; + + public OldRecordImpl(String queueName, long messageNumber) + { + _messageNumber = messageNumber; + _queueName = queueName; + } + + public long getMessageNumber() + { + return _messageNumber; + } + + public String getQueueName() + { + return _queueName; + } + } + + static class OldPreparedTransaction + { + private final OldRecordImpl[] _enqueues; + private final OldRecordImpl[] _dequeues; + + public OldPreparedTransaction(OldRecordImpl[] enqueues, OldRecordImpl[] dequeues) + { + _enqueues = enqueues; + _dequeues = dequeues; + } + + public OldRecordImpl[] getEnqueues() + { + return _enqueues; + } + + public OldRecordImpl[] getDequeues() + { + return _dequeues; + } + } + + static class OldPreparedTransactionBinding extends TupleBinding<OldPreparedTransaction> + { + @Override + public OldPreparedTransaction entryToObject(TupleInput input) + { + OldRecordImpl[] enqueues = readRecords(input); + + OldRecordImpl[] dequeues = readRecords(input); + + return new OldPreparedTransaction(enqueues, dequeues); + } + + private OldRecordImpl[] readRecords(TupleInput input) + { + OldRecordImpl[] records = new OldRecordImpl[input.readInt()]; + for (int i = 0; i < records.length; i++) + { + records[i] = new OldRecordImpl(input.readString(), input.readLong()); + } + return records; + } + + @Override + public void objectToEntry(OldPreparedTransaction preparedTransaction, TupleOutput output) + { + writeRecords(preparedTransaction.getEnqueues(), output); + writeRecords(preparedTransaction.getDequeues(), output); + } + + private void writeRecords(OldRecordImpl[] records, TupleOutput output) + { + if (records == null) + { + output.writeInt(0); + } + else + { + output.writeInt(records.length); + for (OldRecordImpl record : records) + { + output.writeString(record.getQueueName()); + output.writeLong(record.getMessageNumber()); + } + } + } + } }
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java index 77455c2ea1..e71e39cbb8 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java @@ -38,13 +38,15 @@ import com.sleepycat.je.OperationStatus; public class Upgrader { - static final String VERSION_DB_NAME = "VERSION"; + static final String VERSION_DB_NAME = "DB_VERSION"; private Environment _environment; + private String _virtualHostName; - public Upgrader(Environment environment) + public Upgrader(Environment environment, String virtualHostName) { _environment = environment; + _virtualHostName = virtualHostName; } public void upgradeIfNecessary() throws AMQStoreException @@ -125,7 +127,7 @@ public class Upgrader + "UpgradeFrom"+fromVersion+"To"+toVersion); Constructor<StoreUpgrade> ctr = upgradeClass.getConstructor(); StoreUpgrade upgrade = ctr.newInstance(); - upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER); + upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _virtualHostName); } catch (ClassNotFoundException e) { diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java new file mode 100644 index 0000000000..687c671566 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java @@ -0,0 +1,14 @@ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.server.store.DurableConfigurationStoreTest; +import org.apache.qpid.server.store.MessageStore; + +public class BDBMessageStoreConfigurationTest extends DurableConfigurationStoreTest +{ + @Override + protected MessageStore createStore() throws Exception + { + return new BDBMessageStore(); + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 18e20f2ee8..a318187f13 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import java.util.UUID; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -36,6 +37,7 @@ import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.message.MessageMetaData_0_10; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.store.MessageMetaDataType; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -405,13 +407,13 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto BDBMessageStore bdbStore = assertBDBStore(log); - final AMQShortString mockQueueName = new AMQShortString("queueName"); - + final UUID mockQueueId = UUIDGenerator.generateUUID(); TransactionLogResource mockQueue = new TransactionLogResource() { - public String getResourceName() + @Override + public UUID getId() { - return mockQueueName.asString(); + return mockQueueId; } }; @@ -421,7 +423,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto txn.enqueueMessage(mockQueue, new MockMessage(5L)); txn.commitTran(); - List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); + List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); Long val = enqueuedIds.get(0); @@ -443,13 +445,13 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto BDBMessageStore bdbStore = assertBDBStore(log); - final AMQShortString mockQueueName = new AMQShortString("queueName"); - + final UUID mockQueueId = UUIDGenerator.generateUUID(); TransactionLogResource mockQueue = new TransactionLogResource() { - public String getResourceName() + @Override + public UUID getId() { - return mockQueueName.asString(); + return mockQueueId; } }; @@ -463,7 +465,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto txn.enqueueMessage(mockQueue, new MockMessage(23L)); txn.commitTran(); - List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); + List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); Long val = enqueuedIds.get(0); @@ -484,13 +486,13 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto BDBMessageStore bdbStore = assertBDBStore(log); - final AMQShortString mockQueueName = new AMQShortString("queueName"); - + final UUID mockQueueId = UUIDGenerator.generateUUID(); TransactionLogResource mockQueue = new TransactionLogResource() { - public String getResourceName() + @Override + public UUID getId() { - return mockQueueName.asString(); + return mockQueueId; } }; @@ -507,7 +509,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto txn.enqueueMessage(mockQueue, new MockMessage(32L)); txn.commitTran(); - List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); + List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); Long val = enqueuedIds.get(0); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java new file mode 100644 index 0000000000..f8aeb7f7b0 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuple; + +import junit.framework.TestCase; + +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.store.ConfiguredObjectRecord; + +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +public class ConfiguredObjectBindingTest extends TestCase +{ + + private ConfiguredObjectRecord _object; + + private static final String DUMMY_ATTRIBUTES_STRING = "dummyAttributes"; + private static final String DUMMY_TYPE_STRING = "dummyType"; + private ConfiguredObjectBinding _configuredObjectBinding; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _configuredObjectBinding = ConfiguredObjectBinding.getInstance(); + _object = new ConfiguredObjectRecord(UUIDGenerator.generateUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_STRING); + } + + public void testObjectToEntryAndEntryToObject() + { + TupleOutput tupleOutput = new TupleOutput(); + + _configuredObjectBinding.objectToEntry(_object, tupleOutput); + + byte[] entryAsBytes = tupleOutput.getBufferBytes(); + TupleInput tupleInput = new TupleInput(entryAsBytes); + + ConfiguredObjectRecord storedObject = _configuredObjectBinding.entryToObject(tupleInput); + assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_STRING, storedObject.getAttributes()); + assertEquals("Unexpected type", DUMMY_TYPE_STRING, storedObject.getType()); + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java index 6df2f8a8db..36991b90d0 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java @@ -22,10 +22,9 @@ package org.apache.qpid.server.store.berkeleydb.upgrade; import java.io.File; -import junit.framework.TestCase; - import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.subjects.TestBlankSubject; +import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; import com.sleepycat.je.Database; @@ -33,7 +32,7 @@ import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.Transaction; -public abstract class AbstractUpgradeTestCase extends TestCase +public abstract class AbstractUpgradeTestCase extends QpidTestCase { protected static final class StaticAnswerHandler implements UpgradeInteractionHandler { @@ -57,7 +56,6 @@ public abstract class AbstractUpgradeTestCase extends TestCase public static int[] QUEUE_SIZES = { 1, 1, 10, 3 }; public static int TOTAL_MESSAGE_NUMBER = 15; protected static final LogSubject LOG_SUBJECT = new TestBlankSubject(); - protected static final String TMP_FOLDER = System.getProperty("java.io.tmpdir"); // one binding per exchange protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2; @@ -148,4 +146,8 @@ public abstract class AbstractUpgradeTestCase extends TestCase return count.longValue(); } + public String getVirtualHostName() + { + return getName(); + } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java index c9103b1997..3f9e4e4aa1 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java @@ -66,7 +66,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase public void testPerformUpgradeWithHandlerAnsweringYes() throws Exception { UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES)); + upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHostName()); assertQueues(new HashSet<String>(Arrays.asList(QUEUE_NAMES))); @@ -93,7 +93,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase public void testPerformUpgradeWithHandlerAnsweringNo() throws Exception { UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO)); + upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHostName()); assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE))); assertDatabaseRecordCount(DELIVERY_DB_NAME, 12); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java index d73a777ca6..5297692820 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java @@ -20,14 +20,50 @@ */ package org.apache.qpid.server.store.berkeleydb.upgrade; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CONFIGURED_OBJECTS_DB_NAME; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_CONTENT_DB_NAME; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_DELIVERY_DB_NAME; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_METADATA_DB_NAME; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_XID_DB_NAME; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_CONTENT_DB_NAME; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_XID_DB_NAME; + +import java.io.File; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + import org.apache.log4j.Logger; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.store.berkeleydb.entry.Xid; +import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.ConfiguredObjectBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeConfiguredObjectRecord; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewDataBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransaction; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransactionBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewQueueEntryBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewQueueEntryKey; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewRecordImpl; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransaction; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransactionBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldRecordImpl; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeUUIDBinding; +import org.apache.qpid.server.util.MapJsonSerializer; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.Environment; +import com.sleepycat.je.LockMode; import com.sleepycat.je.Transaction; public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase @@ -43,10 +79,13 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase public void testPerformUpgrade() throws Exception { UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER); + upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHostName()); assertDatabaseRecordCounts(); assertContent(); + + assertConfiguredObjects(); + assertQueueEntries(); } public void testPerformUpgradeWithMissingMessageChunkKeepsIncompleteMessage() throws Exception @@ -54,9 +93,12 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase corruptDatabase(); UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES)); + upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHostName()); assertDatabaseRecordCounts(); + + assertConfiguredObjects(); + assertQueueEntries(); } public void testPerformUpgradeWithMissingMessageChunkDiscardsIncompleteMessage() throws Exception @@ -67,10 +109,117 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase UpgradeInteractionHandler discardMessageInteractionHandler = new StaticAnswerHandler(UpgradeInteractionResponse.NO); - upgrade.performUpgrade(_environment, discardMessageInteractionHandler); + upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHostName()); + + assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 11); + assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 11); + + assertConfiguredObjects(); + assertQueueEntries(); + } + + public void testPerformXidUpgrade() throws Exception + { + File storeLocation = new File(TMP_FOLDER, getName()); + storeLocation.mkdirs(); + Environment environment = createEnvironment(storeLocation); + try + { + populateOldXidEntries(environment); + UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); + upgrade.performUpgrade(environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHostName()); + assertXidEntries(environment); + } + finally + { + try + { + environment.close(); + } + finally + { + deleteDirectoryIfExists(storeLocation); + } + + } + } + + private void assertXidEntries(Environment environment) + { + final DatabaseEntry value = new DatabaseEntry(); + final DatabaseEntry key = getXidKey(); + new DatabaseTemplate(environment, NEW_XID_DB_NAME, null).run(new DatabaseRunnable() + { + + @Override + public void run(Database xidDatabase, Database nullDatabase, Transaction transaction) + { + xidDatabase.get(null, key, value, LockMode.DEFAULT); + } + }); + NewPreparedTransactionBinding newBinding = new NewPreparedTransactionBinding(); + NewPreparedTransaction newTransaction = newBinding.entryToObject(value); + NewRecordImpl[] newEnqueues = newTransaction.getEnqueues(); + NewRecordImpl[] newDequeues = newTransaction.getDequeues(); + assertEquals("Unxpected new enqueus number", 1, newEnqueues.length); + NewRecordImpl enqueue = newEnqueues[0]; + assertEquals("Unxpected queue id", UUIDGenerator.generateUUID("TEST1", getVirtualHostName()), enqueue.getId()); + assertEquals("Unxpected message id", 1, enqueue.getMessageNumber()); + assertEquals("Unxpected new dequeues number", 1, newDequeues.length); + NewRecordImpl dequeue = newDequeues[0]; + assertEquals("Unxpected queue id", UUIDGenerator.generateUUID("TEST2", getVirtualHostName()), dequeue.getId()); + assertEquals("Unxpected message id", 2, dequeue.getMessageNumber()); + } + + private void populateOldXidEntries(Environment environment) + { - assertDatabaseRecordCount("MESSAGE_METADATA", 11); - assertDatabaseRecordCount("MESSAGE_CONTENT", 11); + final DatabaseEntry value = new DatabaseEntry(); + OldRecordImpl[] enqueues = { new OldRecordImpl("TEST1", 1) }; + OldRecordImpl[] dequeues = { new OldRecordImpl("TEST2", 2) }; + OldPreparedTransaction oldPreparedTransaction = new OldPreparedTransaction(enqueues, dequeues); + OldPreparedTransactionBinding oldPreparedTransactionBinding = new OldPreparedTransactionBinding(); + oldPreparedTransactionBinding.objectToEntry(oldPreparedTransaction, value); + + final DatabaseEntry key = getXidKey(); + new DatabaseTemplate(environment, OLD_XID_DB_NAME, null).run(new DatabaseRunnable() + { + + @Override + public void run(Database xidDatabase, Database nullDatabase, Transaction transaction) + { + xidDatabase.put(null, key, value); + } + }); + } + + protected DatabaseEntry getXidKey() + { + final DatabaseEntry value = new DatabaseEntry(); + byte[] globalId = { 1 }; + byte[] branchId = { 2 }; + Xid xid = new Xid(1l, globalId, branchId); + XidBinding xidBinding = XidBinding.getInstance(); + xidBinding.objectToEntry(xid, value); + return value; + } + + private void assertQueueEntries() + { + final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); + final NewQueueEntryBinding newBinding = new NewQueueEntryBinding(); + CursorOperation cursorOperation = new CursorOperation() + { + + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + NewQueueEntryKey newEntryRecord = newBinding.entryToObject(key); + assertTrue("Unexpected queue id", configuredObjects.containsKey(newEntryRecord.getQueueId())); + } + }; + new DatabaseTemplate(_environment, NEW_DELIVERY_DB_NAME, null).run(cursorOperation); } /** @@ -105,19 +254,124 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase }; Transaction transaction = _environment.beginTransaction(null, null); - new DatabaseTemplate(_environment, "messageContentDb_v5", transaction).run(cursorOperation); + new DatabaseTemplate(_environment, OLD_CONTENT_DB_NAME, transaction).run(cursorOperation); transaction.commit(); } private void assertDatabaseRecordCounts() { - assertDatabaseRecordCount("EXCHANGES", 5); - assertDatabaseRecordCount("QUEUES", 3); - assertDatabaseRecordCount("QUEUE_BINDINGS", 6); - assertDatabaseRecordCount("DELIVERIES", 12); + assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 9); + assertDatabaseRecordCount(NEW_DELIVERY_DB_NAME, 12); - assertDatabaseRecordCount("MESSAGE_METADATA", 12); - assertDatabaseRecordCount("MESSAGE_CONTENT", 12); + assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12); + assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 12); + } + + private void assertConfiguredObjects() + { + Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); + assertEquals("Unexpected number of configured objects", 9, configuredObjects.size()); + + Set<Map<String, Object>> expected = new HashSet<Map<String, Object>>(9); + Map<String, Object> queue1 = new HashMap<String, Object>(); + queue1.put("exclusive", Boolean.FALSE); + queue1.put("name", "myUpgradeQueue"); + queue1.put("owner", null); + expected.add(queue1); + Map<String, Object> queue2 = new HashMap<String, Object>(); + queue2.put("exclusive", Boolean.TRUE); + queue2.put("name", "clientid:mySelectorDurSubName"); + queue2.put("owner", "clientid"); + expected.add(queue2); + Map<String, Object> queue3 = new HashMap<String, Object>(); + queue3.put("exclusive", Boolean.TRUE); + queue3.put("name", "clientid:myDurSubName"); + queue3.put("owner", "clientid"); + expected.add(queue3); + + Map<String, Object> queueBinding1 = new HashMap<String, Object>(); + queueBinding1.put("queue", UUIDGenerator.generateUUID("myUpgradeQueue", getVirtualHostName()).toString()); + queueBinding1.put("name", "myUpgradeQueue"); + queueBinding1.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString()); + expected.add(queueBinding1); + Map<String, Object> queueBinding2 = new HashMap<String, Object>(); + queueBinding2.put("queue", UUIDGenerator.generateUUID("myUpgradeQueue", getVirtualHostName()).toString()); + queueBinding2.put("name", "myUpgradeQueue"); + queueBinding2.put("exchange", UUIDGenerator.generateUUID("amq.direct", getVirtualHostName()).toString()); + Map<String, Object> arguments2 = new HashMap<String, Object>(); + arguments2.put("x-filter-jms-selector", ""); + queueBinding2.put("arguments", arguments2); + expected.add(queueBinding2); + Map<String, Object> queueBinding3 = new HashMap<String, Object>(); + queueBinding3.put("queue", UUIDGenerator.generateUUID("clientid:myDurSubName", getVirtualHostName()).toString()); + queueBinding3.put("name", "myUpgradeTopic"); + queueBinding3.put("exchange", UUIDGenerator.generateUUID("amq.topic", getVirtualHostName()).toString()); + Map<String, Object> arguments3 = new HashMap<String, Object>(); + arguments3.put("x-filter-jms-selector", ""); + queueBinding3.put("arguments", arguments3); + expected.add(queueBinding3); + Map<String, Object> queueBinding4 = new HashMap<String, Object>(); + queueBinding4.put("queue", UUIDGenerator.generateUUID("clientid:mySelectorDurSubName", getVirtualHostName()).toString()); + queueBinding4.put("name", "mySelectorUpgradeTopic"); + queueBinding4.put("exchange", UUIDGenerator.generateUUID("amq.topic", getVirtualHostName()).toString()); + Map<String, Object> arguments4 = new HashMap<String, Object>(); + arguments4.put("x-filter-jms-selector", "testprop='true'"); + queueBinding4.put("arguments", arguments4); + expected.add(queueBinding4); + Map<String, Object> queueBinding5 = new HashMap<String, Object>(); + queueBinding5.put("queue", UUIDGenerator.generateUUID("clientid:myDurSubName", getVirtualHostName()).toString()); + queueBinding5.put("name", "clientid:myDurSubName"); + queueBinding5.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString()); + expected.add(queueBinding5); + Map<String, Object> queueBinding6 = new HashMap<String, Object>(); + queueBinding6.put("queue", UUIDGenerator.generateUUID("clientid:mySelectorDurSubName", getVirtualHostName()).toString()); + queueBinding6.put("name", "clientid:mySelectorDurSubName"); + queueBinding6.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString()); + expected.add(queueBinding6); + + Set<String> expectedTypes = new HashSet<String>(); + expectedTypes.add(Queue.class.getName()); + expectedTypes.add(Exchange.class.getName()); + expectedTypes.add(Binding.class.getName()); + MapJsonSerializer jsonSerializer = new MapJsonSerializer(); + for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet()) + { + UpgradeConfiguredObjectRecord object = entry.getValue(); + UUID key = entry.getKey(); + Map<String, Object> deserialized = jsonSerializer.deserialize(object.getAttributes()); + assertTrue("Unexpected entry:" + object.getAttributes(), expected.remove(deserialized)); + String type = object.getType(); + assertTrue("Unexpected type:" + type, expectedTypes.contains(type)); + if (type.equals(Exchange.class.getName()) || type.equals(Queue.class.getName())) + { + assertEquals("Unexpected key", key, UUIDGenerator.generateUUID(((String) deserialized.get("name")), getVirtualHostName())); + } + else + { + assertNotNull("Key cannot be null", key); + } + } + assertTrue("Not all expected configured objects found:" + expected, expected.isEmpty()); + } + + private Map<UUID, UpgradeConfiguredObjectRecord> loadConfiguredObjects() + { + final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjectsRecords = new HashMap<UUID, UpgradeConfiguredObjectRecord>(); + final ConfiguredObjectBinding binding = new ConfiguredObjectBinding(); + final UpgradeUUIDBinding uuidBinding = new UpgradeUUIDBinding(); + CursorOperation configuredObjectsCursor = new CursorOperation() + { + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + UUID id = uuidBinding.entryToObject(key); + UpgradeConfiguredObjectRecord object = binding.entryToObject(value); + configuredObjectsRecords.put(id, object); + } + }; + new DatabaseTemplate(_environment, CONFIGURED_OBJECTS_DB_NAME, null).run(configuredObjectsCursor); + return configuredObjectsRecords; } private void assertContent() @@ -127,8 +381,8 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase { @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key, - DatabaseEntry value) + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) { long id = LongBinding.entryToLong(key); assertTrue("Unexpected id", id > 0); @@ -136,6 +390,6 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase assertNotNull("Unexpected content", content); } }; - new DatabaseTemplate(_environment, "MESSAGE_CONTENT", null).run(contentCursorOperation); + new DatabaseTemplate(_environment, NEW_CONTENT_DB_NAME, null).run(contentCursorOperation); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java index 99c4b7ab5b..ba5ca842bf 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java @@ -50,7 +50,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase public void setUp() throws Exception { super.setUp(); - _upgrader = new Upgrader(_environment); + _upgrader = new Upgrader(_environment, getVirtualHostName()); } private int getStoreVersion() @@ -105,12 +105,12 @@ public class UpgraderTest extends AbstractUpgradeTestCase nonExistentStoreLocation.mkdir(); _environment = createEnvironment(nonExistentStoreLocation); - _upgrader = new Upgrader(_environment); + _upgrader = new Upgrader(_environment, getVirtualHostName()); _upgrader.upgradeIfNecessary(); List<String> databaseNames = _environment.getDatabaseNames(); List<String> expectedDatabases = new ArrayList<String>(); - expectedDatabases.add("VERSION"); + expectedDatabases.add(Upgrader.VERSION_DB_NAME); assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames); assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion()); diff --git a/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java index 71c8163a50..af56b50437 100644 --- a/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java +++ b/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java @@ -40,6 +40,7 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import java.util.ArrayList; import java.util.Map; +import java.util.UUID; /** * This is a special diagnostic exchange type which doesn't actually do anything @@ -125,14 +126,14 @@ public class DiagnosticExchange extends AbstractExchange return DiagnosticExchange.class; } - public DiagnosticExchange newInstance(VirtualHost host, + public DiagnosticExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { DiagnosticExchange exch = new DiagnosticExchange(); - exch.initialise(host,name,durable,ticket,autoDelete); + exch.initialise(id, host,name,durable,ticket,autoDelete); return exch; } diff --git a/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchangeType.java b/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchangeType.java index b4d0d1aa0d..c0f325e42f 100644 --- a/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchangeType.java +++ b/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchangeType.java @@ -21,6 +21,8 @@ package org.apache.qpid.extras.exchanges.diagnostic; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.ExchangeType; @@ -42,11 +44,11 @@ public final class DiagnosticExchangeType implements ExchangeType<DiagnosticExch return DiagnosticExchange.class; } - public DiagnosticExchange newInstance(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) + public DiagnosticExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { DiagnosticExchange exch = new DiagnosticExchange(); - exch.initialise(host, name, durable, ticket, autoDelete); + exch.initialise(id, host, name, durable, ticket, autoDelete); return exch; } diff --git a/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java b/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java index cf79924f3d..691f4c8d0b 100644 --- a/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java +++ b/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java @@ -255,7 +255,7 @@ public class TestExchange implements Exchange return 0; } - public void initialise(VirtualHost arg0, AMQShortString arg1, boolean arg2, int arg3, boolean arg4) + public void initialise(UUID id, VirtualHost arg0, AMQShortString arg1, boolean arg2, int arg3, boolean arg4) throws AMQException { } diff --git a/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchangeType.java b/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchangeType.java index db02ca13ea..5c454b9d0e 100644 --- a/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchangeType.java +++ b/qpid/java/broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchangeType.java @@ -21,6 +21,8 @@ package org.apache.qpid.extras.exchanges.example; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; @@ -40,12 +42,12 @@ public class TestExchangeType implements ExchangeType return null; } - public Exchange newInstance(VirtualHost host, AMQShortString name, boolean durable, + public Exchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int token, boolean autoDelete) throws AMQException { TestExchange ex = new TestExchange(); - ex.initialise(host, name, durable, token, autoDelete); + ex.initialise(id, host, name, durable, token, autoDelete); return ex; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java index 2d6f7e0946..034a4ae53c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.exchange.topic.TopicNormalizer; import org.apache.qpid.server.exchange.topic.TopicParser; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.virtualhost.HouseKeepingTask; @@ -83,7 +84,8 @@ public class ManagementExchange implements Exchange, QMFService.Listener private class ManagementQueue implements BaseQueue { - private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + UUID.randomUUID().toString(); + private final UUID QUEUE_ID = UUIDGenerator.generateUUID(); + private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + QUEUE_ID.toString(); private final AMQShortString NAME_AS_SHORT_STRING = new AMQShortString(NAME_AS_STRING); public void enqueue(ServerMessage message) throws AMQException @@ -129,9 +131,10 @@ public class ManagementExchange implements Exchange, QMFService.Listener return NAME_AS_SHORT_STRING; } - public String getResourceName() + @Override + public UUID getId() { - return NAME_AS_STRING; + return QUEUE_ID; } } @@ -155,14 +158,14 @@ public class ManagementExchange implements Exchange, QMFService.Listener return ManagementExchange.class; } - public ManagementExchange newInstance(VirtualHost host, + public ManagementExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { ManagementExchange exch = new ManagementExchange(); - exch.initialise(host, name, durable, ticket, autoDelete); + exch.initialise(id, host, name, durable, ticket, autoDelete); return exch; } @@ -183,7 +186,7 @@ public class ManagementExchange implements Exchange, QMFService.Listener return QPID_MANAGEMENT_TYPE; } - public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) + public void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { if(!QPID_MANAGEMENT.equals(name)) @@ -191,7 +194,7 @@ public class ManagementExchange implements Exchange, QMFService.Listener throw new AMQException("Can't create more than one Management exchange"); } _virtualHost = host; - _id = host.getConfigStore().createId(); + _id = id; _virtualHost.scheduleHouseKeepingTask(_virtualHost.getBroker().getManagementPublishInterval(), new UpdateTask(_virtualHost)); getConfigStore().addConfiguredObject(this); getQMFService().addListener(this); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 7ef06ce0f8..0f32b98aa8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueueMBean; @@ -48,6 +49,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; /** * This MBean implements the broker management interface and exposes the @@ -171,8 +173,8 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName)); if (exchange == null) { - exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), - durable, false, 0); + exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), + new AMQShortString(type), durable, false, 0); _exchangeRegistry.registerExchange(exchange); if (durable) { @@ -244,45 +246,42 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr public void createNewQueue(String queueName, String owner, boolean durable, Map<String,Object> arguments) throws JMException { final AMQShortString queueNameAsAMQShortString = new AMQShortString(queueName); - AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString); - if (queue != null) + synchronized (_queueRegistry) { - throw new JMException("The queue \"" + queueName + "\" already exists."); - } - - CurrentActor.set(new ManagementActor(getLogActor().getRootMessageLogger())); - try - { - AMQShortString ownerShortString = null; - if (owner != null) + AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString); + if (queue != null) { - ownerShortString = new AMQShortString(owner); + throw new JMException("The queue \"" + queueName + "\" already exists."); } - FieldTable args = null; - if(arguments != null) + CurrentActor.set(new ManagementActor(getLogActor().getRootMessageLogger())); + try { - args = FieldTable.convertToFieldTable(arguments); - } - final VirtualHost virtualHost = getVirtualHost(); + FieldTable args = null; + if(arguments != null) + { + args = FieldTable.convertToFieldTable(arguments); + } + final VirtualHost virtualHost = getVirtualHost(); + + queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), queueName, durable, owner, + false, false, getVirtualHost(), arguments); + if (queue.isDurable() && !queue.isAutoDelete()) + { + getVirtualHost().getMessageStore().createQueue(queue, args); + } - queue = AMQQueueFactory.createAMQQueueImpl(queueNameAsAMQShortString, durable, ownerShortString, - false, false, getVirtualHost(), args); - if (queue.isDurable() && !queue.isAutoDelete()) + virtualHost.getBindingFactory().addBinding(queueName, queue, _exchangeRegistry.getDefaultExchange(), null); + } + catch (AMQException ex) { - getVirtualHost().getMessageStore().createQueue(queue, args); + JMException jme = new JMException(ex.toString()); + throw new MBeanException(jme, "Error in creating queue " + queueName); + } + finally + { + CurrentActor.remove(); } - - virtualHost.getBindingFactory().addBinding(queueName, queue, _exchangeRegistry.getDefaultExchange(), null); - } - catch (AMQException ex) - { - JMException jme = new JMException(ex.toString()); - throw new MBeanException(jme, "Error in creating queue " + queueName); - } - finally - { - CurrentActor.remove(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java index 8e44da095a..2efd4cee26 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java @@ -117,7 +117,7 @@ public class Binding public String toString() { - return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+"}"; + return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + _id + " }"; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java index 2460be4705..abf252c733 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java @@ -24,7 +24,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.BindingConfig; import org.apache.qpid.server.configuration.BindingConfigType; import org.apache.qpid.server.configuration.ConfigStore; @@ -33,11 +32,13 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.BindingMessages; import org.apache.qpid.server.logging.subjects.BindingLogSubject; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collections; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; public class BindingFactory @@ -57,9 +58,9 @@ public class BindingFactory //TODO : persist creation time private long _createTime = System.currentTimeMillis(); - private BindingImpl(String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments) + private BindingImpl(UUID id, String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments) { - super(queue.getVirtualHost().getConfigStore().createId(), bindingKey, queue, exchange, arguments); + super(id, bindingKey, queue, exchange, arguments); _logSubject = new BindingLogSubject(bindingKey,exchange,queue); } @@ -116,19 +117,19 @@ public class BindingFactory public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException { - return makeBinding(bindingKey, queue, exchange, arguments, false, false); + return makeBinding(null, bindingKey, queue, exchange, arguments, false, false); } - public boolean replaceBinding(final String bindingKey, + public boolean replaceBinding(final UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException { - return makeBinding(bindingKey, queue, exchange, arguments, false, true); + return makeBinding(id, bindingKey, queue, exchange, arguments, false, true); } - private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException + private boolean makeBinding(UUID id, String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException { assert queue != null; final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); @@ -163,9 +164,12 @@ public class BindingFactory } } - - BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments); - BindingImpl existingMapping = _bindings.putIfAbsent(b,b); + if (id == null) + { + id = UUIDGenerator.generateUUID(); + } + BindingImpl b = new BindingImpl(id, bindingKey, queue, exchange, arguments); + BindingImpl existingMapping = _bindings.putIfAbsent(b, b); if (existingMapping == null || force) { if (existingMapping != null) @@ -175,7 +179,7 @@ public class BindingFactory if (b.isDurable() && !restore) { - _virtualHost.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments)); + _virtualHost.getMessageStore().bindQueue(b); } queue.addQueueDeleteTask(b); @@ -198,9 +202,9 @@ public class BindingFactory return _virtualHost.getConfigStore(); } - public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException + public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException { - makeBinding(bindingKey,queue,exchange,argumentMap,true, false); + makeBinding(id, bindingKey,queue,exchange,argumentMap,true, false); } public void removeBinding(final Binding b) throws AMQSecurityException, AMQInternalException @@ -239,7 +243,7 @@ public class BindingFactory } } - BindingImpl b = _bindings.remove(new BindingImpl(bindingKey,queue,exchange,arguments)); + BindingImpl b = _bindings.remove(new BindingImpl(null, bindingKey,queue,exchange,arguments)); if (b != null) { @@ -250,10 +254,7 @@ public class BindingFactory if (b.isDurable()) { - _virtualHost.getMessageStore().unbindQueue(exchange, - new AMQShortString(bindingKey), - queue, - FieldTable.convertToFieldTable(arguments)); + _virtualHost.getMessageStore().unbindQueue(b); } b.logDestruction(); getConfigStore().removeConfiguredObject(b); @@ -280,7 +281,7 @@ public class BindingFactory arguments = Collections.emptyMap(); } - BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments); + BindingImpl b = new BindingImpl(null, bindingKey,queue,exchange,arguments); return _bindings.get(b); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index af49168a80..9493f400f2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -114,7 +114,7 @@ public abstract class AbstractExchange implements Exchange, Managable */ protected abstract AbstractExchangeMBean createMBean() throws JMException; - public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) + public void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { _virtualHost = host; @@ -123,7 +123,7 @@ public abstract class AbstractExchange implements Exchange, Managable _autoDelete = autoDelete; _ticket = ticket; - _id = getConfigStore().createId(); + _id = id; getConfigStore().addConfiguredObject(this); createAndRegisterMBean(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 153419de1b..5058f91995 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -25,9 +25,11 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.qmf.ManagementExchange; import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -35,6 +37,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.UUID; public class DefaultExchangeFactory implements ExchangeFactory { @@ -76,17 +79,29 @@ public class DefaultExchangeFactory implements ExchangeFactory return publicTypes; } - - public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) - throws AMQException + throws AMQException { return createExchange(new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete, 0); } - public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, - int ticket) + public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) + throws AMQException + { + return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete, 0); + } + + public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, + boolean autoDelete, int ticket) + throws AMQException + { + UUID id = UUIDGenerator.generateExchangeUUID(exchange.asString(), _host.getName()); + return createExchange(id, exchange, type, durable, autoDelete, ticket); + } + + public Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable, + boolean autoDelete, int ticket) throws AMQException { // Check access @@ -102,7 +117,7 @@ public class DefaultExchangeFactory implements ExchangeFactory throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null); } - Exchange e = exchType.newInstance(_host, exchange, durable, ticket, autoDelete); + Exchange e = exchType.newInstance(id, _host, exchange, durable, ticket, autoDelete); return e; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 33e73b4668..bf4184bf0b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -174,4 +175,25 @@ public class DefaultExchangeRegistry implements ExchangeRegistry _exchangeMapStr.clear(); } + @Override + public synchronized Exchange getExchange(UUID exchangeId) + { + if (exchangeId == null) + { + return getDefaultExchange(); + } + else + { + Collection<Exchange> exchanges = _exchangeMap.values(); + for (Exchange exchange : exchanges) + { + if (exchange.getId().equals(exchangeId)) + { + return exchange; + } + } + return null; + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 9525324f57..af9322764a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -36,6 +36,7 @@ import javax.management.JMException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; @@ -103,14 +104,14 @@ public class DirectExchange extends AbstractExchange return DirectExchange.class; } - public DirectExchange newInstance(VirtualHost host, + public DirectExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { DirectExchange exch = new DirectExchange(); - exch.initialise(host,name,durable,ticket,autoDelete); + exch.initialise(id, host,name,durable,ticket,autoDelete); return exch; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 0bcfc3a3da..289cb1a923 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -36,6 +36,7 @@ import javax.management.JMException; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.UUID; public interface Exchange extends ExchangeReferrer, ExchangeConfig { @@ -50,7 +51,7 @@ public interface Exchange extends ExchangeReferrer, ExchangeConfig AMQShortString getTypeShortString(); - void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) + void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException, JMException; boolean isDurable(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java index 577da79028..aae4ae89bb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import java.util.Collection; +import java.util.UUID; public interface ExchangeFactory @@ -40,4 +41,10 @@ public interface ExchangeFactory Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes(); Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException; + + Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQException; + + Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable, + boolean autoDelete, int ticket) + throws AMQException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java index 335efaeaa2..ba4f57a8e0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.exchange; + import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index db244c114b..795ae2e140 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import java.util.Collection; +import java.util.UUID; public interface ExchangeRegistry @@ -54,4 +55,6 @@ public interface ExchangeRegistry void unregisterExchange(String exchange, boolean ifUnused) throws ExchangeInUseException, AMQException; void clearAndUnregisterMbeans(); + + Exchange getExchange(UUID exchangeId); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java index ce339c4e29..a01e41f039 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.exchange; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -29,7 +31,7 @@ public interface ExchangeType<T extends Exchange> { public AMQShortString getName(); public Class<T> getExchangeClass(); - public T newInstance(VirtualHost host, AMQShortString name, + public T newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException; public AMQShortString getDefaultExchangeName(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index f9ad2fad87..5ebcfd095f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.util.ArrayList; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; public class FanoutExchange extends AbstractExchange @@ -65,14 +66,14 @@ public class FanoutExchange extends AbstractExchange return FanoutExchange.class; } - public FanoutExchange newInstance(VirtualHost host, + public FanoutExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { FanoutExchange exch = new FanoutExchange(); - exch.initialise(host, name, durable, ticket, autoDelete); + exch.initialise(id, host, name, durable, ticket, autoDelete); return exch; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 2700a7cda3..16ba3c0431 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -37,6 +37,7 @@ import javax.management.JMException; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; @@ -93,12 +94,12 @@ public class HeadersExchange extends AbstractExchange return HeadersExchange.class; } - public HeadersExchange newInstance(VirtualHost host, AMQShortString name, boolean durable, int ticket, + public HeadersExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { HeadersExchange exch = new HeadersExchange(); - exch.initialise(host, name, durable, ticket, autoDelete); + exch.initialise(id, host, name, durable, ticket, autoDelete); return exch; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 7ce84b7a89..7ea7a41826 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.UUID; import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; import javax.management.JMException; @@ -70,14 +71,14 @@ public class TopicExchange extends AbstractExchange return TopicExchange.class; } - public TopicExchange newInstance(VirtualHost host, + public TopicExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { TopicExchange exch = new TopicExchange(); - exch.initialise(host, name, durable, ticket, autoDelete); + exch.initialise(id, host, name, durable, ticket, autoDelete); return exch; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java index 4b4bdd4efb..c7046f8e53 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java @@ -767,13 +767,13 @@ public class Bridge implements BridgeConfig try { - _queue = AMQQueueFactory.createAMQQueueImpl(_tmpQueueName, + _queue = AMQQueueFactory.createAMQQueueImpl(null, + _tmpQueueName, isDurable(), _link.getFederationTag(), false, false, - getVirtualHost(), - options); + getVirtualHost(), options); } catch (AMQException e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index bb979d5441..49ca934966 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -134,7 +134,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> Map<String, Object> oldArgs = oldBinding.getArguments(); if((oldArgs == null && !arguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(arguments))) { - virtualHost.getBindingFactory().replaceBinding(bindingKey, queue, exch, arguments); + virtualHost.getBindingFactory().replaceBinding(oldBinding.getId(), bindingKey, queue, exch, arguments); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 7d993ae14d..396829df91 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; @@ -31,6 +32,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -43,6 +45,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collections; +import java.util.Map; import java.util.UUID; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> @@ -219,10 +222,11 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar throws AMQException { final QueueRegistry registry = virtualHost.getQueueRegistry(); - AMQShortString owner = body.getExclusive() ? session.getContextKey() : null; + String owner = body.getExclusive() ? AMQShortString.toString(session.getContextKey()) : null; - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), - body.getExclusive(),virtualHost, body.getArguments()); + Map<String, Object> arguments = FieldTable.convertToMap(body.getArguments()); + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), AMQShortString.toString(queueName), body.getDurable(), owner, body.getAutoDelete(), + body.getExclusive(),virtualHost, arguments); if (body.getExclusive() && !body.getDurable()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java new file mode 100644 index 0000000000..fdb009386c --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +public interface Binding extends ConfiguredObject +{ + + public String MATCHED_BYTES = "matchedBytes"; + public String MATCHED_MESSAGES = "matchedMessages"; + public String STATE_CHANGED = "stateChanged"; + + public static final Collection<String> AVAILABLE_STATISTICS = + Collections.unmodifiableCollection( + Arrays.asList( + MATCHED_BYTES, + MATCHED_MESSAGES, + STATE_CHANGED)); + + + public String ARGUMENTS = "arguments"; + public String CREATED = "created"; + public String DURABLE = "durable"; + public String ID = "id"; + public String LIFETIME_POLICY = "lifetimePolicy"; + public String NAME = "name"; + public String STATE = "state"; + public String TIME_TO_LIVE = "timeToLive"; + public String UPDATED = "updated"; + public String QUEUE = "queue"; + public String EXCHANGE = "exchange"; + + public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableCollection( + Arrays.asList(ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED, + EXCHANGE, + QUEUE, + ARGUMENTS) + ); + + + + Map<String,Object> getArguments(); + + void delete(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java new file mode 100644 index 0000000000..6477633a9b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +public interface ConfigurationChangeListener +{ + /** + * Inform the listener that the passed object has changed state + * + * @param object the object whose state has changed + * @param oldState the state prior to the change + * @param newState the state after the change + */ + void stateChanged(ConfiguredObject object, State oldState, State newState); + + void childAdded(ConfiguredObject object, ConfiguredObject child); + + void childRemoved(ConfiguredObject object, ConfiguredObject child); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java new file mode 100644 index 0000000000..fb47a54d0a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -0,0 +1,229 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.security.AccessControlException; +import java.util.Collection; +import java.util.UUID; + +public interface ConfiguredObject +{ + + /** + * Get the universally unique identifier for the object + * + * @return the objects id + */ + UUID getId(); + + /** + * Get the name of the object + * + * @return the name of the object + */ + String getName(); + + + /** + * Attempt to change the name of the object + * + * Request a change to the name of the object. The caller must pass in the name it believes the object currently + * has. If the current name differes from this expected value, then no name change will occur + * + * @param currentName the name the caller believes the object to have + * @param desiredName the name the caller would like the object to have + * @return the new name for the object + * @throws IllegalStateException if the name of the object may not be changed in in the current state + * @throws AccessControlException if the current context does not have permission to change the name + * @throws IllegalArgumentException if the provided name is not legal + * @throws NullPointerException if the desired name is null + */ + String setName(String currentName, String desiredName) throws IllegalStateException, + AccessControlException; + + + /** + * Get the desired state of the object. + * + * This is the state set at the object itself, however the object + * may not be able attain this state if one of its ancestors is in a different state (in particular a descendant + * object may not be ACTIVE if all of its ancestors are not also ACTIVE). + * + * @return the desired state of the object + */ + State getDesiredState(); + + /** + * Change the desired state of the object + * + * Request a change to the current state. The caller must pass in the state it believe the object to be in, if + * this differs from the current desired state when the object evalues the request, then no state change will occur. + * + * @param currentState the state the caller believes the object to be in + * @param desiredState the state the caller wishes the object to attain + * @return the new current state + * @throws IllegalStateTransitionException the requested state tranisition is invalid + * @throws AccessControlException the current context does not have sufficeint permissions to change the state + */ + State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException, + AccessControlException; + + /** + * Get the actual state of the object. + * + * This state is derived fromt the desired state of the object itself and + * the actual state of its parents. If an object "desires" to be ACTIVE, but one of its parents is STOPPED, then + * the actual state of the object will be STOPPED + * + * @return the actual state of the object + */ + State getActualState(); + + + /** + * Add a listener which will be informed of all changes to this configuration object + * + * @param listener the listener to add + */ + void addChangeListener(ConfigurationChangeListener listener); + + /** + * Remove a change listener + * + * + * @param listener the listener to remove + * @return true iff a listener was removed + */ + boolean removeChangeListener(ConfigurationChangeListener listener); + + /** + * Get the parent of the given type for this object + * + * @param clazz the class of parent being asked for + * @return the objects parent + */ + <T extends ConfiguredObject> T getParent(Class<T> clazz); + + + /** + * Returns whether the the object configuration is durably stored + * + * @return the durablity + */ + boolean isDurable(); + + /** + * Sets the durability of the object + * + * @param durable true iff the caller wishes the object to store its configuration durably + * + * @throws IllegalStateException if the durability cannot be changed in the current state + * @throws AccessControlException if the current context does not have sufficient permission to change the durability + * @throws IllegalArgumentException if the object does not support the requested durability + */ + void setDurable(boolean durable) throws IllegalStateException, + AccessControlException, + IllegalArgumentException; + + /** + * Return the lifetime policy for the object + * + * @return the lifetime policy + */ + LifetimePolicy getLifetimePolicy(); + + /** + * Set the lifetime policy of the object + * + * @param expected The lifetime policy the caller believes the object currently has + * @param desired The lifetime policy the caller desires the object to have + * @return the new lifetime policy + * @throws IllegalStateException if the lifetime policy cannot be changed in the current state + * @throws AccessControlException if the caller does not have permission to change the lifetime policy + * @throws IllegalArgumentException if the object does not support the requested lifetime policy + */ + LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired) throws IllegalStateException, + AccessControlException, + IllegalArgumentException; + + /** + * Get the time the object will live once the lifetime policy conditions are no longer fulfilled + * + * @return the time to live + */ + long getTimeToLive(); + + /** + * Set the ttl value + * + * @param expected the ttl the caller believes the object currently has + * @param desired the ttl value the caller + * @return the new ttl value + * @throws IllegalStateException if the ttl cannot be set in the current state + * @throws AccessControlException if the caller does not have permission to change the ttl + * @throws IllegalArgumentException if the object does not support the requested ttl value + */ + long setTimeToLive(long expected, long desired) throws IllegalStateException, + AccessControlException, + IllegalArgumentException; + + /** + * Get the names of attributes that are set on this object + * + * Not that the returned collection is correct at the time the method is called, but will not reflect future + * additions or removals when they occur + * + * @return the collection of attribute names + */ + Collection<String> getAttributeNames(); + + + /** + * Return the value for the given attribute + * + * @param name the name of the attribute + * @return the value of the attribute at the object (or null if the attribute is not set + */ + Object getAttribute(String name); + + /** + * Set the value of an attribute + * + * @param name the name of the attribute to be set + * @param expected the value the caller believes the attribute currently has (or null if it is expected to be unset) + * @param desired the desired value for the attribute (or null to unset the attribute) + * @return the new value for the given attribute + * @throws IllegalStateException if the attribute cannot be set while the object is in its current state + * @throws AccessControlException if the caller does not have permission to alter the value of the attribute + * @throws IllegalArgumentException if the provided value is not valid for the given argument + */ + Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException, + AccessControlException, + IllegalArgumentException; + + + /** + * Return the Statistics holder for the ConfiguredObject + * + * @return the Statistics holder for the ConfiguredObject (or null if none exists) + */ + Statistics getStatistics(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Consumer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Consumer.java new file mode 100644 index 0000000000..958177e713 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Consumer.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +public interface Consumer extends ConfiguredObject +{ + public String DISTRIBUTION_MODE = "distributionMode"; + public String EXCLUSIVE = "exclusive"; + public String NO_LOCAL = "noLocal"; + public String SELECTOR = "selector"; + public String SETTLEMENT_MODE = "settlementMode"; + public String CREATED = "created"; + public String DURABLE = "durable"; + public String ID = "id"; + public String LIFETIME_POLICY = "lifetimePolicy"; + public String NAME = "name"; + public String STATE = "state"; + public String TIME_TO_LIVE = "timeToLive"; + public String UPDATED = "updated"; + + public Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableCollection( + Arrays.asList(ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED, + DISTRIBUTION_MODE, + SETTLEMENT_MODE, + EXCLUSIVE, + NO_LOCAL, + SELECTOR)); + + public String BYTES_OUT = "bytesOut"; + public String MESSAGES_OUT = "messagesOut"; + public String STATE_CHANGED = "stateChanged"; + public String UNACKNOWLEDGED_BYTES = "unacknowledgedBytes"; + public String UNACKNOWLEDGED_MESSAGES = "unacknowledgedMessages"; + + public Collection<String> AVAILABLE_STATISTICS = + Collections.unmodifiableCollection( + Arrays.asList(BYTES_OUT, + MESSAGES_OUT, + STATE_CHANGED, + UNACKNOWLEDGED_BYTES, + UNACKNOWLEDGED_MESSAGES) + ); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java new file mode 100644 index 0000000000..e872273d05 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +public interface Exchange extends ConfiguredObject +{ + String BINDING_COUNT = "bindingCount"; + String BYTES_DROPPED = "bytesDropped"; + String BYTES_IN = "bytesIn"; + String MESSAGES_DROPPED = "messagesDropped"; + String MESSAGES_IN = "messagesIn"; + String PRODUCER_COUNT = "producerCount"; + String STATE_CHANGED = "stateChanged"; + + public static final Collection<String> AVAILABLE_STATISTICS = + Collections.unmodifiableList( + Arrays.asList(BINDING_COUNT, + BYTES_DROPPED, + BYTES_IN, + MESSAGES_DROPPED, + MESSAGES_IN, + PRODUCER_COUNT, + STATE_CHANGED)); + + String CREATED = "created"; + String DURABLE = "durable"; + String ID = "id"; + String LIFETIME_POLICY = "lifetimePolicy"; + String NAME = "name"; + String STATE = "state"; + String TIME_TO_LIVE = "timeToLive"; + String UPDATED = "updated"; + String ALTERNATE_EXCHANGE = "alternateExchange"; + String TYPE = "type"; + + // Attributes + public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableList( + Arrays.asList( + ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED, + ALTERNATE_EXCHANGE, + TYPE + )); + + String getExchangeType(); + + //children + Collection<Binding> getBindings(); + Collection<Publisher> getPublishers(); + + //operations + Binding createBinding(String bindingKey, + Queue queue, + Map<String,Object> bindingArguments, + Map<String, Object> attributes); + + + // Statistics + + void delete(); +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/IllegalStateTransitionException.java index 180893178d..9cab5e2103 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/IllegalStateTransitionException.java @@ -18,36 +18,26 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb.entry; +package org.apache.qpid.server.model; -import org.apache.qpid.framing.AMQShortString; - -public class ExchangeRecord extends Object +public class IllegalStateTransitionException extends RuntimeException { - private final AMQShortString _exchangeName; - private final AMQShortString _exchangeType; - private final boolean _autoDelete; - - public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType, boolean autoDelete) + public IllegalStateTransitionException() { - _exchangeName = exchangeName; - _exchangeType = exchangeType; - _autoDelete = autoDelete; } - public AMQShortString getNameShortString() + public IllegalStateTransitionException(final String message) { - return _exchangeName; + super(message); } - public AMQShortString getType() + public IllegalStateTransitionException(final String message, final Throwable cause) { - return _exchangeType; + super(message, cause); } - public boolean isAutoDelete() + public IllegalStateTransitionException(final Throwable cause) { - return _autoDelete; + super(cause); } - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java new file mode 100644 index 0000000000..c9006f4e71 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +public enum LifetimePolicy +{ + PERMANENT, + AUTO_DELETE +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Publisher.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Publisher.java new file mode 100644 index 0000000000..cdb85d8023 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Publisher.java @@ -0,0 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +public interface Publisher extends ConfiguredObject +{ +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java new file mode 100644 index 0000000000..7c4f0de22b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import org.apache.qpid.server.queue.QueueEntryVisitor; + + +public interface Queue extends ConfiguredObject +{ + public static final String BINDING_COUNT = "bindingCount"; + public static final String CONSUMER_COUNT = "consumerCount"; + public static final String CONSUMER_COUNT_WITH_CREDIT = "consumerCountWithCredit"; + public static final String DISCARDS_TTL_BYTES = "discardsTtlBytes"; + public static final String DISCARDS_TTL_MESSAGES = "discardsTtlMessages"; + public static final String PERSISTENT_DEQUEUED_BYTES = "persistentDequeuedBytes"; + public static final String PERSISTENT_DEQUEUED_MESSAGES = "persistentDequeuedMessages"; + public static final String PERSISTENT_ENQUEUED_BYTES = "persistentEnqueuedBytes"; + public static final String PERSISTENT_ENQUEUED_MESSAGES = "persistentEnqueuedMessages"; + public static final String QUEUE_DEPTH_BYTES = "queueDepthBytes"; + public static final String QUEUE_DEPTH_MESSAGES = "queueDepthMessages"; + public static final String STATE_CHANGED = "stateChanged"; + public static final String TOTAL_DEQUEUED_BYTES = "totalDequeuedBytes"; + public static final String TOTAL_DEQUEUED_MESSAGES = "totalDequeuedMessages"; + public static final String TOTAL_ENQUEUED_BYTES = "totalEnqueuedBytes"; + public static final String TOTAL_ENQUEUED_MESSAGES = "totalEnqueuedMessages"; + public static final String UNACKNOWLEDGED_BYTES = "unacknowledgedBytes"; + public static final String UNACKNOWLEDGED_MESSAGES = "unacknowledgedMessages"; + + public static final Collection<String> AVAILABLE_STATISTICS = + Collections.unmodifiableList( + Arrays.asList(BINDING_COUNT, + CONSUMER_COUNT, + CONSUMER_COUNT_WITH_CREDIT, + DISCARDS_TTL_BYTES, + DISCARDS_TTL_MESSAGES, + PERSISTENT_DEQUEUED_BYTES, + PERSISTENT_DEQUEUED_MESSAGES, + PERSISTENT_ENQUEUED_BYTES, + PERSISTENT_ENQUEUED_MESSAGES, + QUEUE_DEPTH_BYTES, + QUEUE_DEPTH_MESSAGES, + STATE_CHANGED, + TOTAL_DEQUEUED_BYTES, + TOTAL_DEQUEUED_MESSAGES, + TOTAL_ENQUEUED_BYTES, + TOTAL_ENQUEUED_MESSAGES, + UNACKNOWLEDGED_BYTES, + UNACKNOWLEDGED_MESSAGES)); + + + + public static final String ID = "id"; + public static final String NAME = "name"; + public static final String STATE = "state"; + public static final String DURABLE = "durable"; + public static final String LIFETIME_POLICY = "lifetimePolicy"; + public static final String TIME_TO_LIVE = "timeToLive"; + public static final String CREATED = "created"; + public static final String UPDATED = "updated"; + + public static final String ALERT_REPEAT_GAP = "alertRepeatGap"; + public static final String ALERT_THRESHOLD_MESSAGE_AGE = "alertThresholdMessageAge"; + public static final String ALERT_THRESHOLD_MESSAGE_SIZE = "alertThresholdMessageSize"; + public static final String ALERT_THRESHOLD_QUEUE_DEPTH_BYTES = "alertThresholdQueueDepthBytes"; + public static final String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages"; + public static final String ALTERNATE_EXCHANGE = "alternateExchange"; + public static final String EXCLUSIVE = "exclusive"; + public static final String MESSAGE_GROUP_KEY = "messageGroupKey"; + public static final String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup"; + public static final String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups"; + public static final String LVQ_KEY = "lvqKey"; + public static final String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts"; + public static final String NO_LOCAL = "noLocal"; + public static final String OWNER = "owner"; + public static final String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes"; + public static final String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes"; + public static final String QUEUE_FLOW_STOPPED = "queueFlowStopped"; + public static final String SORT_KEY = "sortKey"; + public static final String TYPE = "type"; + + + + + public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableList( + Arrays.asList(ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED, + TYPE, + ALTERNATE_EXCHANGE, + EXCLUSIVE, + OWNER, + NO_LOCAL, + LVQ_KEY, + SORT_KEY, + MESSAGE_GROUP_KEY, + MESSAGE_GROUP_DEFAULT_GROUP, + MESSAGE_GROUP_SHARED_GROUPS, + MAXIMUM_DELIVERY_ATTEMPTS, + QUEUE_FLOW_CONTROL_SIZE_BYTES, + QUEUE_FLOW_RESUME_SIZE_BYTES, + QUEUE_FLOW_STOPPED, + ALERT_THRESHOLD_MESSAGE_AGE, + ALERT_THRESHOLD_MESSAGE_SIZE, + ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, + ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, + ALERT_REPEAT_GAP + )); + + //children + Collection<Binding> getBindings(); + Collection<Consumer> getConsumers(); + + + //operations + + void visit(QueueEntryVisitor visitor); + + void delete(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/State.java new file mode 100644 index 0000000000..a73b2c9d3e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/State.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +public enum State +{ + INITIALISING, + QUIESCED, + STOPPED, + ACTIVE, + DELETED +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java new file mode 100644 index 0000000000..2cb81eae82 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java @@ -0,0 +1,25 @@ +package org.apache.qpid.server.model; + +import java.util.Collection; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface Statistics +{ + Collection<String> getStatisticNames(); + public Object getStatistic(String name); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java new file mode 100644 index 0000000000..d8493c6df4 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.UUID; + +import org.apache.qpid.exchange.ExchangeDefaults; + + +public class UUIDGenerator +{ + + public static UUID generateUUID() + { + return UUID.randomUUID(); + } + + public static UUID generateUUID(String objectName, String virtualHostName) + { + StringBuilder sb = new StringBuilder(); + sb.append(virtualHostName).append(objectName); + return UUID.nameUUIDFromBytes(sb.toString().getBytes()); + } + + public static UUID generateExchangeUUID(String echangeName, String virtualHostName) + { + if(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString().equals(echangeName) || echangeName.startsWith("amq.") || echangeName.startsWith("qpid.")) + { + return generateUUID(echangeName, virtualHostName); + } + else + { + return generateUUID(); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 263ed6d8cc..8d227d9677 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -62,6 +62,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.filter.JMSSelectorFilter; import org.apache.qpid.server.filter.SimpleFilterManager; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; @@ -199,6 +200,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(queue == null) { queue = AMQQueueFactory.createAMQQueueImpl( + UUIDGenerator.generateUUID(), name, isDurable, null, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 3f7eb18989..ef298b4731 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -37,6 +37,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -314,7 +315,8 @@ public class Session_1_0 implements SessionEventListener ? null : (LifetimePolicy) properties.get(LIFETIME_POLICY); - final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl(queueName, + final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateUUID(), + queueName, false, // durable null, // owner false, // autodelete diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index f6bf6626a0..46c2a635b7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -23,19 +23,20 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; +import java.util.UUID; public class AMQPriorityQueue extends OutOfOrderQueue { - protected AMQPriorityQueue(final String name, + protected AMQPriorityQueue(UUID id, + final String name, final boolean durable, final String owner, final boolean autoDelete, boolean exclusive, final VirtualHost virtualHost, - Map<String, Object> arguments, - int priorities) + Map<String, Object> arguments, int priorities) { - super(name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments); + super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments); } public int getPriorities() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 8ff3f0148b..f2b7d7c56b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.server.queue; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; @@ -30,12 +34,10 @@ import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.HashMap; -import java.util.Map; - public class AMQQueueFactory { public static final String X_QPID_PRIORITIES = "x-qpid-priorities"; @@ -166,8 +168,13 @@ public class AMQQueueFactory } }; - - /** @see #createAMQQueueImpl(String, boolean, String, boolean, boolean, VirtualHost, Map) */ + /** + * Creates a new queue with a random id. + * + * @see #createAMQQueueImpl(UUID, String, boolean, String, boolean, boolean, VirtualHost, Map) + * @deprecated because only called from unit tests + * */ + @Deprecated public static AMQQueue createAMQQueueImpl(AMQShortString name, boolean durable, AMQShortString owner, @@ -175,22 +182,28 @@ public class AMQQueueFactory boolean exclusive, VirtualHost virtualHost, final FieldTable arguments) throws AMQException { - return createAMQQueueImpl(name == null ? null : name.toString(), + return createAMQQueueImpl(UUIDGenerator.generateUUID(), + name == null ? null : name.toString(), durable, owner == null ? null : owner.toString(), autoDelete, - exclusive, - virtualHost, FieldTable.convertToMap(arguments)); + exclusive, virtualHost, FieldTable.convertToMap(arguments)); } - - public static AMQQueue createAMQQueueImpl(String queueName, + /** + * @param id the id to use. If default then one is generated from queueName. TODO check correctness of calls that pass a null value. + */ + public static AMQQueue createAMQQueueImpl(UUID id, + String queueName, boolean durable, String owner, boolean autoDelete, - boolean exclusive, - VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException + boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException { + if (id == null) + { + throw new IllegalArgumentException("Queue id must not be null"); + } if (queueName == null) { throw new IllegalArgumentException("Queue name must not be null"); @@ -241,19 +254,19 @@ public class AMQQueueFactory AMQQueue q; if(sortingKey != null) { - q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey); + q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey); } else if(conflationKey != null) { - q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey); + q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey); } else if(priorities > 1) { - q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities); + q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities); } else { - q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments); + q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments); } //Register the new queue @@ -287,7 +300,7 @@ public class AMQQueueFactory if(dlExchange == null) { - dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0); + dlExchange = exchangeFactory.createExchange(UUIDGenerator.generateUUID(dlExchangeName, virtualHost.getName()), new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0); exchangeRegistry.registerExchange(dlExchange); @@ -309,7 +322,7 @@ public class AMQQueueFactory args.put(X_QPID_DLQ_ENABLED, false); args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0); - dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args); + dlQueue = createAMQQueueImpl(UUIDGenerator.generateUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args); //enter the dlq in the persistent store virtualHost.getMessageStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args)); @@ -364,7 +377,10 @@ public class AMQQueueFactory arguments.put(X_QPID_DLQ_ENABLED, true); } - AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments); + // we need queues that are defined in config to have deterministic ids. + UUID id = UUIDGenerator.generateUUID(queueName, host.getName()); + + AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, host, arguments); q.configure(config); return q; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java index 2c645cc555..c2813bb7a5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java @@ -21,22 +21,23 @@ package org.apache.qpid.server.queue; -import org.apache.qpid.server.virtualhost.VirtualHost; - import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.virtualhost.VirtualHost; public class ConflationQueue extends SimpleAMQQueue { - protected ConflationQueue(String name, + protected ConflationQueue(UUID id, + String name, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, - Map<String, Object> args, - String conflationKey) + Map<String, Object> args, String conflationKey) { - super(name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args); + super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args); } public String getConflationKey() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java index 801fe55939..2493974d45 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -95,4 +96,18 @@ public class DefaultQueueRegistry implements QueueRegistry } _queueMap.clear(); } + + @Override + public synchronized AMQQueue getQueue(UUID queueId) + { + Collection<AMQQueue> queues = _queueMap.values(); + for (AMQQueue queue : queues) + { + if (queue.getId().equals(queueId)) + { + return queue; + } + } + return null; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index 53121fc031..89976ca16e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -5,15 +5,16 @@ import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; +import java.util.UUID; public abstract class OutOfOrderQueue extends SimpleAMQQueue { - protected OutOfOrderQueue(String name, boolean durable, String owner, - boolean autoDelete, boolean exclusive, VirtualHost virtualHost, - QueueEntryListFactory entryListFactory, Map<String, Object> arguments) + protected OutOfOrderQueue(UUID id, String name, boolean durable, + String owner, boolean autoDelete, boolean exclusive, + VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments) { - super(name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments); + super(id, name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments); } @Override diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java new file mode 100644 index 0000000000..1578d21321 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java @@ -0,0 +1,22 @@ +package org.apache.qpid.server.queue; + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* <p/> +* http://www.apache.org/licenses/LICENSE-2.0 +* <p/> +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +public interface QueueEntryVisitor +{ + boolean visit(QueueEntry entry); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java index 1ffc0a3560..72a54c9889 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java @@ -24,6 +24,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; +import java.util.UUID; public interface QueueRegistry { @@ -42,4 +43,6 @@ public interface QueueRegistry AMQQueue getQueue(String queue); void stopAllAndUnregisterMBeans(); + + AMQQueue getQueue(UUID queueId); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index e6f059a875..d7eb304c92 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -191,29 +191,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount(); private final MessageGroupManager _messageGroupManager; - protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) + protected SimpleAMQQueue(UUID id, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) { - this(name, durable, owner, autoDelete, exclusive, virtualHost,new SimpleQueueEntryList.Factory(), arguments); + this(id, name, durable, owner, autoDelete, exclusive,virtualHost, new SimpleQueueEntryList.Factory(), arguments); } - public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) + public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) { - this(queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments); + this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments); } - public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments) + public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments) { - this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments); + this(id, queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments); } - protected SimpleAMQQueue(AMQShortString name, + protected SimpleAMQQueue(UUID id, + AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, - QueueEntryListFactory entryListFactory, - Map<String,Object> arguments) + QueueEntryListFactory entryListFactory, Map<String,Object> arguments) { if (name == null) @@ -236,7 +236,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _entries = entryListFactory.createQueueEntryList(this); _arguments = arguments; - _id = virtualHost.getConfigStore().createId(); + _id = id; _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 446f57b142..b3566df0c4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -24,6 +24,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; +import java.util.UUID; public class SortedQueue extends OutOfOrderQueue { @@ -33,12 +34,12 @@ public class SortedQueue extends OutOfOrderQueue private final Object _sortedQueueLock = new Object(); private final String _sortedPropertyName; - protected SortedQueue(final String name, final boolean durable, - final String owner, final boolean autoDelete, final boolean exclusive, - final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName) + protected SortedQueue(UUID id, final String name, + final boolean durable, final String owner, final boolean autoDelete, + final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName) { - super(name, durable, owner, autoDelete, exclusive, virtualHost, - new SortedQueueEntryListFactory(sortedPropertyName), arguments); + super(id, name, durable, owner, autoDelete, exclusive, + virtualHost, new SortedQueueEntryListFactory(sortedPropertyName), arguments); this._sortedPropertyName = sortedPropertyName; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java index 73f127e097..1307b1dbd4 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -32,19 +32,19 @@ public interface ConfigurationRecoveryHandler public static interface QueueRecoveryHandler { - void queue(String queueName, String owner, boolean exclusive, FieldTable arguments); + void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments); ExchangeRecoveryHandler completeQueueRecovery(); } public static interface ExchangeRecoveryHandler { - void exchange(String exchangeName, String type, boolean autoDelete); + void exchange(UUID id, String exchangeName, String type, boolean autoDelete); BindingRecoveryHandler completeExchangeRecovery(); } public static interface BindingRecoveryHandler { - void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf); + void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf); BrokerLinkRecoveryHandler completeBindingRecovery(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java new file mode 100644 index 0000000000..1a67fdf540 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java @@ -0,0 +1,183 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; +import org.apache.qpid.server.util.MapJsonSerializer; + +public class ConfiguredObjectHelper +{ + /** + * Name of queue attribute to store queue creation arguments. + * <p> + * This attribute is not defined yet on Queue configured object interface. + */ + private static final String QUEUE_ARGUMENTS = "ARGUMENTS"; + + private MapJsonSerializer _serializer = new MapJsonSerializer(); + + public void loadQueue(ConfiguredObjectRecord configuredObject, QueueRecoveryHandler qrh) + { + if (Queue.class.getName().equals(configuredObject.getType())) + { + Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes()); + String queueName = (String) attributeMap.get(Queue.NAME); + String owner = (String) attributeMap.get(Queue.OWNER); + boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE); + @SuppressWarnings("unchecked") + Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(QUEUE_ARGUMENTS); + FieldTable arguments = null; + if (queueArgumentsMap != null) + { + arguments = FieldTable.convertToFieldTable(queueArgumentsMap); + } + qrh.queue(configuredObject.getId(), queueName, owner, exclusive, arguments); + } + } + + public ConfiguredObjectRecord updateQueueConfiguredObject(final AMQQueue queue, ConfiguredObjectRecord queueRecord) + { + Map<String, Object> attributesMap = _serializer.deserialize(queueRecord.getAttributes()); + attributesMap.put(Queue.NAME, queue.getName()); + attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); + String newJson = _serializer.serialize(attributesMap); + ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(queue.getId(), queueRecord.getType(), newJson); + return newQueueRecord; + } + + public ConfiguredObjectRecord createQueueConfiguredObject(AMQQueue queue, FieldTable arguments) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); + attributesMap.put(Queue.NAME, queue.getName()); + attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner())); + attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); + if (arguments != null) + { + attributesMap.put(QUEUE_ARGUMENTS, FieldTable.convertToMap(arguments)); + } + String json = _serializer.serialize(attributesMap); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(queue.getId(), Queue.class.getName(), json); + return configuredObject; + } + + public void loadExchange(ConfiguredObjectRecord configuredObject, ExchangeRecoveryHandler erh) + { + if (Exchange.class.getName().equals(configuredObject.getType())) + { + Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes()); + String exchangeName = (String) attributeMap.get(Exchange.NAME); + String exchangeType = (String) attributeMap.get(Exchange.TYPE); + String lifeTimePolicy = (String) attributeMap.get(Exchange.LIFETIME_POLICY); + boolean autoDelete = lifeTimePolicy == null + || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE; + erh.exchange(configuredObject.getId(), exchangeName, exchangeType, autoDelete); + } + } + + public ConfiguredObjectRecord createExchangeConfiguredObject(org.apache.qpid.server.exchange.Exchange exchange) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); + attributesMap.put(Exchange.NAME, exchange.getName()); + attributesMap.put(Exchange.TYPE, AMQShortString.toString(exchange.getTypeShortString())); + attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name() + : LifetimePolicy.PERMANENT.name()); + String json = _serializer.serialize(attributesMap); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(exchange.getId(), Exchange.class.getName(), json); + return configuredObject; + } + + public void loadQueueBinding(ConfiguredObjectRecord configuredObject, BindingRecoveryHandler brh) + { + if (Binding.class.getName().equals(configuredObject.getType())) + { + Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes()); + UUID exchangeId = UUID.fromString((String)attributeMap.get(Binding.EXCHANGE)); + UUID queueId = UUID.fromString((String) attributeMap.get(Binding.QUEUE)); + String bindingName = (String) attributeMap.get(Binding.NAME); + + @SuppressWarnings("unchecked") + Map<String, Object> bindingArgumentsMap = (Map<String, Object>) attributeMap.get(Binding.ARGUMENTS); + FieldTable arguments = null; + if (bindingArgumentsMap != null) + { + arguments = FieldTable.convertToFieldTable(bindingArgumentsMap); + } + ByteBuffer argumentsBB = (arguments == null ? null : ByteBuffer.wrap(arguments.getDataAsBytes())); + + brh.binding(configuredObject.getId(), exchangeId, queueId, bindingName, argumentsBB); + } + } + + public ConfiguredObjectRecord createBindingConfiguredObject(org.apache.qpid.server.binding.Binding binding) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); + attributesMap.put(Binding.NAME, binding.getBindingKey()); + attributesMap.put(Binding.EXCHANGE, binding.getExchange().getId()); + attributesMap.put(Binding.QUEUE, binding.getQueue().getId()); + Map<String, Object> arguments = binding.getArguments(); + if (arguments != null) + { + attributesMap.put(Binding.ARGUMENTS, arguments); + } + String json = _serializer.serialize(attributesMap); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(binding.getId(), Binding.class.getName(), json); + return configuredObject; + } + + public void recoverQueues(QueueRecoveryHandler qrh, List<ConfiguredObjectRecord> configuredObjects) + { + for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects) + { + loadQueue(configuredObjectRecord, qrh); + } + } + + public void recoverExchanges(ExchangeRecoveryHandler erh, List<ConfiguredObjectRecord> configuredObjects) + { + for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects) + { + loadExchange(configuredObjectRecord, erh); + } + } + + public void recoverBindings(BindingRecoveryHandler brh, List<ConfiguredObjectRecord> configuredObjects) + { + for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects) + { + loadQueueBinding(configuredObjectRecord, brh); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java new file mode 100644 index 0000000000..95e1713d78 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.UUID; + +public class ConfiguredObjectRecord +{ + private UUID _id; + private String _type; + private String _attributes; + + public ConfiguredObjectRecord(UUID id, String type, String attributes) + { + super(); + _id = id; + _type = type; + _attributes = attributes; + } + + public UUID getId() + { + return _id; + } + + public void setId(UUID id) + { + _id = id; + } + + public String getType() + { + return _type; + } + + public String getAttributes() + { + return _attributes; + } + + @Override + public String toString() + { + return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]"; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 13c24e624e..655887e5c2 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; @@ -69,28 +69,22 @@ public interface DurableConfigurationStore void removeExchange(Exchange exchange) throws AMQStoreException; /** - * Binds the specified queue to an exchange with a routing key. + * Store the queue binding. * - * @param exchange The exchange to bind to. - * @param routingKey The routing key to bind by. - * @param queue The queue to bind. - * @param args Additional parameters. + * @param binding queue binding * * @throws AMQStoreException if the operation fails for any reason. */ - void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException; + void bindQueue(Binding binding) throws AMQStoreException; /** - * Unbinds the specified from an exchange under a particular routing key. + * Removes queue binding * - * @param exchange The exchange to unbind from. - * @param routingKey The routing key to unbind. - * @param queue The queue to unbind. - * @param args Additional parameters. + * @param binding queue binding to remove * * @throws AMQStoreException If the operation fails for any reason. */ - void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException; + void unbindQueue(Binding binding) throws AMQStoreException; /** * Makes the specified queue persistent. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index 77df6c5abf..34c7d2d933 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; @@ -48,12 +48,12 @@ public class NullMessageStore implements MessageStore } @Override - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException + public void bindQueue(Binding binding) throws AMQStoreException { } @Override - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException + public void unbindQueue(Binding binding) throws AMQStoreException { } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java index b92d5f3e9b..bd4da648f9 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java @@ -20,15 +20,17 @@ */ package org.apache.qpid.server.store; +import java.util.UUID; + public interface TransactionLogRecoveryHandler { QueueEntryRecoveryHandler begin(MessageStore log); public static interface QueueEntryRecoveryHandler { - void queueEntry(String queuename, long messageId); - DtxRecordRecoveryHandler completeQueueEntryRecovery(); + + void queueEntry(UUID queueId, long messageId); } public static interface DtxRecordRecoveryHandler diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java index 0d81dd151d..576dca847d 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.store; +import java.util.UUID; + public interface TransactionLogResource { - public String getResourceName(); + public UUID getId(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 07d8bb97f8..0371cdcfcb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.IOException; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.sql.Blob; import java.sql.Connection; import java.sql.Driver; @@ -51,12 +52,15 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.ConfiguredObjectHelper; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.EventManager; @@ -89,12 +93,9 @@ public class DerbyMessageStore implements MessageStore private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; - private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE"; - private static final String QUEUE_TABLE_NAME = "QPID_QUEUE"; - private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS"; - private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY"; + private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES"; - private static final String META_DATA_TABLE_NAME = "QPID_META_DATA"; + private static final String META_DATA_TABLE_NAME = "QPID_MESSAGE_METADATA"; private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT"; private static final String LINKS_TABLE_NAME = "QPID_LINKS"; @@ -103,7 +104,9 @@ public class DerbyMessageStore implements MessageStore private static final String XID_TABLE_NAME = "QPID_XIDS"; private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS"; - private static final int DB_VERSION = 3; + private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS"; + + private static final int DB_VERSION = 6; @@ -119,38 +122,23 @@ public class DerbyMessageStore implements MessageStore private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )"; private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )"; - private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )"; - private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), exclusive SMALLINT not null, arguments blob, PRIMARY KEY ( name ))"; - private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )"; - private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME; - private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; - private static final String UPDATE_QUEUE_EXCLUSIVITY = "UPDATE " + QUEUE_TABLE_NAME + " SET exclusive = ? WHERE name = ?"; - private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME; - private static final String SELECT_FROM_BINDINGS = - "SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name"; - private static final String FIND_BINDING = - "SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? "; - private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )"; - private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?"; - private static final String FIND_EXCHANGE = "SELECT name FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?"; - private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )"; - private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?"; - private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner, exclusive, arguments) VALUES (?, ?, ?, ?)"; - private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; - - private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )"; - private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)"; - private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?"; - private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_name, message_id"; - - - private static final String CREATE_META_DATA_TABLE = "CREATE TABLE "+META_DATA_TABLE_NAME+" ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )"; - private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, offset int not null, last_byte int not null, content blob , PRIMARY KEY (message_id, offset) )"; - - private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, offset, last_byte, content ) values (?, ?, ?, ?)"; - private static final String SELECT_FROM_MESSAGE_CONTENT = - "SELECT offset, content FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? AND last_byte > ? AND offset < ? ORDER BY message_id, offset"; - private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?"; + private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_id varchar(36) not null, message_id bigint not null, PRIMARY KEY (queue_id, message_id) )"; + private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)"; + private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?"; + private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id"; + + + private static final String CREATE_META_DATA_TABLE = "CREATE TABLE " + META_DATA_TABLE_NAME + + " ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )"; + private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE " + MESSAGE_CONTENT_TABLE_NAME + + " ( message_id bigint not null, content blob , PRIMARY KEY (message_id) )"; + + private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + + "( message_id, content ) values (?, ?)"; + private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME + + " WHERE message_id = ?"; + private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + + " WHERE message_id = ?"; private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";; private static final String SELECT_FROM_META_DATA = @@ -214,18 +202,32 @@ public class DerbyMessageStore implements MessageStore private static final String CREATE_XID_ACTIONS_TABLE = "CREATE TABLE "+XID_ACTIONS_TABLE_NAME+" ( format bigint not null," + " global_id varchar(64) for bit data not null, branch_id varchar(64) for bit data not null, " + - "action_type char not null, queue_name varchar(255) not null, message_id bigint not null" + + "action_type char not null, queue_id varchar(36) not null, message_id bigint not null" + ", PRIMARY KEY ( " + - "format, global_id, branch_id, action_type, queue_name, message_id))"; + "format, global_id, branch_id, action_type, queue_id, message_id))"; private static final String INSERT_INTO_XID_ACTIONS = "INSERT INTO "+XID_ACTIONS_TABLE_NAME+" ( format, global_id, branch_id, action_type, " + - "queue_name, message_id ) values (?,?,?,?,?,?) "; + "queue_id, message_id ) values (?,?,?,?,?,?) "; private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME + " WHERE format = ? and global_id = ? and branch_id = ?"; private static final String SELECT_ALL_FROM_XID_ACTIONS = - "SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME + + "SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME + " WHERE format = ? and global_id = ? and branch_id = ?"; + private static final String CREATE_CONFIGURED_OBJECTS_TABLE = "CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME + + " ( id VARCHAR(36) not null, object_type varchar(255), attributes blob, PRIMARY KEY (id))"; + private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME + + " ( id, object_type, attributes) VALUES (?,?,?)"; + private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME + + " set object_type =?, attributes = ? where id = ?"; + private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME + + " where id = ?"; + private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME + + " where id = ?"; + private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME; + + private final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; private final StateManager _stateManager; @@ -244,6 +246,8 @@ public class DerbyMessageStore implements MessageStore _stateManager = new StateManager(_eventManager); } + private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper(); + @Override public void configureConfigStore(String name, ConfigurationRecoveryHandler configRecoveryHandler, @@ -323,9 +327,7 @@ public class DerbyMessageStore implements MessageStore Connection conn = newAutoCommitConnection(); createVersionTable(conn); - createExchangeTable(conn); - createQueueTable(conn); - createBindingsTable(conn); + createConfiguredObjectsTable(conn); createQueueEntryTable(conn); createMetaDataTable(conn); createMessageContentTable(conn); @@ -366,15 +368,14 @@ public class DerbyMessageStore implements MessageStore } - - private void createExchangeTable(final Connection conn) throws SQLException + private void createConfiguredObjectsTable(final Connection conn) throws SQLException { - if(!tableExists(EXCHANGE_TABLE_NAME, conn)) + if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); try { - stmt.execute(CREATE_EXCHANGE_TABLE); + stmt.execute(CREATE_CONFIGURED_OBJECTS_TABLE); } finally { @@ -383,39 +384,6 @@ public class DerbyMessageStore implements MessageStore } } - private void createQueueTable(final Connection conn) throws SQLException - { - if(!tableExists(QUEUE_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - try - { - stmt.execute(CREATE_QUEUE_TABLE); - } - finally - { - stmt.close(); - } - } - } - - private void createBindingsTable(final Connection conn) throws SQLException - { - if(!tableExists(BINDINGS_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - try - { - stmt.execute(CREATE_BINDINGS_TABLE); - } - finally - { - stmt.close(); - } - } - - } - private void createQueueEntryTable(final Connection conn) throws SQLException { if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn)) @@ -433,7 +401,7 @@ public class DerbyMessageStore implements MessageStore } - private void createMetaDataTable(final Connection conn) throws SQLException + private void createMetaDataTable(final Connection conn) throws SQLException { if(!tableExists(META_DATA_TABLE_NAME, conn)) { @@ -558,26 +526,25 @@ public class DerbyMessageStore implements MessageStore private void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException { - try { + List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this); - recoverQueues(qrh); + _configuredObjectHelper.recoverQueues(qrh, configuredObjects); ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); - List<String> exchanges = loadExchanges(erh); + _configuredObjectHelper.recoverExchanges(erh, configuredObjects); + ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery(); - recoverBindings(brh, exchanges); + _configuredObjectHelper.recoverBindings(brh, configuredObjects); + ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); recoverBrokerLinks(lrh); } catch (SQLException e) { - throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); } - - } private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh) @@ -718,176 +685,6 @@ public class DerbyMessageStore implements MessageStore } - private void recoverQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException - { - Connection conn = newAutoCommitConnection(); - try - { - Statement stmt = conn.createStatement(); - try - { - ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE); - try - { - - while(rs.next()) - { - String queueName = rs.getString(1); - _logger.debug("Got queue " + queueName); - String owner = rs.getString(2); - boolean exclusive = rs.getBoolean(3); - Blob argumentsAsBlob = rs.getBlob(4); - - byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length()); - FieldTable arguments; - if(dataAsBytes.length > 0) - { - - try - { - arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length); - } - catch (IOException e) - { - throw new RuntimeException("IO Exception should not be thrown",e); - } - } - else - { - arguments = null; - } - - qrh.queue(queueName, owner, exclusive, arguments); - - } - - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - finally - { - conn.close(); - } - } - - - private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws SQLException - { - - List<String> exchanges = new ArrayList<String>(); - Connection conn = null; - try - { - conn = newAutoCommitConnection(); - - Statement stmt = conn.createStatement(); - try - { - ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE); - try - { - while(rs.next()) - { - String exchangeName = rs.getString(1); - String type = rs.getString(2); - boolean autoDelete = rs.getShort(3) != 0; - - exchanges.add(exchangeName); - - erh.exchange(exchangeName, type, autoDelete); - - } - return exchanges; - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - finally - { - if(conn != null) - { - conn.close(); - } - } - - } - - private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws SQLException - { - _logger.info("Recovering bindings..."); - - Connection conn = null; - try - { - conn = newAutoCommitConnection(); - - PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS); - - try - { - ResultSet rs = stmt.executeQuery(); - - try - { - - while(rs.next()) - { - String exchangeName = rs.getString(1); - String queueName = rs.getString(2); - String bindingKey = rs.getString(3); - Blob arguments = rs.getBlob(4); - java.nio.ByteBuffer buf; - - if(arguments != null && arguments.length() != 0) - { - byte[] argumentBytes = arguments.getBytes(1, (int) arguments.length()); - buf = java.nio.ByteBuffer.wrap(argumentBytes); - } - else - { - buf = null; - } - - brh.binding(exchangeName, queueName, bindingKey, buf); - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - } - finally - { - if(conn != null) - { - conn.close(); - } - } - } - - - @Override public void close() throws Exception { @@ -999,60 +796,8 @@ public class DerbyMessageStore implements MessageStore { if (_stateManager.isInState(State.ACTIVE)) { - try - { - Connection conn = newAutoCommitConnection(); - - try - { - - - PreparedStatement stmt = conn.prepareStatement(FIND_EXCHANGE); - try - { - stmt.setString(1, exchange.getNameShortString().toString()); - ResultSet rs = stmt.executeQuery(); - try - { - - // If we don't have any data in the result set then we can add this exchange - if (!rs.next()) - { - - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_EXCHANGE); - try - { - insertStmt.setString(1, exchange.getName().toString()); - insertStmt.setString(2, exchange.getTypeShortString().asString()); - insertStmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0); - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e.getMessage(), e); - } + ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange); + insertConfiguredObject(configuredObject); } } @@ -1060,150 +805,32 @@ public class DerbyMessageStore implements MessageStore @Override public void removeExchange(Exchange exchange) throws AMQStoreException { - - try + int results = removeConfiguredObject(exchange.getId()); + if (results == 0) { - Connection conn = newAutoCommitConnection(); - try - { - PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE); - try - { - stmt.setString(1, exchange.getNameShortString().toString()); - int results = stmt.executeUpdate(); - stmt.close(); - if(results == 0) - { - throw new AMQStoreException("Exchange " + exchange.getNameShortString() + " not found"); - } - } - finally - { - stmt.close(); - } - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error deleting Exchange with name " + exchange.getNameShortString() + " from database: " + e.getMessage(), e); + throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + exchange.getId() + " not found"); } } @Override - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) + public void bindQueue(Binding binding) throws AMQStoreException { if (_stateManager.isInState(State.ACTIVE)) { - try - { - Connection conn = newAutoCommitConnection(); - - try - { - - PreparedStatement stmt = conn.prepareStatement(FIND_BINDING); - try - { - stmt.setString(1, exchange.getNameShortString().toString() ); - stmt.setString(2, queue.getNameShortString().toString()); - stmt.setString(3, routingKey == null ? null : routingKey.toString()); - - ResultSet rs = stmt.executeQuery(); - try - { - // If this binding is not already in the store then create it. - if (!rs.next()) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BINDINGS); - try - { - insertStmt.setString(1, exchange.getNameShortString().toString() ); - insertStmt.setString(2, queue.getNameShortString().toString()); - insertStmt.setString(3, routingKey == null ? null : routingKey.toString()); - if(args != null) - { - // TODO - In Java 6 we could use create/set Blob - byte[] bytes = args.getDataAsBytes(); - ByteArrayInputStream bis = new ByteArrayInputStream(bytes); - insertStmt.setBinaryStream(4, bis, bytes.length); - } - else - { - insertStmt.setNull(4, Types.BLOB); - } - - insertStmt.executeUpdate(); - } - finally - { - insertStmt.close(); - } - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " - + exchange.getNameShortString() + " to database: " + e.getMessage(), e); - } - + ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding); + insertConfiguredObject(configuredObject); } - - } @Override - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) + public void unbindQueue(Binding binding) throws AMQStoreException { - Connection conn = null; - PreparedStatement stmt = null; - - try - { - conn = newAutoCommitConnection(); - // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob - stmt = conn.prepareStatement(DELETE_FROM_BINDINGS); - stmt.setString(1, exchange.getNameShortString().toString() ); - stmt.setString(2, queue.getNameShortString().toString()); - stmt.setString(3, routingKey == null ? null : routingKey.toString()); - - int result = stmt.executeUpdate(); - - if(result != 1) - { - throw new AMQStoreException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange " - + exchange.getNameShortString() + " not found"); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " - + exchange.getNameShortString() + " in database: " + e.getMessage(), e); - } - finally + int results = removeConfiguredObject(binding.getId()); + if (results == 0) { - closePreparedStatement(stmt); - closeConnection(conn); + throw new AMQStoreException("Binding " + binding + " not found"); } } @@ -1220,68 +847,8 @@ public class DerbyMessageStore implements MessageStore if (_stateManager.isInState(State.ACTIVE)) { - try - { - Connection conn = newAutoCommitConnection(); - - PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE); - try - { - stmt.setString(1, queue.getNameShortString().toString()); - ResultSet rs = stmt.executeQuery(); - try - { - - // If we don't have any data in the result set then we can add this queue - if (!rs.next()) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_QUEUE); - - try - { - String owner = queue.getOwner() == null ? null : queue.getOwner().toString(); - - insertStmt.setString(1, queue.getNameShortString().toString()); - insertStmt.setString(2, owner); - insertStmt.setBoolean(3,queue.isExclusive()); - - final byte[] underlying; - if(arguments != null) - { - underlying = arguments.getDataAsBytes(); - } - else - { - underlying = new byte[0]; - } - - ByteArrayInputStream bis = new ByteArrayInputStream(underlying); - insertStmt.setBinaryStream(4,bis,underlying.length); - - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - conn.close(); - - } - catch (SQLException e) - { - throw new AMQStoreException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e); - } + ConfiguredObjectRecord queueConfiguredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments); + insertConfiguredObject(queueConfiguredObject); } } @@ -1299,54 +866,11 @@ public class DerbyMessageStore implements MessageStore { if (_stateManager.isInState(State.ACTIVE)) { - try + ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(queue.getId()); + if (queueConfiguredObject != null) { - Connection conn = newAutoCommitConnection(); - - try - { - PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE); - try - { - stmt.setString(1, queue.getNameShortString().toString()); - - ResultSet rs = stmt.executeQuery(); - try - { - if (rs.next()) - { - PreparedStatement stmt2 = conn.prepareStatement(UPDATE_QUEUE_EXCLUSIVITY); - try - { - stmt2.setBoolean(1,queue.isExclusive()); - stmt2.setString(2, queue.getNameShortString().toString()); - - stmt2.execute(); - } - finally - { - stmt2.close(); - } - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e); + ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueConfiguredObject); + updateConfiguredObject(newQueueRecord); } } @@ -1410,31 +934,11 @@ public class DerbyMessageStore implements MessageStore { AMQShortString name = queue.getNameShortString(); _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called"); - Connection conn = null; - PreparedStatement stmt = null; - try + int results = removeConfiguredObject(queue.getId()); + if (results == 0) { - conn = newAutoCommitConnection(); - stmt = conn.prepareStatement(DELETE_FROM_QUEUE); - stmt.setString(1, name.toString()); - int results = stmt.executeUpdate(); - - if (results == 0) - { - throw new AMQStoreException("Queue " + name + " not found"); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error deleting AMQQueue with name " + name + " from database: " + e.getMessage(), e); - } - finally - { - closePreparedStatement(stmt); - closeConnection(conn); + throw new AMQStoreException("Queue " + name + " with id " + queue.getId() + " not found"); } - - } @Override @@ -1676,8 +1180,6 @@ public class DerbyMessageStore implements MessageStore public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException { - String name = queue.getResourceName(); - Connection conn = connWrapper.getConnection(); @@ -1685,13 +1187,13 @@ public class DerbyMessageStore implements MessageStore { if (_logger.isDebugEnabled()) { - _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]"); + _logger.debug("Enqueuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + queue.getId()+ "[Connection" + conn + "]"); } PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); try { - stmt.setString(1,name); + stmt.setString(1, queue.getId().toString()); stmt.setLong(2,messageId); stmt.executeUpdate(); } @@ -1703,7 +1205,7 @@ public class DerbyMessageStore implements MessageStore catch (SQLException e) { _logger.error("Failed to enqueue: " + e.getMessage(), e); - throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name + throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId() + " to database", e); } @@ -1711,8 +1213,6 @@ public class DerbyMessageStore implements MessageStore public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException { - String name = queue.getResourceName(); - Connection conn = connWrapper.getConnection(); @@ -1722,7 +1222,7 @@ public class DerbyMessageStore implements MessageStore PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY); try { - stmt.setString(1,name); + stmt.setString(1, queue.getId().toString()); stmt.setLong(2,messageId); int results = stmt.executeUpdate(); @@ -1730,12 +1230,14 @@ public class DerbyMessageStore implements MessageStore if(results != 1) { - throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name); + throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + + " with id " + queue.getId()); } if (_logger.isDebugEnabled()) { - _logger.debug("Dequeuing message " + messageId + " on queue " + name ); + _logger.debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + + " with id " + queue.getId()); } } finally @@ -1746,8 +1248,8 @@ public class DerbyMessageStore implements MessageStore catch (SQLException e) { _logger.error("Failed to dequeue: " + e.getMessage(), e); - throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + name - + " from database", e); + throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + + " with id " + queue.getId() + " from database", e); } } @@ -1840,7 +1342,7 @@ public class DerbyMessageStore implements MessageStore stmt.setString(4, "E"); for(Transaction.Record record : enqueues) { - stmt.setString(5, record.getQueue().getResourceName()); + stmt.setString(5, record.getQueue().getId().toString()); stmt.setLong(6, record.getMessage().getMessageNumber()); stmt.executeUpdate(); } @@ -1851,7 +1353,7 @@ public class DerbyMessageStore implements MessageStore stmt.setString(4, "D"); for(Transaction.Record record : dequeues) { - stmt.setString(5, record.getQueue().getResourceName()); + stmt.setString(5, record.getQueue().getId().toString()); stmt.setLong(6, record.getMessage().getMessageNumber()); stmt.executeUpdate(); } @@ -2081,9 +1583,9 @@ public class DerbyMessageStore implements MessageStore while(rs.next()) { - String queueName = rs.getString(1); + String id = rs.getString(1); long messageId = rs.getLong(2); - queueEntryHandler.queueEntry(queueName,messageId); + queueEntryHandler.queueEntry(UUID.fromString(id), messageId); } } finally @@ -2137,13 +1639,13 @@ public class DerbyMessageStore implements MessageStore private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage { - private final String _queueName; private long _messageNumber; + private UUID _queueId; - public RecordImpl(String queueName, long messageNumber) + public RecordImpl(UUID queueId, long messageNumber) { - _queueName = queueName; _messageNumber = messageNumber; + _queueId = queueId; } @Override @@ -2177,9 +1679,9 @@ public class DerbyMessageStore implements MessageStore } @Override - public String getResourceName() + public UUID getId() { - return _queueName; + return _queueId; } } @@ -2237,10 +1739,10 @@ public class DerbyMessageStore implements MessageStore { String actionType = rs.getString(1); - String queueName = rs.getString(2); + UUID queueId = UUID.fromString(rs.getString(2)); long messageId = rs.getLong(3); - RecordImpl record = new RecordImpl(queueName, messageId); + RecordImpl record = new RecordImpl(queueId, messageId); List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues; records.add(record); } @@ -2319,11 +1821,11 @@ public class DerbyMessageStore implements MessageStore } - private void addContent(Connection conn, long messageId, int offset, ByteBuffer src) + private void addContent(Connection conn, long messageId, ByteBuffer src) { if(_logger.isDebugEnabled()) { - _logger.debug("Adding content chunk offset " + offset + " for message " +messageId); + _logger.debug("Adding content for message " +messageId); } PreparedStatement stmt = null; @@ -2336,20 +1838,15 @@ public class DerbyMessageStore implements MessageStore stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT); stmt.setLong(1,messageId); - stmt.setInt(2, offset); - stmt.setInt(3, offset+chunkData.length); - - - // TODO in Java 6 we could just use blobs ByteArrayInputStream bis = new ByteArrayInputStream(chunkData); - stmt.setBinaryStream(4, bis, chunkData.length); + stmt.setBinaryStream(2, bis, chunkData.length); stmt.executeUpdate(); } catch (SQLException e) { closeConnection(conn); - throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e.getMessage(), e); + throw new RuntimeException("Error adding content for message " + messageId + ": " + e.getMessage(), e); } finally { @@ -2370,33 +1867,32 @@ public class DerbyMessageStore implements MessageStore stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); stmt.setLong(1,messageId); - stmt.setInt(2, offset); - stmt.setInt(3, offset+dst.remaining()); ResultSet rs = stmt.executeQuery(); int written = 0; - while(rs.next()) + if (rs.next()) { - int offsetInMessage = rs.getInt(1); - Blob dataAsBlob = rs.getBlob(2); + + Blob dataAsBlob = rs.getBlob(1); final int size = (int) dataAsBlob.length(); byte[] dataAsBytes = dataAsBlob.getBytes(1, size); - int posInArray = offset + written - offsetInMessage; - int count = size - posInArray; - if(count > dst.remaining()) + if (offset > size) { - count = dst.remaining(); + throw new RuntimeException("Offset " + offset + " is greater than message size " + size + + " for message id " + messageId + "!"); + } - dst.put(dataAsBytes,posInArray,count); - written+=count; - if(dst.remaining() == 0) + written = size - offset; + if(written > dst.remaining()) { - break; + written = dst.remaining(); } + + dst.put(dataAsBytes, offset, written); } return written; @@ -2635,7 +2131,7 @@ public class DerbyMessageStore implements MessageStore try { storeMetaData(conn, _messageId, _metaData); - DerbyMessageStore.this.addContent(conn, _messageId, 0, + DerbyMessageStore.this.addContent(conn, _messageId, _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); } finally @@ -2699,4 +2195,255 @@ public class DerbyMessageStore implements MessageStore { return _storeLocation; } + + private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws AMQStoreException + { + if (_stateManager.isInState(State.ACTIVE)) + { + try + { + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); + try + { + stmt.setString(1, configuredObject.getId().toString()); + ResultSet rs = stmt.executeQuery(); + try + { + // If we don't have any data in the result set then we can add this configured object + if (!rs.next()) + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); + try + { + insertStmt.setString(1, configuredObject.getId().toString()); + insertStmt.setString(2, configuredObject.getType()); + if(configuredObject.getAttributes() == null) + { + insertStmt.setNull(3, Types.BLOB); + } + else + { + byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET); + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); + } + insertStmt.execute(); + } + finally + { + insertStmt.close(); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + } + } + + private int removeConfiguredObject(UUID id) throws AMQStoreException + { + int results = 0; + try + { + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS); + try + { + stmt.setString(1, id.toString()); + results = stmt.executeUpdate(); + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e); + } + return results; + } + + private void updateConfiguredObject(final ConfiguredObjectRecord configuredObject) throws AMQStoreException + { + if (_stateManager.isInState(State.ACTIVE)) + { + try + { + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); + try + { + stmt.setString(1, configuredObject.getId().toString()); + ResultSet rs = stmt.executeQuery(); + try + { + if (rs.next()) + { + PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS); + try + { + stmt2.setString(1, configuredObject.getType()); + if (configuredObject.getAttributes() != null) + { + byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET); + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + stmt2.setBinaryStream(2, bis, attributesAsBytes.length); + } + else + { + stmt2.setNull(2, Types.BLOB); + } + stmt2.setString(3, configuredObject.getId().toString()); + stmt2.execute(); + } + finally + { + stmt2.close(); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + } + } + + private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException + { + ConfiguredObjectRecord result = null; + try + { + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); + try + { + stmt.setString(1, id.toString()); + ResultSet rs = stmt.executeQuery(); + try + { + if (rs.next()) + { + String type = rs.getString(1); + Blob blob = rs.getBlob(2); + String attributes = null; + if (blob != null) + { + attributes = blobToString(blob); + } + result = new ConfiguredObjectRecord(id, type, attributes); + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error loading of configured object with id " + id + " from database: " + + e.getMessage(), e); + } + return result; + } + + private String blobToString(Blob blob) throws SQLException + { + byte[] bytes = blob.getBytes(1, (int)blob.length()); + return new String(bytes, UTF8_CHARSET); + } + + private List<ConfiguredObjectRecord> loadConfiguredObjects() throws SQLException + { + ArrayList<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>(); + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + String id = rs.getString(1); + String objectType = rs.getString(2); + String attributes = blobToString(rs.getBlob(3)); + results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes)); + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + return results; + } }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 54b91a0ce2..79a8bc0e4c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.flow.WindowCreditManager; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.MessageMetaData_0_10; import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; @@ -683,13 +684,12 @@ public class ServerSessionDelegate extends SessionDelegate { String exchangeName = method.getExchange(); VirtualHost virtualHost = getVirtualHost(session); - Exchange exchange = getExchange(session, exchangeName); + ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); //we must check for any unsupported arguments present and throw not-implemented if(method.hasArguments()) { Map<String,Object> args = method.getArguments(); - //QPID-3392: currently we don't support any! if(!args.isEmpty()) { @@ -697,120 +697,113 @@ public class ServerSessionDelegate extends SessionDelegate return; } } - - if(method.getPassive()) + synchronized(exchangeRegistry) { - if(exchange == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '"+exchangeName+"'"); - - } - else - { - if(!exchange.getTypeShortString().toString().equals(method.getType()) && (method.getType() != null && method.getType().length() > 0)) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() +"."); - } - } + Exchange exchange = getExchange(session, exchangeName); - } - else - { - if (exchange == null) + if(method.getPassive()) { - if(exchangeName.startsWith("amq.")) + if(exchange == null) { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix 'amq.'."); + exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'"); } - else if(exchangeName.startsWith("qpid.")) + else { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix 'qpid.'."); + if (!exchange.getTypeShortString().toString().equals(method.getType()) + && (method.getType() != null && method.getType().length() > 0)) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + "."); + } } - else + } + else + { + if (exchange == null) { - ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); - ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); - - - - try + if (exchangeName.startsWith("amq.")) { - - exchange = exchangeFactory.createExchange(method.getExchange(), - method.getType(), - method.getDurable(), - method.getAutoDelete()); - - String alternateExchangeName = method.getAlternateExchange(); - boolean validAlternate; - if(alternateExchangeName != null && alternateExchangeName.length() != 0) + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: " + + exchangeName + " which begins with reserved prefix 'amq.'."); + } + else if (exchangeName.startsWith("qpid.")) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: " + + exchangeName + " which begins with reserved prefix 'qpid.'."); + } + else + { + ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + try { - Exchange alternate = getExchange(session, alternateExchangeName); - if(alternate == null) + exchange = exchangeFactory.createExchange(method.getExchange(), + method.getType(), + method.getDurable(), + method.getAutoDelete()); + String alternateExchangeName = method.getAlternateExchange(); + boolean validAlternate; + if(alternateExchangeName != null && alternateExchangeName.length() != 0) { - validAlternate = false; + Exchange alternate = getExchange(session, alternateExchangeName); + if(alternate == null) + { + validAlternate = false; + } + else + { + exchange.setAlternateExchange(alternate); + validAlternate = true; + } } else { - exchange.setAlternateExchange(alternate); validAlternate = true; } - } - else - { - validAlternate = true; - } - - if(validAlternate) - { - if (exchange.isDurable()) + if(validAlternate) { - DurableConfigurationStore store = virtualHost.getMessageStore(); - store.createExchange(exchange); + if (exchange.isDurable()) + { + DurableConfigurationStore store = virtualHost.getMessageStore(); + store.createExchange(exchange); + } + exchangeRegistry.registerExchange(exchange); } - - exchangeRegistry.registerExchange(exchange); + else + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, + "Unknown alternate exchange " + alternateExchangeName); + } + } + catch(AMQUnknownExchangeType e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); } - else + catch (AMQException e) { - exception(session, method, ExecutionErrorCode.NOT_FOUND, - "Unknown alternate exchange " + alternateExchangeName); + exception(session, method, e, "Cannot declare exchange '" + exchangeName); } } - catch(AMQUnknownExchangeType e) + } + else + { + if(!exchange.getTypeShortString().toString().equals(method.getType())) { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to redeclare exchange: " + exchangeName + + " of type " + exchange.getTypeShortString() + + " to " + method.getType() +"."); } - catch (AMQException e) + else if(method.hasAlternateExchange() + && (exchange.getAlternateExchange() == null || + !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) { - exception(session, method, e, "Cannot declare exchange '" + exchangeName); + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to change alternate exchange of: " + exchangeName + + " from " + exchange.getAlternateExchange() + + " to " + method.getAlternateExchange() +"."); } } } - else - { - if(!exchange.getTypeShortString().toString().equals(method.getType())) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to redeclare exchange: " + exchangeName - + " of type " + exchange.getTypeShortString() - + " to " + method.getType() +"."); - } - else if(method.hasAlternateExchange() - && (exchange.getAlternateExchange() == null || - !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to change alternate exchange of: " + exchangeName - + " from " + exchange.getAlternateExchange() - + " to " + method.getAlternateExchange() +"."); - } - } - } } @@ -1396,8 +1389,8 @@ public class ServerSessionDelegate extends SessionDelegate { String owner = body.getExclusive() ? session.getClientID() : null; - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), - body.getExclusive(), virtualHost, body.getArguments()); + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), queueName, body.getDurable(), owner, + body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments()); return queue; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/MapJsonSerializer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/MapJsonSerializer.java new file mode 100644 index 0000000000..2d9ba38555 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/MapJsonSerializer.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import java.io.StringWriter; +import java.util.Map; + +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + +public class MapJsonSerializer +{ + private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() + { + }; + + private ObjectMapper _mapper; + + public MapJsonSerializer() + { + _mapper = new ObjectMapper(); + } + + public String serialize(Map<String, Object> attributeMap) + { + StringWriter stringWriter = new StringWriter(); + try + { + _mapper.writeValue(stringWriter, attributeMap); + } + catch (Exception e) + { + throw new RuntimeException("Failure to serialize map:" + attributeMap, e); + } + return stringWriter.toString(); + } + + public Map<String, Object> deserialize(String json) + { + Map<String, Object> attributesMap = null; + try + { + attributesMap = _mapper.readValue(json, MAP_TYPE_REFERENCE); + } + catch (Exception e) + { + throw new RuntimeException("Failure to deserialize json:" + json, e); + } + return attributesMap; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 9333456c2e..e956806823 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -100,7 +100,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa return this; } - public void queue(String queueName, String owner, boolean exclusive, FieldTable arguments) + public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments) { try { @@ -108,7 +108,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if (q == null) { - q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost, + q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost, FieldTable.convertToMap(arguments)); _virtualHost.getQueueRegistry().registerQueue(q); } @@ -130,7 +130,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa return this; } - public void exchange(String exchangeName, String type, boolean autoDelete) + public void exchange(UUID id, String exchangeName, String type, boolean autoDelete) { try { @@ -139,7 +139,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS); if (exchange == null) { - exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0); + exchange = _virtualHost.getExchangeFactory().createExchange(id, exchangeNameSS, new AMQShortString(type), true, autoDelete, 0); _virtualHost.getExchangeRegistry().registerExchange(exchange); } } @@ -212,7 +212,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } for(Transaction.Record record : enqueues) { - final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); + final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId()); if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); @@ -265,13 +265,13 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa StringBuilder xidString = xidAsString(id); CurrentActor.get().message(_logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getQueue().getResourceName())); + record.getQueue().getId().toString())); } } for(Transaction.Record record : dequeues) { - final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); + final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId()); if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); @@ -315,7 +315,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa StringBuilder xidString = xidAsString(id); CurrentActor.get().message(_logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getQueue().getResourceName())); + record.getQueue().getId().toString())); } } @@ -354,21 +354,22 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); } - public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf) + @Override + public void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf) { try { - Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName); + Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeId); if (exchange == null) { - _logger.error("Unknown exchange: " + exchangeName + ", cannot bind queue : " + queueName); + _logger.error("Unknown exchange id " + exchangeId + ", cannot bind queue with id " + queueId); return; } - - AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(new AMQShortString(queueName)); + + AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId); if (queue == null) { - _logger.error("Unknown queue: " + queueName + ", cannot be bound to exchange: " + exchangeName); + _logger.error("Unknown queue id " + queueId + ", cannot be bound to exchange: " + exchange.getName()); } else { @@ -392,10 +393,10 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null) { - _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queueName + _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queue.getName() + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")"); - bf.restoreBinding(bindingKey, queue, exchange, argumentMap); + bf.restoreBinding(bindingId, bindingKey, queue, exchange, argumentMap); } } } @@ -417,16 +418,14 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } - public void queueEntry(final String queueName, long messageId) + public void queueEntry(final UUID queueId, long messageId) { - AMQShortString queueNameShortString = new AMQShortString(queueName); - - AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString); - + AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId); try { if(queue != null) { + String queueName = queue.getName(); ServerMessage message = _recoveredMessages.get(messageId); _unusedMessages.remove(messageId); @@ -436,7 +435,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if (_logger.isDebugEnabled()) { - _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getNameShortString()); + _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName); } Integer count = _queueRecoveries.get(queueName); @@ -451,7 +450,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } else { - _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded"); + _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded"); Transaction txn = _store.newTransaction(); txn.dequeueMessage(queue, new DummyMessage(messageId)); txn.commitTranAsync(); @@ -459,15 +458,15 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } else { - _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded"); + _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded"); Transaction txn = _store.newTransaction(); TransactionLogResource mockQueue = new TransactionLogResource() { - - public String getResourceName() + @Override + public UUID getId() { - return queueName; + return queueId; } }; txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); @@ -479,9 +478,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { throw new RuntimeException(e); } - - - } public DtxRecordRecoveryHandler completeQueueEntryRecovery() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index b6ee95a1cb..afd8fd9ed2 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; @@ -45,6 +46,7 @@ import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -274,7 +276,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase public TestQueue(AMQShortString name) throws AMQException { - super(name, false, new AMQShortString("test"), true, false,ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), Collections.EMPTY_MAP); + super(UUIDGenerator.generateUUID(), name, false, new AMQShortString("test"), true, false,ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), Collections.EMPTY_MAP); ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry().registerQueue(this); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java index 1fac4afe29..9034bf9c3a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java @@ -26,6 +26,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.management.common.mbeans.ManagedExchange; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueRegistry; @@ -52,7 +53,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase public void testGeneralProperties() throws Exception { DirectExchange exchange = new DirectExchange(); - exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -67,7 +68,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase public void testDirectExchangeMBean() throws Exception { DirectExchange exchange = new DirectExchange(); - exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -82,7 +83,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase public void testTopicExchangeMBean() throws Exception { TopicExchange exchange = new TopicExchange(); - exchange.initialise(_virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -97,7 +98,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase public void testHeadersExchangeMBean() throws Exception { HeadersExchange exchange = new HeadersExchange(); - exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -119,7 +120,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase public void testHeadersExchangeMBeanMatchPropertyNoValue() throws Exception { HeadersExchange exchange = new HeadersExchange(); - exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -137,7 +138,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase public void testInvalidHeaderBindingMalformed() throws Exception { HeadersExchange exchange = new HeadersExchange(); - exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 2e3ff90df9..e123a968a4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.queue; +import java.util.UUID; + import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.AMQException; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 8041d59ffa..52ad4a7c5b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -137,7 +138,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } try { - _queue = new SimpleAMQQueue(_qname, false, _owner, false, false,null, Collections.EMPTY_MAP); + _queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) @@ -479,7 +480,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void testAutoDeleteQueue() throws Exception { _queue.stop(); - _queue = new SimpleAMQQueue(_qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP); + _queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP); _queue.setDeleteOnNoConsumers(true); _queue.registerSubscription(_subscription, false); AMQMessage message = createMessage(new Long(25)); @@ -691,8 +692,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void testProcessQueueWithUniqueSelectors() throws Exception { TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory(); - SimpleAMQQueue testQueue = new SimpleAMQQueue("testQueue", false, "testOwner",false, - false, _virtualHost, factory, null) + SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), "testQueue", false,"testOwner", + false, false, _virtualHost, factory, null) { @Override public void deliverAsync(Subscription sub) @@ -1028,8 +1029,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase int dequeueMessageIndex = 1; // create queue with overridden method deliverAsync - SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString("test"), false, - new AMQShortString("testOwner"), false, false, _virtualHost, null) + SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("test"), + false, new AMQShortString("testOwner"), false, false, _virtualHost, null) { @Override public void deliverAsync(Subscription sub) @@ -1099,8 +1100,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void testEqueueDequeuedEntry() { // create a queue where each even entry is considered a dequeued - SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("test"), false, new AMQShortString("testOwner"), - false, false, _virtualHost, new QueueEntryListFactory() + SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("test"), false, + new AMQShortString("testOwner"), false, false, _virtualHost, new QueueEntryListFactory() { public QueueEntryList createQueueEntryList(AMQQueue queue) { @@ -1177,8 +1178,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void testActiveConsumerCount() throws Exception { - final SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("testActiveConsumerCount"), false, new AMQShortString("testOwner"), - false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null); + final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("testActiveConsumerCount"), false, + new AMQShortString("testOwner"), false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null); //verify adding an active subscription increases the count final MockSubscription subscription1 = new MockSubscription(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java new file mode 100644 index 0000000000..a1cbb2cbc8 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java @@ -0,0 +1,377 @@ +package org.apache.qpid.server.store; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.logging.subjects.TestBlankSubject; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockStoredMessage; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; +import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.derby.DerbyMessageStoreFactory; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +public class DurableConfigurationStoreTest extends QpidTestCase +{ + private static final String EXCHANGE_NAME = "exchangeName"; + private String _storePath; + private String _storeName; + private MessageStore _store; + private Configuration _configuration; + + private ConfigurationRecoveryHandler _recoveryHandler; + private QueueRecoveryHandler _queueRecoveryHandler; + private ExchangeRecoveryHandler _exchangeRecoveryHandler; + private BindingRecoveryHandler _bindingRecoveryHandler; + private ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler _linkRecoveryHandler; + private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; + private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; + private TransactionLogRecoveryHandler _logRecoveryHandler; + private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; + private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; + + private Exchange _exchange = mock(Exchange.class); + private static final String ROUTING_KEY = "routingKey"; + private static final String QUEUE_NAME = "queueName"; + private FieldTable _bindingArgs; + private UUID _queueId; + private UUID _exchangeId; + + public void setUp() throws Exception + { + super.setUp(); + + _queueId = UUIDGenerator.generateUUID(); + _exchangeId = UUIDGenerator.generateUUID(); + + _storeName = getName(); + _storePath = TMP_FOLDER + "/" + _storeName; + FileUtils.delete(new File(_storePath), true); + setTestSystemProperty("QPID_WORK", TMP_FOLDER); + _configuration = mock(Configuration.class); + _recoveryHandler = mock(ConfigurationRecoveryHandler.class); + _queueRecoveryHandler = mock(QueueRecoveryHandler.class); + _exchangeRecoveryHandler = mock(ExchangeRecoveryHandler.class); + _bindingRecoveryHandler = mock(BindingRecoveryHandler.class); + _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); + _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); + _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); + _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); + _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); + + when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); + when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_queueRecoveryHandler); + when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_exchangeRecoveryHandler); + when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_bindingRecoveryHandler); + when(_bindingRecoveryHandler.completeBindingRecovery()).thenReturn(_linkRecoveryHandler); + when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); + when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); + when(_exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(EXCHANGE_NAME)); + when(_exchange.getId()).thenReturn(_exchangeId); + when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn( + _storePath); + + _bindingArgs = new FieldTable(); + AMQShortString argKey = AMQPFilterTypes.JMS_SELECTOR.getValue(); + String argValue = "some selector expression"; + _bindingArgs.put(argKey, argValue); + + reopenStore(); + } + + public void tearDown() throws Exception + { + FileUtils.delete(new File(_storePath), true); + super.tearDown(); + } + + public void testCreateExchange() throws Exception + { + Exchange exchange = createTestExchange(); + _store.createExchange(exchange); + + reopenStore(); + verify(_exchangeRecoveryHandler).exchange(_exchangeId, getName(), getName() + "Type", true); + } + + public void testRemoveExchange() throws Exception + { + Exchange exchange = createTestExchange(); + _store.createExchange(exchange); + + _store.removeExchange(exchange); + + reopenStore(); + verify(_exchangeRecoveryHandler, never()).exchange(any(UUID.class), anyString(), anyString(), anyBoolean()); + } + + public void testBindQueue() throws Exception + { + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); + Binding binding = new Binding(UUIDGenerator.generateUUID(), ROUTING_KEY, queue, _exchange, + FieldTable.convertToMap(_bindingArgs)); + _store.bindQueue(binding); + + reopenStore(); + + ByteBuffer argsAsBytes = ByteBuffer.wrap(_bindingArgs.getDataAsBytes()); + + verify(_bindingRecoveryHandler).binding(binding.getId(), _exchange.getId(), queue.getId(), ROUTING_KEY, argsAsBytes); + } + + public void testUnbindQueue() throws Exception + { + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); + Binding binding = new Binding(UUIDGenerator.generateUUID(), ROUTING_KEY, queue, _exchange, + FieldTable.convertToMap(_bindingArgs)); + _store.bindQueue(binding); + + _store.unbindQueue(binding); + reopenStore(); + + verify(_bindingRecoveryHandler, never()).binding(any(UUID.class), any(UUID.class), any(UUID.class), anyString(), + isA(ByteBuffer.class)); + } + + public void testCreateQueueAMQQueue() throws Exception + { + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + _store.createQueue(queue); + + reopenStore(); + verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null); + } + + public void testCreateQueueAMQQueueFieldTable() throws Exception + { + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); + attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + + FieldTable arguments = FieldTable.convertToFieldTable(attributes); + _store.createQueue(queue, arguments); + + reopenStore(); + verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, arguments); + } + + public void testUpdateQueue() throws Exception + { + // create queue + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); + attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + FieldTable arguments = FieldTable.convertToFieldTable(attributes); + _store.createQueue(queue, arguments); + + // update the queue to have exclusive=false + queue = createTestQueue(getName(), getName() + "Owner", false); + _store.updateQueue(queue); + + reopenStore(); + verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments); + } + + public void testRemoveQueue() throws Exception + { + // create queue + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); + attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + FieldTable arguments = FieldTable.convertToFieldTable(attributes); + _store.createQueue(queue, arguments); + + // remove queue + _store.removeQueue(queue); + reopenStore(); + verify(_queueRecoveryHandler, never()).queue(any(UUID.class), anyString(), anyString(), anyBoolean(), + any(FieldTable.class)); + } + + private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException + { + AMQQueue queue = mock(AMQQueue.class); + when(queue.getName()).thenReturn(queueName); + when(queue.getNameShortString()).thenReturn(AMQShortString.valueOf(queueName)); + when(queue.getOwner()).thenReturn(AMQShortString.valueOf(queueOwner)); + when(queue.isExclusive()).thenReturn(exclusive); + when(queue.getId()).thenReturn(_queueId); + return queue; + } + + private Exchange createTestExchange() + { + Exchange exchange = mock(Exchange.class); + when(exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(getName())); + when(exchange.getName()).thenReturn(getName()); + when(exchange.getTypeShortString()).thenReturn(AMQShortString.valueOf(getName() + "Type")); + when(exchange.isAutoDelete()).thenReturn(true); + when(exchange.getId()).thenReturn(_exchangeId); + return exchange; + } + + private void reopenStore() throws Exception + { + if (_store != null) + { + _store.close(); + } + _store = createStore(); + + _store.configureConfigStore(_storeName, _recoveryHandler, _configuration); + _store.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration); + _store.activate(); + } + + protected MessageStore createStore() throws Exception + { + String storeFactoryClass = System.getProperty(MS_FACTORY_CLASS_NAME_KEY); + if (storeFactoryClass == null) + { + storeFactoryClass = DerbyMessageStoreFactory.class.getName(); + } + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + MessageStoreFactory factory = (MessageStoreFactory) Class.forName(storeFactoryClass).newInstance(); + return factory.createMessageStore(); + } + + public void testRecordXid() throws Exception + { + Record enqueueRecord = getTestRecord(1); + Record dequeueRecord = getTestRecord(2); + Record[] enqueues = { enqueueRecord }; + Record[] dequeues = { dequeueRecord }; + byte[] globalId = new byte[] { 1 }; + byte[] branchId = new byte[] { 2 }; + + Transaction transaction = _store.newTransaction(); + transaction.recordXid(1l, globalId, branchId, enqueues, dequeues); + transaction.commitTran(); + reopenStore(); + verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + + transaction = _store.newTransaction(); + transaction.removeXid(1l, globalId, branchId); + transaction.commitTran(); + + reopenStore(); + verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + } + + private Record getTestRecord(long messageNumber) + { + UUID queueId1 = UUIDGenerator.generateUUID(); + TransactionLogResource queue1 = mock(TransactionLogResource.class); + when(queue1.getId()).thenReturn(queueId1); + EnqueableMessage message1 = mock(EnqueableMessage.class); + when(message1.isPersistent()).thenReturn(true); + when(message1.getMessageNumber()).thenReturn(messageNumber); + when(message1.getStoredMessage()).thenReturn(new MockStoredMessage(messageNumber)); + Record enqueueRecord = new TestRecord(queue1, message1); + return enqueueRecord; + } + + private static class TestRecord implements Record + { + private TransactionLogResource _queue; + private EnqueableMessage _message; + + public TestRecord(TransactionLogResource queue, EnqueableMessage message) + { + super(); + _queue = queue; + _message = message; + } + + @Override + public TransactionLogResource getQueue() + { + return _queue; + } + + @Override + public EnqueableMessage getMessage() + { + return _message; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode()); + result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (obj == null) + { + return false; + } + if (!(obj instanceof Record)) + { + return false; + } + Record other = (Record) obj; + if (_message == null && other.getMessage() != null) + { + return false; + } + if (_queue == null && other.getQueue() != null) + { + return false; + } + if (_message.getMessageNumber() != other.getMessage().getMessageNumber()) + { + return false; + } + return _queue.getId().equals(other.getQueue().getId()); + } + + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index c589bd108b..3fb0776083 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -39,6 +39,7 @@ import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; @@ -58,6 +59,7 @@ import java.io.File; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; /** * This tests the MessageStores by using the available interfaces. @@ -739,7 +741,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase try { - exchange = type.newInstance(getVirtualHost(), name, durable, 0, false); + exchange = type.newInstance(UUIDGenerator.generateUUID(), getVirtualHost(), name, durable, 0, false); } catch (AMQException e) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 5a11a7aa32..8a34e92985 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.util; +import java.util.UUID; + import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.AMQException; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/MapJsonSerializerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/MapJsonSerializerTest.java new file mode 100644 index 0000000000..56567523df --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/MapJsonSerializerTest.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import java.util.HashMap; +import java.util.Map; + +import junit.framework.TestCase; + +public class MapJsonSerializerTest extends TestCase +{ + private MapJsonSerializer _serializer; + + protected void setUp() throws Exception + { + super.setUp(); + _serializer = new MapJsonSerializer(); + + } + + public void testSerializeDeserialize() + { + Map<String, Object> testMap = new HashMap<String, Object>(); + testMap.put("string", "Test String"); + testMap.put("integer", new Integer(10)); + testMap.put("long", new Long(Long.MAX_VALUE)); + testMap.put("boolean", Boolean.TRUE); + + String jsonString = _serializer.serialize(testMap); + Map<String, Object> deserializedMap = _serializer.deserialize(jsonString); + + assertEquals(deserializedMap, testMap); + } + +} diff --git a/qpid/java/build.deps b/qpid/java/build.deps index 8fc225e084..3336139075 100644 --- a/qpid/java/build.deps +++ b/qpid/java/build.deps @@ -50,6 +50,9 @@ felix-main=lib/org.apache.felix.main-2.0.5.jar felix.libs=${felix-main} +jackson-core=lib/jackson-core-asl-1.9.0.jar +jackson-mapper=lib/jackson-mapper-asl-1.9.0.jar + commons-configuration.libs = ${commons-beanutils-core} ${commons-digester} \ ${commons-codec} ${commons-lang} ${commons-collections} ${commons-configuration} @@ -60,7 +63,8 @@ amqp-1-0-client.libs=${commons-cli} amqp-1-0-client-jms.libs=${geronimo-jms} tools.libs=${commons-configuration.libs} ${log4j} broker.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \ - ${xalan} ${felix.libs} ${derby-db} ${commons-configuration.libs} + ${xalan} ${felix.libs} ${derby-db} ${commons-configuration.libs} \ + ${jackson-core} ${jackson-mapper} broker-plugins.libs=${felix.libs} ${log4j} ${commons-configuration.libs} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 85870e68c5..fdc71e31f9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -858,4 +858,9 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt } } + public static String toString(AMQShortString amqShortString) + { + return amqShortString == null ? null : amqShortString.asString(); + } + } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java index 0668120b72..cbf6caf141 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -42,6 +42,7 @@ public class QpidTestCase extends TestCase { public static final String QPID_HOME = System.getProperty("QPID_HOME"); public static final String TEST_RESOURCES_DIR = QPID_HOME + "/../test-profiles/test_resources/"; + public static final String TMP_FOLDER = System.getProperty("java.io.tmpdir"); private static final Logger _logger = Logger.getLogger(QpidTestCase.class); diff --git a/qpid/java/lib/jackson-core-asl-1.9.0.jar b/qpid/java/lib/jackson-core-asl-1.9.0.jar Binary files differnew file mode 100644 index 0000000000..e9456ad887 --- /dev/null +++ b/qpid/java/lib/jackson-core-asl-1.9.0.jar diff --git a/qpid/java/lib/jackson-mapper-asl-1.9.0.jar b/qpid/java/lib/jackson-mapper-asl-1.9.0.jar Binary files differnew file mode 100644 index 0000000000..f8beebcfea --- /dev/null +++ b/qpid/java/lib/jackson-mapper-asl-1.9.0.jar diff --git a/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java b/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java index 6065a04576..eef6356642 100644 --- a/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java +++ b/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java @@ -24,6 +24,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueueMBean; @@ -36,6 +37,7 @@ import javax.management.MBeanFeatureInfo; import javax.management.MBeanInfo; import java.util.ArrayList; import java.util.List; +import java.util.UUID; /** @@ -86,7 +88,7 @@ public class ManagementConsoleTest extends InternalBrokerBaseCase // If this test fails due to changes in the broker code, // then the constants in the Constants.java shoule be updated accordingly DirectExchange exchange = new DirectExchange(); - exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); AMQManagedObject mbean = (AMQManagedObject)exchange.getManagedObject(); MBeanInfo mbeanInfo = mbean.getMBeanInfo(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 69cc64c0d0..f2d4a513be 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -26,6 +26,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; @@ -191,17 +192,17 @@ public class SlowMessageStore implements MessageStore doPostDelay("removeExchange"); } - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException + public void bindQueue(Binding binding) throws AMQStoreException { doPreDelay("bindQueue"); - _durableConfigurationStore.bindQueue(exchange, routingKey, queue, args); + _durableConfigurationStore.bindQueue(binding); doPostDelay("bindQueue"); } - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException + public void unbindQueue(Binding binding) throws AMQStoreException { doPreDelay("unbindQueue"); - _durableConfigurationStore.unbindQueue(exchange, routingKey, queue, args); + _durableConfigurationStore.unbindQueue(binding); doPostDelay("unbindQueue"); } diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 74b02153ac..0abe665206 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -91,6 +91,7 @@ org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval +org.apache.qpid.server.store.DurableConfigurationStoreTest#* // CPP Broker does not follow the same Logging convention as the Java broker org.apache.qpid.server.BrokerStartupTest#* diff --git a/qpid/java/test-profiles/JavaBDBExcludes b/qpid/java/test-profiles/JavaBDBExcludes index 3fac01cb6d..0750beb339 100644 --- a/qpid/java/test-profiles/JavaBDBExcludes +++ b/qpid/java/test-profiles/JavaBDBExcludes @@ -20,3 +20,4 @@ //This test is subclassed within the bdbstore module to enable it to run and //also add some bdb-specific tests. It is excluded to prevent running twice. org.apache.qpid.server.store.MessageStoreTest#* +org.apache.qpid.server.store.DurableConfigurationStoreTest#* diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index 168be7736c..eb4c1f814d 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -41,3 +41,5 @@ org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval org.apache.qpid.server.store.berkeleydb.* + +org.apache.qpid.server.store.DurableConfigurationStoreTest#*
\ No newline at end of file |