From 706370f431afa3e812b94878fd9bc6a09a0920d5 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 2 Jul 2013 15:26:42 +0000 Subject: QPID-4973 : [Java Broker] Refactor DurableConfigurationStore git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1498976 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/berkeleydb/AbstractBDBMessageStore.java | 147 ++------------- .../berkeleydb/tuple/ConfiguredObjectBinding.java | 51 +++++- .../tuple/ConfiguredObjectBindingTest.java | 11 +- .../qpid/server/exchange/AbstractExchange.java | 5 +- .../server/exchange/DefaultExchangeRegistry.java | 3 +- .../qpid/server/exchange/ExchangeInitialiser.java | 6 +- .../server/handler/ExchangeDeclareHandler.java | 4 +- .../qpid/server/handler/QueueDeclareHandler.java | 5 +- .../qpid/server/handler/QueueDeleteHandler.java | 3 +- .../qpid/server/model/adapter/QueueAdapter.java | 6 +- .../server/model/adapter/VirtualHostAdapter.java | 8 +- .../apache/qpid/server/queue/AMQQueueFactory.java | 8 +- .../server/store/AbstractJDBCMessageStore.java | 177 ++++++++---------- .../server/store/ConfigurationRecoveryHandler.java | 21 +-- .../qpid/server/store/ConfiguredObjectHelper.java | 203 --------------------- .../qpid/server/store/ConfiguredObjectRecord.java | 18 +- .../server/store/DurableConfigurationStore.java | 67 ++----- .../store/DurableConfigurationStoreHelper.java | 132 ++++++++++++++ .../qpid/server/store/MemoryMessageStore.java | 2 + .../apache/qpid/server/store/NullMessageStore.java | 38 +--- .../server/transport/ServerSessionDelegate.java | 11 +- .../server/virtualhost/AbstractVirtualHost.java | 5 +- .../VirtualHostConfigRecoveryHandler.java | 152 ++++++++++++--- .../store/DurableConfigurationStoreTest.java | 146 ++++++++++----- .../apache/qpid/server/store/MessageStoreTest.java | 13 +- .../qpid/server/store/MessageStoreTestCase.java | 12 -- .../qpid/server/store/QuotaMessageStore.java | 3 +- .../apache/qpid/server/store/SlowMessageStore.java | 65 ++----- 28 files changed, 608 insertions(+), 714 deletions(-) delete mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index d7c8102c0e..f6b7e1790f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -51,16 +51,10 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.*; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; @@ -150,8 +144,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private final EventManager _eventManager = new EventManager(); private String _storeLocation; - private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper(); - private Map _envConfigMap; public AbstractBDBMessageStore() @@ -434,17 +426,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo { try { - List configuredObjects = loadConfiguredObjects(); - ExchangeRecoveryHandler erh = recoveryHandler.begin(this); - _configuredObjectHelper.recoverExchanges(erh, configuredObjects); - - QueueRecoveryHandler qrh = erh.completeExchangeRecovery(); - _configuredObjectHelper.recoverQueues(qrh, configuredObjects); - - BindingRecoveryHandler brh = qrh.completeQueueRecovery(); - _configuredObjectHelper.recoverBindings(brh, configuredObjects); + recoveryHandler.beginConfigurationRecovery(this); + loadConfiguredObjects(recoveryHandler); - brh.completeBindingRecovery(); + recoveryHandler.completeConfigurationRecovery(); } catch (DatabaseException e) { @@ -453,10 +438,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } - private List loadConfiguredObjects() throws DatabaseException + private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException { Cursor cursor = null; - List results = new ArrayList(); try { cursor = _configuredObjectsDb.openCursor(null, null); @@ -464,10 +448,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo DatabaseEntry value = new DatabaseEntry(); while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { - ConfiguredObjectRecord configuredObject = ConfiguredObjectBinding.getInstance().entryToObject(value); UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - configuredObject.setId(id); - results.add(configuredObject); + + ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value); + crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes()); } } @@ -475,7 +459,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo { closeCursorSafely(cursor); } - return results; } private void closeCursorSafely(Cursor cursor) @@ -743,111 +726,43 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } } - /** - * @see DurableConfigurationStore#createExchange(Exchange) - */ - public void createExchange(Exchange exchange) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange); - storeConfiguredObjectEntry(configuredObject); - } - } - - /** - * @see DurableConfigurationStore#removeExchange(Exchange) - */ - public void removeExchange(Exchange exchange) throws AMQStoreException - { - UUID id = exchange.getId(); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void removeExchange(String name = " + exchange.getName() + ", uuid = " + id + "): called"); - } - OperationStatus status = removeConfiguredObject(id); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + id + " not found"); - } - } - - - /** - * @see DurableConfigurationStore#bindQueue(Binding) - */ - public void bindQueue(Binding binding) throws AMQStoreException + @Override + public void create(UUID id, String type, Map attributes) throws AMQStoreException { if (_stateManager.isInState(State.ACTIVE)) { - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes); storeConfiguredObjectEntry(configuredObject); } } - /** - * @see DurableConfigurationStore#unbindQueue(Binding) - */ - public void unbindQueue(Binding binding) - throws AMQStoreException + @Override + public void remove(UUID id, String type) throws AMQStoreException { - UUID id = binding.getId(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("public void unbindQueue(Binding binding = " + binding + ", uuid = " + id + "): called"); + LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called"); } - OperationStatus status = removeConfiguredObject(id); if (status == OperationStatus.NOTFOUND) { - throw new AMQStoreException("Binding " + binding + " not found"); - } - } - - /** - * @see DurableConfigurationStore#createQueue(AMQQueue) - */ - public void createQueue(AMQQueue queue) throws AMQStoreException - { - createQueue(queue, null); - } - - /** - * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable) - */ - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + "), queue id" + queue.getId() - + ", arguments=" + arguments + "): called"); - } - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments); - storeConfiguredObjectEntry(configuredObject); + throw new AMQStoreException("Configured object of type " + type + " with id " + id + " not found"); } } - /** - * Updates the specified queue in the persistent store, IF it is already present. If the queue - * is not present in the store, it will not be added. - * - * @param queue The queue to update the entry for. - * @throws AMQStoreException If the operation fails for any reason. - */ - public void updateQueue(final AMQQueue queue) throws AMQStoreException + @Override + public void update(UUID id, String type, Map attributes) throws AMQStoreException { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Updating queue: " + queue.getName()); + LOGGER.debug("Updating " +type + ", id: " + id); } try { DatabaseEntry key = new DatabaseEntry(); UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); - keyBinding.objectToEntry(queue.getId(), key); + keyBinding.objectToEntry(id, key); DatabaseEntry value = new DatabaseEntry(); DatabaseEntry newValue = new DatabaseEntry(); @@ -856,8 +771,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT); if (status == OperationStatus.SUCCESS) { - ConfiguredObjectRecord queueRecord = configuredObjectBinding.entryToObject(value); - ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueRecord); + ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); // write the updated entry to the store configuredObjectBinding.objectToEntry(newQueueRecord, newValue); @@ -878,29 +792,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } } - /** - * Removes the specified queue from the persistent store. - * - * @param queue The queue to remove. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - public void removeQueue(final AMQQueue queue) throws AMQStoreException - { - UUID id = queue.getId(); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void removeQueue(AMQShortString name = " + queue.getName() + ", uuid = " + id + "): called"); - } - - OperationStatus status = removeConfiguredObject(id); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Queue " + queue.getName() + " with id " + id + " not found"); - } - } - - /** * Places a message onto a specified queue, in a given transaction. * diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java index 945bcf1d28..31cafbe74d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java @@ -20,38 +20,75 @@ */ package org.apache.qpid.server.store.berkeleydb.tuple; +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.store.ConfiguredObjectRecord; import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; public class ConfiguredObjectBinding extends TupleBinding { - private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding(); + private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding(null); + private final UUID _uuid; + public static ConfiguredObjectBinding getInstance() { return INSTANCE; } - /** non-public constructor forces getInstance instead */ - private ConfiguredObjectBinding() + public ConfiguredObjectBinding(UUID uuid) { + _uuid = uuid; } public ConfiguredObjectRecord entryToObject(TupleInput tupleInput) { String type = tupleInput.readString(); String json = tupleInput.readString(); - ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(null, type, json); - return configuredObject; + ObjectMapper mapper = new ObjectMapper(); + try + { + Map value = mapper.readValue(json, Map.class); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(_uuid, type, value); + return configuredObject; + } + catch (IOException e) + { + //should never happen + throw new RuntimeException(e); + } + } public void objectToEntry(ConfiguredObjectRecord object, TupleOutput tupleOutput) { - tupleOutput.writeString(object.getType()); - tupleOutput.writeString(object.getAttributes()); + try + { + StringWriter writer = new StringWriter(); + new ObjectMapper().writeValue(writer, object.getAttributes()); + tupleOutput.writeString(object.getType()); + tupleOutput.writeString(writer.toString()); + } + catch (JsonMappingException e) + { + throw new RuntimeException(e); + } + catch (JsonGenerationException e) + { + throw new RuntimeException(e); + } + catch (IOException e) + { + throw new RuntimeException(e); + } } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java index 7f93f5691e..5a5d39081c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.store.berkeleydb.tuple; +import java.util.Collections; +import java.util.Map; import junit.framework.TestCase; import org.apache.qpid.server.model.UUIDGenerator; @@ -33,7 +35,9 @@ public class ConfiguredObjectBindingTest extends TestCase private ConfiguredObjectRecord _object; - private static final String DUMMY_ATTRIBUTES_STRING = "dummyAttributes"; + private static final Map DUMMY_ATTRIBUTES_MAP = + Collections.singletonMap("dummy",(Object) "attributes"); + private static final String DUMMY_TYPE_STRING = "dummyType"; private ConfiguredObjectBinding _configuredObjectBinding; @@ -42,7 +46,8 @@ public class ConfiguredObjectBindingTest extends TestCase { super.setUp(); _configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_STRING); + _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, + DUMMY_ATTRIBUTES_MAP); } public void testObjectToEntryAndEntryToObject() @@ -55,7 +60,7 @@ public class ConfiguredObjectBindingTest extends TestCase TupleInput tupleInput = new TupleInput(entryAsBytes); ConfiguredObjectRecord storedObject = _configuredObjectBinding.entryToObject(tupleInput); - assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_STRING, storedObject.getAttributes()); + assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_MAP, storedObject.getAttributes()); assertEquals("Unexpected type", DUMMY_TYPE_STRING, storedObject.getType()); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 6b453cbbda..3f6489cb86 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -554,7 +555,7 @@ public abstract class AbstractExchange implements Exchange if (b.isDurable()) { - _virtualHost.getDurableConfigurationStore().unbindQueue(b); + DurableConfigurationStoreHelper.removeBinding(_virtualHost.getDurableConfigurationStore(), b); } b.logDestruction(); } @@ -626,7 +627,7 @@ public abstract class AbstractExchange implements Exchange if (b.isDurable() && !restore) { - _virtualHost.getDurableConfigurationStore().bindQueue(b); + DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b); } queue.addQueueDeleteTask(b); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 142da84524..68c15779a0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -29,6 +29,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; @@ -127,7 +128,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry { if (e.isDurable()) { - getDurableConfigurationStore().removeExchange(e); + DurableConfigurationStoreHelper.removeExchange(getDurableConfigurationStore(), e); } e.close(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java index 313b5eefff..10a733546c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java @@ -21,12 +21,10 @@ package org.apache.qpid.server.exchange; -import java.util.UUID; import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfigurationStore; public class ExchangeInitialiser @@ -49,7 +47,7 @@ public class ExchangeInitialiser r.registerExchange(exchange); if(exchange.isDurable()) { - store.createExchange(exchange); + DurableConfigurationStoreHelper.createExchange(store, exchange); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 9c25d00b1a..b3967689dc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.virtualhost.VirtualHost; public class ExchangeDeclareHandler implements StateAwareMethodListener @@ -106,7 +107,8 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener @@ -115,7 +116,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener configuredObjects = loadConfiguredObjects(); - - ExchangeRecoveryHandler erh = recoveryHandler.begin(this); - _configuredObjectHelper.recoverExchanges(erh, configuredObjects); + recoveryHandler.beginConfigurationRecovery(this); + loadConfiguredObjects(recoveryHandler); - QueueRecoveryHandler qrh = erh.completeExchangeRecovery(); - _configuredObjectHelper.recoverQueues(qrh, configuredObjects); - - BindingRecoveryHandler brh = qrh.completeQueueRecovery(); - _configuredObjectHelper.recoverBindings(brh, configuredObjects); - - brh.completeBindingRecovery(); + recoveryHandler.completeConfigurationRecovery(); } catch (SQLException e) { @@ -573,92 +559,42 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - @Override - public void createExchange(Exchange exchange) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange); - insertConfiguredObject(configuredObject); - } - - } - - @Override - public void removeExchange(Exchange exchange) throws AMQStoreException - { - int results = removeConfiguredObject(exchange.getId()); - if (results == 0) - { - throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + exchange.getId() + " not found"); - } - } @Override - public void bindQueue(Binding binding) - throws AMQStoreException + public void create(UUID id, String type, Map attributes) throws AMQStoreException { if (_stateManager.isInState(State.ACTIVE)) { - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding); - insertConfiguredObject(configuredObject); + insertConfiguredObject(new ConfiguredObjectRecord(id, type, attributes)); } - } - @Override - public void unbindQueue(Binding binding) - throws AMQStoreException - { - int results = removeConfiguredObject(binding.getId()); - if (results == 0) - { - throw new AMQStoreException("Binding " + binding + " not found"); - } - } - - @Override - public void createQueue(AMQQueue queue) throws AMQStoreException - { - createQueue(queue, null); } @Override - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException + public void remove(UUID id, String type) throws AMQStoreException { - getLogger().debug("public void createQueue(AMQQueue queue = " + queue + "): called"); - - if (_stateManager.isInState(State.ACTIVE)) + int results = removeConfiguredObject(id); + if (results == 0) { - ConfiguredObjectRecord queueConfiguredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments); - insertConfiguredObject(queueConfiguredObject); + throw new AMQStoreException(type + " with id " + id + " not found"); } } - /** - * Updates the specified queue in the persistent store, IF it is already present. If the queue - * is not present in the store, it will not be added. - * - * NOTE: Currently only updates the exclusivity. - * - * @param queue The queue to update the entry for. - * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason. - */ @Override - public void updateQueue(final AMQQueue queue) throws AMQStoreException + public void update(UUID id, String type, Map attributes) throws AMQStoreException { if (_stateManager.isInState(State.ACTIVE)) { - ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(queue.getId()); + ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(id); if (queueConfiguredObject != null) { - ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueConfiguredObject); + ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); updateConfiguredObject(newQueueRecord); } } } - /** * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED * isolation and with auto-commit transactions enabled. @@ -714,18 +650,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC protected abstract Connection getConnection() throws SQLException; - @Override - public void removeQueue(final AMQQueue queue) throws AMQStoreException - { - AMQShortString name = queue.getNameShortString(); - getLogger().debug("public void removeQueue(AMQShortString name = " + name + "): called"); - int results = removeConfiguredObject(queue.getId()); - if (results == 0) - { - throw new AMQStoreException("Queue " + name + " with id " + queue.getId() + " not found"); - } - } - private byte[] convertStringMapToBytes(final Map arguments) throws AMQStoreException { byte[] argumentBytes; @@ -1825,7 +1749,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } else { - byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET); + final Map attributes = configuredObject.getAttributes(); + byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes); ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); } @@ -1852,6 +1777,18 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC conn.close(); } } + catch (JsonMappingException e) + { + throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + catch (JsonGenerationException e) + { + throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } catch (SQLException e) { throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); @@ -1914,7 +1851,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC stmt2.setString(1, configuredObject.getType()); if (configuredObject.getAttributes() != null) { - byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET); + byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes( + configuredObject.getAttributes()); ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); stmt2.setBinaryStream(2, bis, attributesAsBytes.length); } @@ -1946,6 +1884,18 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC conn.close(); } } + catch (JsonMappingException e) + { + throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + catch (JsonGenerationException e) + { + throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } catch (SQLException e) { throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); @@ -1972,7 +1922,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { String type = rs.getString(1); String attributes = getBlobAsString(rs, 2); - result = new ConfiguredObjectRecord(id, type, attributes); + result = new ConfiguredObjectRecord(id, type, + (new ObjectMapper()).readValue(attributes,Map.class)); } } finally @@ -1990,6 +1941,21 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC conn.close(); } } + catch (JsonMappingException e) + { + throw new AMQStoreException("Error loading of configured object with id " + id + " from database: " + + e.getMessage(), e); + } + catch (JsonParseException e) + { + throw new AMQStoreException("Error loading of configured object with id " + id + " from database: " + + e.getMessage(), e); + } + catch (IOException e) + { + throw new AMQStoreException("Error loading of configured object with id " + id + " from database: " + + e.getMessage(), e); + } catch (SQLException e) { throw new AMQStoreException("Error loading of configured object with id " + id + " from database: " @@ -1998,10 +1964,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC return result; } - private List loadConfiguredObjects() throws SQLException + private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException, AMQStoreException { - ArrayList results = new ArrayList(); Connection conn = newAutoCommitConnection(); + + final ObjectMapper objectMapper = new ObjectMapper(); try { PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); @@ -2015,9 +1982,22 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC String id = rs.getString(1); String objectType = rs.getString(2); String attributes = getBlobAsString(rs, 3); - results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes)); + recoveryHandler.configuredObject(UUID.fromString(id), objectType, + objectMapper.readValue(attributes,Map.class)); } } + catch (JsonMappingException e) + { + throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (JsonParseException e) + { + throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); + } finally { rs.close(); @@ -2032,7 +2012,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { conn.close(); } - return results; } protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java index ab7ef3f55b..2a4aed5373 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -28,25 +28,10 @@ import java.util.UUID; public interface ConfigurationRecoveryHandler { - ExchangeRecoveryHandler begin(MessageStore store); + void beginConfigurationRecovery(DurableConfigurationStore store); - public static interface ExchangeRecoveryHandler - { - void exchange(UUID id, String exchangeName, String type, boolean autoDelete); - QueueRecoveryHandler completeExchangeRecovery(); - } + void configuredObject(UUID id, String type, Map attributes); - public static interface QueueRecoveryHandler - { - void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId); - BindingRecoveryHandler completeQueueRecovery(); - } - - - public static interface BindingRecoveryHandler - { - void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf); - void completeBindingRecovery(); - } + void completeConfigurationRecovery(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java deleted file mode 100644 index 7356e1ae83..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.model.Exchange; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; -import org.apache.qpid.server.util.MapJsonSerializer; - -public class ConfiguredObjectHelper -{ - - - private MapJsonSerializer _serializer = new MapJsonSerializer(); - - public void loadQueue(ConfiguredObjectRecord configuredObject, QueueRecoveryHandler qrh) - { - if (Queue.class.getName().equals(configuredObject.getType())) - { - Map attributeMap = _serializer.deserialize(configuredObject.getAttributes()); - String queueName = (String) attributeMap.get(Queue.NAME); - String owner = (String) attributeMap.get(Queue.OWNER); - boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE); - UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE)); - @SuppressWarnings("unchecked") - Map queueArgumentsMap = (Map) attributeMap.get(Queue.ARGUMENTS); - FieldTable arguments = null; - if (queueArgumentsMap != null) - { - arguments = FieldTable.convertToFieldTable(queueArgumentsMap); - } - qrh.queue(configuredObject.getId(), queueName, owner, exclusive, arguments, alternateExchangeId); - } - } - - public ConfiguredObjectRecord updateQueueConfiguredObject(final AMQQueue queue, ConfiguredObjectRecord queueRecord) - { - Map attributesMap = _serializer.deserialize(queueRecord.getAttributes()); - attributesMap.put(Queue.NAME, queue.getName()); - attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); - if (queue.getAlternateExchange() != null) - { - attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); - } - else - { - attributesMap.remove(Queue.ALTERNATE_EXCHANGE); - } - if (attributesMap.containsKey(Queue.ARGUMENTS)) - { - // We wouldn't need this if createQueueConfiguredObject took only AMQQueue - Map currentArgs = (Map) attributesMap.get(Queue.ARGUMENTS); - currentArgs.putAll(queue.getArguments()); - } - else - { - attributesMap.put(Queue.ARGUMENTS, queue.getArguments()); - } - String newJson = _serializer.serialize(attributesMap); - ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(queue.getId(), queueRecord.getType(), newJson); - return newQueueRecord; - } - - public ConfiguredObjectRecord createQueueConfiguredObject(AMQQueue queue, FieldTable arguments) - { - Map attributesMap = new HashMap(); - attributesMap.put(Queue.NAME, queue.getName()); - attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner())); - attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); - if (queue.getAlternateExchange() != null) - { - attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); - } - // TODO KW i think the arguments could come from the queue itself removing the need for the parameter arguments. - // It would also do away with the need for the if/then/else within updateQueueConfiguredObject - if (arguments != null) - { - attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments)); - } - String json = _serializer.serialize(attributesMap); - ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(queue.getId(), Queue.class.getName(), json); - return configuredObject; - } - - public void loadExchange(ConfiguredObjectRecord configuredObject, ExchangeRecoveryHandler erh) - { - if (Exchange.class.getName().equals(configuredObject.getType())) - { - Map attributeMap = _serializer.deserialize(configuredObject.getAttributes()); - String exchangeName = (String) attributeMap.get(Exchange.NAME); - String exchangeType = (String) attributeMap.get(Exchange.TYPE); - String lifeTimePolicy = (String) attributeMap.get(Exchange.LIFETIME_POLICY); - boolean autoDelete = lifeTimePolicy == null - || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE; - erh.exchange(configuredObject.getId(), exchangeName, exchangeType, autoDelete); - } - } - - public ConfiguredObjectRecord createExchangeConfiguredObject(org.apache.qpid.server.exchange.Exchange exchange) - { - Map attributesMap = new HashMap(); - attributesMap.put(Exchange.NAME, exchange.getName()); - attributesMap.put(Exchange.TYPE, AMQShortString.toString(exchange.getTypeShortString())); - attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name() - : LifetimePolicy.PERMANENT.name()); - String json = _serializer.serialize(attributesMap); - ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(exchange.getId(), Exchange.class.getName(), json); - return configuredObject; - } - - public void loadQueueBinding(ConfiguredObjectRecord configuredObject, BindingRecoveryHandler brh) - { - if (Binding.class.getName().equals(configuredObject.getType())) - { - Map attributeMap = _serializer.deserialize(configuredObject.getAttributes()); - UUID exchangeId = UUID.fromString((String)attributeMap.get(Binding.EXCHANGE)); - UUID queueId = UUID.fromString((String) attributeMap.get(Binding.QUEUE)); - String bindingName = (String) attributeMap.get(Binding.NAME); - - @SuppressWarnings("unchecked") - Map bindingArgumentsMap = (Map) attributeMap.get(Binding.ARGUMENTS); - FieldTable arguments = null; - if (bindingArgumentsMap != null) - { - arguments = FieldTable.convertToFieldTable(bindingArgumentsMap); - } - ByteBuffer argumentsBB = (arguments == null ? null : ByteBuffer.wrap(arguments.getDataAsBytes())); - - brh.binding(configuredObject.getId(), exchangeId, queueId, bindingName, argumentsBB); - } - } - - public ConfiguredObjectRecord createBindingConfiguredObject(org.apache.qpid.server.binding.Binding binding) - { - Map attributesMap = new HashMap(); - attributesMap.put(Binding.NAME, binding.getBindingKey()); - attributesMap.put(Binding.EXCHANGE, binding.getExchange().getId()); - attributesMap.put(Binding.QUEUE, binding.getQueue().getId()); - Map arguments = binding.getArguments(); - if (arguments != null) - { - attributesMap.put(Binding.ARGUMENTS, arguments); - } - String json = _serializer.serialize(attributesMap); - ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(binding.getId(), Binding.class.getName(), json); - return configuredObject; - } - - public void recoverQueues(QueueRecoveryHandler qrh, List configuredObjects) - { - for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects) - { - loadQueue(configuredObjectRecord, qrh); - } - } - - public void recoverExchanges(ExchangeRecoveryHandler erh, List configuredObjects) - { - for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects) - { - loadExchange(configuredObjectRecord, erh); - } - } - - public void recoverBindings(BindingRecoveryHandler brh, List configuredObjects) - { - for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects) - { - loadQueueBinding(configuredObjectRecord, brh); - } - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java index 95e1713d78..5c8f452dc7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java @@ -20,20 +20,23 @@ */ package org.apache.qpid.server.store; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.UUID; public class ConfiguredObjectRecord { private UUID _id; private String _type; - private String _attributes; + private Map _attributes; - public ConfiguredObjectRecord(UUID id, String type, String attributes) + public ConfiguredObjectRecord(UUID id, String type, Map attributes) { super(); _id = id; _type = type; - _attributes = attributes; + _attributes = Collections.unmodifiableMap(new LinkedHashMap(attributes)); } public UUID getId() @@ -41,17 +44,12 @@ public class ConfiguredObjectRecord return _id; } - public void setId(UUID id) - { - _id = id; - } - public String getType() - { + { return _type; } - public String getAttributes() + public Map getAttributes() { return _attributes; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 7092655e95..fdde21ba89 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.store; +import java.util.Map; +import java.util.UUID; import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQStoreException; @@ -51,77 +53,42 @@ public interface DurableConfigurationStore void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, VirtualHost virtualHost) throws Exception; - /** - * Makes the specified exchange persistent. - * - * @param exchange The exchange to persist. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - void createExchange(Exchange exchange) throws AMQStoreException; - /** - * Removes the specified persistent exchange. - * - * @param exchange The exchange to remove. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - void removeExchange(Exchange exchange) throws AMQStoreException; - /** - * Store the queue binding. - * - * @param binding queue binding - * - * @throws AMQStoreException if the operation fails for any reason. - */ - void bindQueue(Binding binding) throws AMQStoreException; /** - * Removes queue binding + * Makes the specified object persistent. * - * @param binding queue binding to remove + * @param id The id of the object to persist. + * @param type The type of the object to persist + * @param attributes the attributes of the object to persist * * @throws AMQStoreException If the operation fails for any reason. */ - void unbindQueue(Binding binding) throws AMQStoreException; + void create(UUID id, String type, Map attributes) throws AMQStoreException; /** - * Makes the specified queue persistent. + * Removes the specified persistent configured object. * - * @param queue The queue to store. + * @param id The id of the object to remove. + * @param type The type of the object to remove * * @throws AMQStoreException If the operation fails for any reason. */ - void createQueue(AMQQueue queue) throws AMQStoreException; + void remove(UUID id, String type) throws AMQStoreException; - /** - * Makes the specified queue persistent. - * - * @param queue The queue to store. - * @param arguments The additional arguments to the binding - * - * @throws AMQStoreException If the operation fails for any reason. - */ - void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException; /** - * Removes the specified queue from the persistent store. + * Updates the specified object in the persistent store, IF it is already present. If the object + * is not present in the store, it will not be added. * - * @param queue The queue to remove. + * @param id The id of the object to update. + * @param type The type of the object to update + * @param attributes the updated attributes * * @throws AMQStoreException If the operation fails for any reason. */ - void removeQueue(AMQQueue queue) throws AMQStoreException; + void update(UUID id, String type, Map attributes) throws AMQStoreException; - /** - * Updates the specified queue in the persistent store, IF it is already present. If the queue - * is not present in the store, it will not be added. - * - * @param queue The queue to update the entry for. - * @throws AMQStoreException If the operation fails for any reason. - */ - void updateQueue(AMQQueue queue) throws AMQStoreException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java new file mode 100644 index 0000000000..9fab29fea6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java @@ -0,0 +1,132 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.queue.AMQQueue; + +public class DurableConfigurationStoreHelper +{ + + public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException + { + Map attributesMap = new LinkedHashMap(); + attributesMap.put(Queue.NAME, queue.getName()); + attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner())); + attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); + if (queue.getAlternateExchange() != null) + { + attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); + } + else + { + attributesMap.remove(Queue.ALTERNATE_EXCHANGE); + } + if (attributesMap.containsKey(Queue.ARGUMENTS)) + { + // We wouldn't need this if createQueueConfiguredObject took only AMQQueue + Map currentArgs = (Map) attributesMap.get(Queue.ARGUMENTS); + currentArgs.putAll(queue.getArguments()); + } + else + { + attributesMap.put(Queue.ARGUMENTS, queue.getArguments()); + } + store.update(queue.getId(), Queue.class.getName(), attributesMap); + } + + public static void createQueue(DurableConfigurationStore store, AMQQueue queue, FieldTable arguments) + throws AMQStoreException + { + Map attributesMap = new HashMap(); + attributesMap.put(Queue.NAME, queue.getName()); + attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner())); + attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); + if (queue.getAlternateExchange() != null) + { + attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); + } + // TODO KW i think the arguments could come from the queue itself removing the need for the parameter arguments. + // It would also do away with the need for the if/then/else within updateQueueConfiguredObject + if (arguments != null) + { + attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments)); + } + store.create(queue.getId(),Queue.class.getName(),attributesMap); + } + + public static void removeQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException + { + store.remove(queue.getId(), Queue.class.getName()); + } + + public static void createExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange) + throws AMQStoreException + { + Map attributesMap = new HashMap(); + attributesMap.put(Exchange.NAME, exchange.getName()); + attributesMap.put(Exchange.TYPE, AMQShortString.toString(exchange.getTypeShortString())); + attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name() + : LifetimePolicy.PERMANENT.name()); + store.create(exchange.getId(), Exchange.class.getName(), attributesMap); + + } + + + public static void removeExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange) + throws AMQStoreException + { + store.remove(exchange.getId(),Exchange.class.getName()); + } + + public static void createBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding) + throws AMQStoreException + { + Map attributesMap = new HashMap(); + attributesMap.put(Binding.NAME, binding.getBindingKey()); + attributesMap.put(Binding.EXCHANGE, binding.getExchange().getId()); + attributesMap.put(Binding.QUEUE, binding.getQueue().getId()); + Map arguments = binding.getArguments(); + if (arguments != null) + { + attributesMap.put(Binding.ARGUMENTS, arguments); + } + store.create(binding.getId(), Binding.class.getName(), attributesMap); + } + + + public static void removeBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding) + throws AMQStoreException + { + store.remove(binding.getId(), Binding.class.getName()); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 9b2496f262..b7372828e1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.store; +import java.util.Map; +import java.util.UUID; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.message.EnqueableMessage; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index 3b57bbfa55..77cde80af9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -19,12 +19,9 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.Exchange; +import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.queue.AMQQueue; public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore { @@ -36,42 +33,17 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override - public void createExchange(Exchange exchange) throws AMQStoreException + public void update(UUID id, String type, Map attributes) { } @Override - public void removeExchange(Exchange exchange) throws AMQStoreException + public void remove(UUID id, String type) { } @Override - public void bindQueue(Binding binding) throws AMQStoreException - { - } - - @Override - public void unbindQueue(Binding binding) throws AMQStoreException - { - } - - @Override - public void createQueue(AMQQueue queue) throws AMQStoreException - { - } - - @Override - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException - { - } - - @Override - public void removeQueue(AMQQueue queue) throws AMQStoreException - { - } - - @Override - public void updateQueue(AMQQueue queue) throws AMQStoreException + public void create(UUID id, String type, Map attributes) { } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 0a4bb79ed7..889fe7c5c1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; @@ -763,7 +764,7 @@ public class ServerSessionDelegate extends SessionDelegate if (exchange.isDurable()) { DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - store.createExchange(exchange); + DurableConfigurationStoreHelper.createExchange(store, exchange); } exchangeRegistry.registerExchange(exchange); } @@ -918,7 +919,7 @@ public class ServerSessionDelegate extends SessionDelegate if (exchange.isDurable() && !exchange.isAutoDelete()) { DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - store.removeExchange(exchange); + DurableConfigurationStoreHelper.removeExchange(store, exchange); } } } @@ -1300,11 +1301,11 @@ public class ServerSessionDelegate extends SessionDelegate { ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue()); } - store.createQueue(queue, ftArgs); + DurableConfigurationStoreHelper.createQueue(store, queue, ftArgs); } else { - store.createQueue(queue); + DurableConfigurationStoreHelper.createQueue(store, queue, null); } } queueRegistry.registerQueue(queue); @@ -1469,7 +1470,7 @@ public class ServerSessionDelegate extends SessionDelegate if (queue.isDurable() && !queue.isAutoDelete()) { DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - store.removeQueue(queue); + DurableConfigurationStoreHelper.removeQueue(store,queue); } } catch (AMQException e) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index a704ca112b..e89fa8b545 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -56,6 +56,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.txn.DtxRegistry; @@ -294,7 +295,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg if (newExchange.isDurable()) { - getDurableConfigurationStore().createExchange(newExchange); + DurableConfigurationStoreHelper.createExchange(getDurableConfigurationStore(), newExchange); } } } @@ -306,7 +307,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg if (queue.isDurable()) { - getDurableConfigurationStore().createQueue(queue); + DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue, null); } //get the exchange name (returns default exchange name if none was specified) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 640e6ff459..3c7e1395d1 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.message.AMQMessage; @@ -42,11 +43,15 @@ import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.v1_0.Message_1_0; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StoredMessage; @@ -61,9 +66,6 @@ import org.apache.qpid.transport.util.Functions; import org.apache.qpid.util.ByteBufferInputStream; public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler, - ConfigurationRecoveryHandler.QueueRecoveryHandler, - ConfigurationRecoveryHandler.ExchangeRecoveryHandler, - ConfigurationRecoveryHandler.BindingRecoveryHandler, MessageStoreRecoveryHandler, MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, TransactionLogRecoveryHandler, @@ -78,6 +80,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa private final Map _recoveredMessages = new HashMap(); private final Map _unusedMessages = new HashMap(); + private final Map>> _configuredObjects = new HashMap>>(); + private MessageStoreLogSubject _logSubject; private MessageStore _store; @@ -86,12 +90,19 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa _virtualHost = virtualHost; } + @Override + public void beginConfigurationRecovery(DurableConfigurationStore store) + { + _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName()); + + CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_START()); + } + public VirtualHostConfigRecoveryHandler begin(MessageStore store) { _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName()); _store = store; CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); - return this; } @@ -100,7 +111,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa try { AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName); - + if (q == null) { q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost, @@ -118,9 +129,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa q.setAlternateExchange(altExchange); } } - + CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true)); - + //Record that we have a queue for recovery _queueRecoveries.put(queueName, 0); } @@ -130,12 +141,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } } - @Override - public BindingRecoveryHandler completeQueueRecovery() - { - return this; - } - public void exchange(UUID id, String exchangeName, String type, boolean autoDelete) { try @@ -155,11 +160,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } } - public QueueRecoveryHandler completeExchangeRecovery() - { - return this; - } - public StoredMessageRecoveryHandler begin() { return this; @@ -250,7 +250,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa CurrentActor.get().message(_logSubject, TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), Long.toString(messageId))); - + } } @@ -275,9 +275,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if(message != null) { final QueueEntry entry = queue.getMessageOnTheQueue(messageId); - + entry.acquire(); - + branch.dequeue(queue, message); branch.addPostTransactionAcion(new ServerTransaction.Action() @@ -348,8 +348,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); } - @Override - public void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf) + private void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf) { try { @@ -399,14 +398,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } - public void completeBindingRecovery() - { - } - public void complete() { - - } public void queueEntry(final UUID queueId, long messageId) @@ -486,6 +479,107 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa return this; } + @Override + public void configuredObject(UUID id, String type, Map attributes) + { + Map> typeMap = _configuredObjects.get(type); + if(typeMap == null) + { + typeMap = new HashMap>(); + _configuredObjects.put(type,typeMap); + } + typeMap.put(id, attributes); + } + + @Override + public void completeConfigurationRecovery() + { + Map> exchangeObjects = + _configuredObjects.remove(org.apache.qpid.server.model.Exchange.class.getName()); + + if(exchangeObjects != null) + { + recoverExchanges(exchangeObjects); + } + + Map> queueObjects = + _configuredObjects.remove(org.apache.qpid.server.model.Queue.class.getName()); + + if(queueObjects != null) + { + recoverQueues(queueObjects); + } + + + Map> bindingObjects = + _configuredObjects.remove(Binding.class.getName()); + + if(bindingObjects != null) + { + recoverBindings(bindingObjects); + } + + + CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE()); + } + + private void recoverExchanges(Map> exchangeObjects) + { + for(Map.Entry> entry : exchangeObjects.entrySet()) + { + Map attributeMap = entry.getValue(); + String exchangeName = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.NAME); + String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE); + String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY); + boolean autoDelete = lifeTimePolicy == null + || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE; + exchange(entry.getKey(), exchangeName, exchangeType, autoDelete); + } + } + + private void recoverQueues(Map> queueObjects) + { + for(Map.Entry> entry : queueObjects.entrySet()) + { + Map attributeMap = entry.getValue(); + + String queueName = (String) attributeMap.get(Queue.NAME); + String owner = (String) attributeMap.get(Queue.OWNER); + boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE); + UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE)); + @SuppressWarnings("unchecked") + Map queueArgumentsMap = (Map) attributeMap.get(Queue.ARGUMENTS); + FieldTable arguments = null; + if (queueArgumentsMap != null) + { + arguments = FieldTable.convertToFieldTable(queueArgumentsMap); + } + queue(entry.getKey(), queueName, owner, exclusive, arguments, alternateExchangeId); + } + } + + private void recoverBindings(Map> bindingObjects) + { + for(Map.Entry> entry : bindingObjects.entrySet()) + { + Map attributeMap = entry.getValue(); + UUID exchangeId = UUID.fromString((String)attributeMap.get(Binding.EXCHANGE)); + UUID queueId = UUID.fromString((String) attributeMap.get(Binding.QUEUE)); + String bindingName = (String) attributeMap.get(Binding.NAME); + + @SuppressWarnings("unchecked") + Map bindingArgumentsMap = (Map) attributeMap.get(Binding.ARGUMENTS); + FieldTable arguments = null; + if (bindingArgumentsMap != null) + { + arguments = FieldTable.convertToFieldTable(bindingArgumentsMap); + } + ByteBuffer argumentsBB = (arguments == null ? null : ByteBuffer.wrap(arguments.getDataAsBytes())); + + binding(entry.getKey(), exchangeId, queueId, bindingName, argumentsBB); + } + } + private static class DummyMessage implements EnqueableMessage { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java index 57e1fcd15c..4a6b3f2cad 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java @@ -21,9 +21,8 @@ package org.apache.qpid.server.store; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyMap; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.isA; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -32,7 +31,6 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; import java.io.File; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -48,13 +46,12 @@ import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockStoredMessage; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.Transaction.Record; import org.apache.qpid.server.store.derby.DerbyMessageStore; @@ -71,9 +68,6 @@ public class DurableConfigurationStoreTest extends QpidTestCase private VirtualHost _virtualHost; private ConfigurationRecoveryHandler _recoveryHandler; - private QueueRecoveryHandler _queueRecoveryHandler; - private ExchangeRecoveryHandler _exchangeRecoveryHandler; - private BindingRecoveryHandler _bindingRecoveryHandler; private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; private TransactionLogRecoveryHandler _logRecoveryHandler; @@ -101,9 +95,6 @@ public class DurableConfigurationStoreTest extends QpidTestCase setTestSystemProperty("QPID_WORK", TMP_FOLDER); _configuration = mock(Configuration.class); _recoveryHandler = mock(ConfigurationRecoveryHandler.class); - _queueRecoveryHandler = mock(QueueRecoveryHandler.class); - _exchangeRecoveryHandler = mock(ExchangeRecoveryHandler.class); - _bindingRecoveryHandler = mock(BindingRecoveryHandler.class); _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); @@ -112,9 +103,6 @@ public class DurableConfigurationStoreTest extends QpidTestCase _virtualHost = mock(VirtualHost.class); when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); - when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler); - when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_queueRecoveryHandler); - when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_bindingRecoveryHandler); when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); when(_exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(EXCHANGE_NAME)); @@ -140,21 +128,44 @@ public class DurableConfigurationStoreTest extends QpidTestCase public void testCreateExchange() throws Exception { Exchange exchange = createTestExchange(); - _configStore.createExchange(exchange); + DurableConfigurationStoreHelper.createExchange(_configStore, exchange); reopenStore(); - verify(_exchangeRecoveryHandler).exchange(_exchangeId, getName(), getName() + "Type", true); + verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(org.apache.qpid.server.model.Exchange.class.getName()), + eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(), + org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type", + org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString()))); + } + + private Map map(Object... vals) + { + Map map = new HashMap(); + boolean isValue = false; + String key = null; + for(Object obj : vals) + { + if(isValue) + { + map.put(key,obj); + } + else + { + key = (String) obj; + } + isValue = !isValue; + } + return map; } public void testRemoveExchange() throws Exception { Exchange exchange = createTestExchange(); - _configStore.createExchange(exchange); + DurableConfigurationStoreHelper.createExchange(_configStore, exchange); - _configStore.removeExchange(exchange); + DurableConfigurationStoreHelper.removeExchange(_configStore, exchange); reopenStore(); - verify(_exchangeRecoveryHandler, never()).exchange(any(UUID.class), anyString(), anyString(), anyBoolean()); + verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap()); } public void testBindQueue() throws Exception @@ -162,13 +173,18 @@ public class DurableConfigurationStoreTest extends QpidTestCase AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); - _configStore.bindQueue(binding); + DurableConfigurationStoreHelper.createBinding(_configStore, binding); reopenStore(); - ByteBuffer argsAsBytes = ByteBuffer.wrap(_bindingArgs.getDataAsBytes()); + Map map = new HashMap(); + map.put(org.apache.qpid.server.model.Binding.EXCHANGE, _exchange.getId().toString()); + map.put(org.apache.qpid.server.model.Binding.QUEUE, queue.getId().toString()); + map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY); + map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,FieldTable.convertToMap(_bindingArgs)); - verify(_bindingRecoveryHandler).binding(binding.getId(), _exchange.getId(), queue.getId(), ROUTING_KEY, argsAsBytes); + verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(org.apache.qpid.server.model.Binding.class.getName()), + eq(map)); } public void testUnbindQueue() throws Exception @@ -176,22 +192,27 @@ public class DurableConfigurationStoreTest extends QpidTestCase AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); - _configStore.bindQueue(binding); + DurableConfigurationStoreHelper.createBinding(_configStore, binding); - _configStore.unbindQueue(binding); + DurableConfigurationStoreHelper.removeBinding(_configStore, binding); reopenStore(); - verify(_bindingRecoveryHandler, never()).binding(any(UUID.class), any(UUID.class), any(UUID.class), anyString(), - isA(ByteBuffer.class)); + verify(_recoveryHandler, never()).configuredObject(any(UUID.class), + eq(org.apache.qpid.server.model.Binding.class.getName()), + anyMap()); } public void testCreateQueueAMQQueue() throws Exception { AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); - _configStore.createQueue(queue); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); reopenStore(); - verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, null); + Map queueAttributes = new HashMap(); + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); } public void testCreateQueueAMQQueueFieldTable() throws Exception @@ -202,10 +223,19 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _configStore.createQueue(queue, arguments); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); reopenStore(); - verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, arguments, null); + + + Map queueAttributes = new HashMap(); + + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + queueAttributes.put(Queue.ARGUMENTS, attributes); + + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); } public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception @@ -213,10 +243,17 @@ public class DurableConfigurationStoreTest extends QpidTestCase Exchange alternateExchange = createTestAlternateExchange(); AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange); - _configStore.createQueue(queue); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); reopenStore(); - verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, alternateExchange.getId()); + + Map queueAttributes = new HashMap(); + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); + + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); } private Exchange createTestAlternateExchange() @@ -235,14 +272,25 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _configStore.createQueue(queue, arguments); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); // update the queue to have exclusive=false queue = createTestQueue(getName(), getName() + "Owner", false); - _configStore.updateQueue(queue); + when(queue.getArguments()).thenReturn(attributes); + + DurableConfigurationStoreHelper.updateQueue(_configStore, queue); reopenStore(); - verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, null); + + Map queueAttributes = new HashMap(); + + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); + queueAttributes.put(Queue.ARGUMENTS, attributes); + + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + } public void testUpdateQueueAlternateExchange() throws Exception @@ -253,15 +301,26 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _configStore.createQueue(queue, arguments); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); // update the queue to have exclusive=false Exchange alternateExchange = createTestAlternateExchange(); queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange); - _configStore.updateQueue(queue); + when(queue.getArguments()).thenReturn(attributes); + + DurableConfigurationStoreHelper.updateQueue(_configStore, queue); reopenStore(); - verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, alternateExchange.getId()); + + Map queueAttributes = new HashMap(); + + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); + queueAttributes.put(Queue.ARGUMENTS, attributes); + queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); + + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); } public void testRemoveQueue() throws Exception @@ -272,13 +331,14 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _configStore.createQueue(queue, arguments); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); // remove queue - _configStore.removeQueue(queue); + DurableConfigurationStoreHelper.removeQueue(_configStore,queue); reopenStore(); - verify(_queueRecoveryHandler, never()).queue(any(UUID.class), anyString(), anyString(), anyBoolean(), - any(FieldTable.class), any(UUID.class)); + verify(_recoveryHandler, never()).configuredObject(any(UUID.class), + eq(org.apache.qpid.server.model.Queue.class.getName()), + anyMap()); } private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 5eea002365..af4bbd1731 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -346,7 +346,8 @@ public class MessageStoreTest extends QpidTestCase 1, queueRegistry.getQueues().size()); //test that removing the queue means it is not recovered next time - getVirtualHost().getDurableConfigurationStore().removeQueue(queueRegistry.getQueue(durableQueueName)); + final AMQQueue queue = queueRegistry.getQueue(durableQueueName); + DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue); reloadVirtualHost(); @@ -399,7 +400,8 @@ public class MessageStoreTest extends QpidTestCase origExchangeCount + 1, exchangeRegistry.getExchangeNames().size()); //test that removing the exchange means it is not recovered next time - getVirtualHost().getDurableConfigurationStore().removeExchange(exchangeRegistry.getExchange(directExchangeName)); + final Exchange exchange = exchangeRegistry.getExchange(directExchangeName); + DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange); reloadVirtualHost(); @@ -755,7 +757,9 @@ public class MessageStoreTest extends QpidTestCase if (queue.isDurable() && !queue.isAutoDelete()) { - getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments); + DurableConfigurationStoreHelper.createQueue(getVirtualHost().getDurableConfigurationStore(), + queue, + queueArguments); } } catch (AMQException e) @@ -799,7 +803,8 @@ public class MessageStoreTest extends QpidTestCase getVirtualHost().getExchangeRegistry().registerExchange(exchange); if (durable) { - getVirtualHost().getDurableConfigurationStore().createExchange(exchange); + DurableConfigurationStoreHelper.createExchange(getVirtualHost().getDurableConfigurationStore(), + exchange); } } catch (AMQException e) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index a73057ebc1..2d68e94fcd 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -26,18 +26,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.test.utils.QpidTestCase; public abstract class MessageStoreTestCase extends QpidTestCase { private ConfigurationRecoveryHandler _recoveryHandler; - private QueueRecoveryHandler _queueRecoveryHandler; - private ExchangeRecoveryHandler _exchangeRecoveryHandler; - private BindingRecoveryHandler _bindingRecoveryHandler; private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; private TransactionLogRecoveryHandler _logRecoveryHandler; @@ -52,9 +46,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase super.setUp(); _recoveryHandler = mock(ConfigurationRecoveryHandler.class); - _queueRecoveryHandler = mock(QueueRecoveryHandler.class); - _exchangeRecoveryHandler = mock(ExchangeRecoveryHandler.class); - _bindingRecoveryHandler = mock(BindingRecoveryHandler.class); _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); @@ -63,9 +54,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase _virtualHost = mock(VirtualHost.class); when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); - when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler); - when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_queueRecoveryHandler); - when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_bindingRecoveryHandler); when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java index 69efb7e310..a1d3d1542e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -28,7 +28,8 @@ import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.model.VirtualHost; -public class QuotaMessageStore extends NullMessageStore +public class + QuotaMessageStore extends NullMessageStore { private final AtomicLong _messageId = new AtomicLong(1); private final AtomicBoolean _closed = new AtomicBoolean(false); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 76250e126a..50f2ed655f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -22,16 +22,13 @@ package org.apache.qpid.server.store; import java.util.Collections; import java.util.Map; +import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.queue.AMQQueue; import java.nio.ByteBuffer; import java.util.HashMap; @@ -176,51 +173,28 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } - public void createExchange(Exchange exchange) throws AMQStoreException - { - doPreDelay("createExchange"); - _durableConfigurationStore.createExchange(exchange); - doPostDelay("createExchange"); - } - - public void removeExchange(Exchange exchange) throws AMQStoreException - { - doPreDelay("removeExchange"); - _durableConfigurationStore.removeExchange(exchange); - doPostDelay("removeExchange"); - } - - public void bindQueue(Binding binding) throws AMQStoreException - { - doPreDelay("bindQueue"); - _durableConfigurationStore.bindQueue(binding); - doPostDelay("bindQueue"); - } - - public void unbindQueue(Binding binding) throws AMQStoreException - { - doPreDelay("unbindQueue"); - _durableConfigurationStore.unbindQueue(binding); - doPostDelay("unbindQueue"); - } - - public void createQueue(AMQQueue queue) throws AMQStoreException + @Override + public void create(UUID id, String type, Map attributes) throws AMQStoreException { - createQueue(queue, null); + doPreDelay("create"); + _durableConfigurationStore.create(id, type, attributes); + doPostDelay("create"); } - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException + @Override + public void remove(UUID id, String type) throws AMQStoreException { - doPreDelay("createQueue"); - _durableConfigurationStore.createQueue(queue, arguments); - doPostDelay("createQueue"); + doPreDelay("remove"); + _durableConfigurationStore.remove(id, type); + doPostDelay("remove"); } - public void removeQueue(AMQQueue queue) throws AMQStoreException + @Override + public void update(UUID id, String type, Map attributes) throws AMQStoreException { - doPreDelay("removeQueue"); - _durableConfigurationStore.removeQueue(queue); - doPostDelay("removeQueue"); + doPreDelay("update"); + _durableConfigurationStore.update(id, type, attributes); + doPostDelay("update"); } public Transaction newTransaction() @@ -314,13 +288,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } } - public void updateQueue(AMQQueue queue) throws AMQStoreException - { - doPreDelay("updateQueue"); - _durableConfigurationStore.updateQueue(queue); - doPostDelay("updateQueue"); - } - @Override public void activate() throws Exception { -- cgit v1.2.1