summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-02 15:26:42 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-02 15:26:42 +0000
commit706370f431afa3e812b94878fd9bc6a09a0920d5 (patch)
treefc3ab91d71d6a12f56de333eee696ea9d902c735
parent296e94464d89b8d9affa0e73ba2d015a475dc88d (diff)
downloadqpid-python-706370f431afa3e812b94878fd9bc6a09a0920d5.tar.gz
QPID-4973 : [Java Broker] Refactor DurableConfigurationStore
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1498976 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java147
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java51
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java177
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java203
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java18
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java67
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java132
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java38
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java5
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java152
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java146
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java13
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java12
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java65
28 files changed, 608 insertions, 714 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index d7c8102c0e..f6b7e1790f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -51,16 +51,10 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.*;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
@@ -150,8 +144,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private final EventManager _eventManager = new EventManager();
private String _storeLocation;
- private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
-
private Map<String, String> _envConfigMap;
public AbstractBDBMessageStore()
@@ -434,17 +426,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
{
try
{
- List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
- _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
-
- QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
- _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
-
- BindingRecoveryHandler brh = qrh.completeQueueRecovery();
- _configuredObjectHelper.recoverBindings(brh, configuredObjects);
+ recoveryHandler.beginConfigurationRecovery(this);
+ loadConfiguredObjects(recoveryHandler);
- brh.completeBindingRecovery();
+ recoveryHandler.completeConfigurationRecovery();
}
catch (DatabaseException e)
{
@@ -453,10 +438,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
- private List<ConfiguredObjectRecord> loadConfiguredObjects() throws DatabaseException
+ private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException
{
Cursor cursor = null;
- List<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
try
{
cursor = _configuredObjectsDb.openCursor(null, null);
@@ -464,10 +448,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- ConfiguredObjectRecord configuredObject = ConfiguredObjectBinding.getInstance().entryToObject(value);
UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
- configuredObject.setId(id);
- results.add(configuredObject);
+
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value);
+ crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes());
}
}
@@ -475,7 +459,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
{
closeCursorSafely(cursor);
}
- return results;
}
private void closeCursorSafely(Cursor cursor)
@@ -743,111 +726,43 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
}
- /**
- * @see DurableConfigurationStore#createExchange(Exchange)
- */
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange);
- storeConfiguredObjectEntry(configuredObject);
- }
- }
-
- /**
- * @see DurableConfigurationStore#removeExchange(Exchange)
- */
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
- UUID id = exchange.getId();
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public void removeExchange(String name = " + exchange.getName() + ", uuid = " + id + "): called");
- }
- OperationStatus status = removeConfiguredObject(id);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + id + " not found");
- }
- }
-
-
- /**
- * @see DurableConfigurationStore#bindQueue(Binding)
- */
- public void bindQueue(Binding binding) throws AMQStoreException
+ @Override
+ public void create(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
if (_stateManager.isInState(State.ACTIVE))
{
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding);
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes);
storeConfiguredObjectEntry(configuredObject);
}
}
- /**
- * @see DurableConfigurationStore#unbindQueue(Binding)
- */
- public void unbindQueue(Binding binding)
- throws AMQStoreException
+ @Override
+ public void remove(UUID id, String type) throws AMQStoreException
{
- UUID id = binding.getId();
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("public void unbindQueue(Binding binding = " + binding + ", uuid = " + id + "): called");
+ LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called");
}
-
OperationStatus status = removeConfiguredObject(id);
if (status == OperationStatus.NOTFOUND)
{
- throw new AMQStoreException("Binding " + binding + " not found");
- }
- }
-
- /**
- * @see DurableConfigurationStore#createQueue(AMQQueue)
- */
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- createQueue(queue, null);
- }
-
- /**
- * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable)
- */
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + "), queue id" + queue.getId()
- + ", arguments=" + arguments + "): called");
- }
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments);
- storeConfiguredObjectEntry(configuredObject);
+ throw new AMQStoreException("Configured object of type " + type + " with id " + id + " not found");
}
}
- /**
- * Updates the specified queue in the persistent store, IF it is already present. If the queue
- * is not present in the store, it will not be added.
- *
- * @param queue The queue to update the entry for.
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void updateQueue(final AMQQueue queue) throws AMQStoreException
+ @Override
+ public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Updating queue: " + queue.getName());
+ LOGGER.debug("Updating " +type + ", id: " + id);
}
try
{
DatabaseEntry key = new DatabaseEntry();
UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
- keyBinding.objectToEntry(queue.getId(), key);
+ keyBinding.objectToEntry(id, key);
DatabaseEntry value = new DatabaseEntry();
DatabaseEntry newValue = new DatabaseEntry();
@@ -856,8 +771,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
- ConfiguredObjectRecord queueRecord = configuredObjectBinding.entryToObject(value);
- ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueRecord);
+ ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
// write the updated entry to the store
configuredObjectBinding.objectToEntry(newQueueRecord, newValue);
@@ -879,29 +793,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
/**
- * Removes the specified queue from the persistent store.
- *
- * @param queue The queue to remove.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void removeQueue(final AMQQueue queue) throws AMQStoreException
- {
- UUID id = queue.getId();
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public void removeQueue(AMQShortString name = " + queue.getName() + ", uuid = " + id + "): called");
- }
-
- OperationStatus status = removeConfiguredObject(id);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Queue " + queue.getName() + " with id " + id + " not found");
- }
- }
-
-
- /**
* Places a message onto a specified queue, in a given transaction.
*
* @param tx The transaction for the operation.
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
index 945bcf1d28..31cafbe74d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
@@ -20,38 +20,75 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuple;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.UUID;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord>
{
- private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding();
+ private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding(null);
+ private final UUID _uuid;
+
public static ConfiguredObjectBinding getInstance()
{
return INSTANCE;
}
- /** non-public constructor forces getInstance instead */
- private ConfiguredObjectBinding()
+ public ConfiguredObjectBinding(UUID uuid)
{
+ _uuid = uuid;
}
public ConfiguredObjectRecord entryToObject(TupleInput tupleInput)
{
String type = tupleInput.readString();
String json = tupleInput.readString();
- ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(null, type, json);
- return configuredObject;
+ ObjectMapper mapper = new ObjectMapper();
+ try
+ {
+ Map<String,Object> value = mapper.readValue(json, Map.class);
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(_uuid, type, value);
+ return configuredObject;
+ }
+ catch (IOException e)
+ {
+ //should never happen
+ throw new RuntimeException(e);
+ }
+
}
public void objectToEntry(ConfiguredObjectRecord object, TupleOutput tupleOutput)
{
- tupleOutput.writeString(object.getType());
- tupleOutput.writeString(object.getAttributes());
+ try
+ {
+ StringWriter writer = new StringWriter();
+ new ObjectMapper().writeValue(writer, object.getAttributes());
+ tupleOutput.writeString(object.getType());
+ tupleOutput.writeString(writer.toString());
+ }
+ catch (JsonMappingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
index 7f93f5691e..5a5d39081c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuple;
+import java.util.Collections;
+import java.util.Map;
import junit.framework.TestCase;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -33,7 +35,9 @@ public class ConfiguredObjectBindingTest extends TestCase
private ConfiguredObjectRecord _object;
- private static final String DUMMY_ATTRIBUTES_STRING = "dummyAttributes";
+ private static final Map<String, Object> DUMMY_ATTRIBUTES_MAP =
+ Collections.singletonMap("dummy",(Object) "attributes");
+
private static final String DUMMY_TYPE_STRING = "dummyType";
private ConfiguredObjectBinding _configuredObjectBinding;
@@ -42,7 +46,8 @@ public class ConfiguredObjectBindingTest extends TestCase
{
super.setUp();
_configuredObjectBinding = ConfiguredObjectBinding.getInstance();
- _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_STRING);
+ _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING,
+ DUMMY_ATTRIBUTES_MAP);
}
public void testObjectToEntryAndEntryToObject()
@@ -55,7 +60,7 @@ public class ConfiguredObjectBindingTest extends TestCase
TupleInput tupleInput = new TupleInput(entryAsBytes);
ConfiguredObjectRecord storedObject = _configuredObjectBinding.entryToObject(tupleInput);
- assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_STRING, storedObject.getAttributes());
+ assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_MAP, storedObject.getAttributes());
assertEquals("Unexpected type", DUMMY_TYPE_STRING, storedObject.getType());
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 6b453cbbda..3f6489cb86 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -554,7 +555,7 @@ public abstract class AbstractExchange implements Exchange
if (b.isDurable())
{
- _virtualHost.getDurableConfigurationStore().unbindQueue(b);
+ DurableConfigurationStoreHelper.removeBinding(_virtualHost.getDurableConfigurationStore(), b);
}
b.logDestruction();
}
@@ -626,7 +627,7 @@ public abstract class AbstractExchange implements Exchange
if (b.isDurable() && !restore)
{
- _virtualHost.getDurableConfigurationStore().bindQueue(b);
+ DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
}
queue.addQueueDeleteTask(b);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 142da84524..68c15779a0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -29,6 +29,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
@@ -127,7 +128,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
{
if (e.isDurable())
{
- getDurableConfigurationStore().removeExchange(e);
+ DurableConfigurationStoreHelper.removeExchange(getDurableConfigurationStore(), e);
}
e.close();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
index 313b5eefff..10a733546c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
@@ -21,12 +21,10 @@
package org.apache.qpid.server.exchange;
-import java.util.UUID;
import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfigurationStore;
public class ExchangeInitialiser
@@ -49,7 +47,7 @@ public class ExchangeInitialiser
r.registerExchange(exchange);
if(exchange.isDurable())
{
- store.createExchange(exchange);
+ DurableConfigurationStoreHelper.createExchange(store, exchange);
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index 9c25d00b1a..b3967689dc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
@@ -106,7 +107,8 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
if (exchange.isDurable())
{
- virtualHost.getDurableConfigurationStore().createExchange(exchange);
+ DurableConfigurationStoreHelper.createExchange(virtualHost.getDurableConfigurationStore(),
+ exchange);
}
}
catch(AMQUnknownExchangeType e)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 2234ee0354..521f27885f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -30,7 +30,6 @@ import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -40,10 +39,10 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.Collections;
import java.util.Map;
import java.util.UUID;
@@ -114,7 +113,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
queue.setAuthorizationHolder(protocolConnection);
if (queue.isDurable() && !queue.isAutoDelete())
{
- store.createQueue(queue, body.getArguments());
+ DurableConfigurationStoreHelper.createQueue(store, queue, body.getArguments());
}
if(body.getAutoDelete())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index b184ce7dfa..4b00c1a09b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
@@ -115,7 +116,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
if (queue.isDurable())
{
- store.removeQueue(queue);
+ DurableConfigurationStoreHelper.removeQueue(store, queue);
}
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index 0f90df00e6..157b97cc07 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.util.MapValueConverter;
@@ -195,7 +196,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
_queue.delete();
if (_queue.isDurable())
{
- _queue.getVirtualHost().getDurableConfigurationStore().removeQueue(_queue);
+ DurableConfigurationStoreHelper.removeQueue(_queue.getVirtualHost().getDurableConfigurationStore(), _queue);
}
}
}
@@ -365,7 +366,8 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
{
try
{
- _queue.getVirtualHost().getDurableConfigurationStore().updateQueue(_queue);
+ DurableConfigurationStoreHelper.updateQueue(_queue.getVirtualHost().getDurableConfigurationStore(),
+ _queue);
}
catch (AMQStoreException e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 68177dac62..74b826da91 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -75,6 +75,7 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -312,7 +313,8 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
_virtualHost.getExchangeRegistry().registerExchange(exchange);
if(durable)
{
- _virtualHost.getDurableConfigurationStore().createExchange(exchange);
+ DurableConfigurationStoreHelper.createExchange(_virtualHost.getDurableConfigurationStore(),
+ exchange);
}
synchronized (_exchangeAdapters)
{
@@ -434,7 +436,9 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
if(durable)
{
- _virtualHost.getDurableConfigurationStore().createQueue(queue, FieldTable.convertToFieldTable(attributes));
+ DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(),
+ queue,
+ FieldTable.convertToFieldTable(attributes));
}
synchronized (_queueAdapters)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index cc0e5ebe7a..f9e678525f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQQueueFactory
@@ -315,7 +316,8 @@ public class AMQQueueFactory
exchangeRegistry.registerExchange(dlExchange);
//enter the dle in the persistent store
- virtualHost.getDurableConfigurationStore().createExchange(dlExchange);
+ DurableConfigurationStoreHelper.createExchange(virtualHost.getDurableConfigurationStore(),
+ dlExchange);
}
}
@@ -335,7 +337,9 @@ public class AMQQueueFactory
dlQueue = createAMQQueueImpl(UUIDGenerator.generateQueueUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
//enter the dlq in the persistent store
- virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
+ DurableConfigurationStoreHelper.createQueue(virtualHost.getDurableConfigurationStore(),
+ dlQueue,
+ FieldTable.convertToFieldTable(args));
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 758945d6a1..fdb36c9013 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -43,16 +43,13 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
{
@@ -164,9 +161,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
_stateManager = new StateManager(_eventManager);
}
- private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
-
-
@Override
public void configureConfigStore(String name,
ConfigurationRecoveryHandler configRecoveryHandler,
@@ -462,18 +456,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
try
{
- List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
-
- ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
- _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
+ recoveryHandler.beginConfigurationRecovery(this);
+ loadConfiguredObjects(recoveryHandler);
- QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
- _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
-
- BindingRecoveryHandler brh = qrh.completeQueueRecovery();
- _configuredObjectHelper.recoverBindings(brh, configuredObjects);
-
- brh.completeBindingRecovery();
+ recoveryHandler.completeConfigurationRecovery();
}
catch (SQLException e)
{
@@ -573,92 +559,42 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- @Override
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange);
- insertConfiguredObject(configuredObject);
- }
-
- }
-
- @Override
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
- int results = removeConfiguredObject(exchange.getId());
- if (results == 0)
- {
- throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + exchange.getId() + " not found");
- }
- }
@Override
- public void bindQueue(Binding binding)
- throws AMQStoreException
+ public void create(UUID id, String type, Map<String,Object> attributes) throws AMQStoreException
{
if (_stateManager.isInState(State.ACTIVE))
{
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding);
- insertConfiguredObject(configuredObject);
+ insertConfiguredObject(new ConfiguredObjectRecord(id, type, attributes));
}
- }
- @Override
- public void unbindQueue(Binding binding)
- throws AMQStoreException
- {
- int results = removeConfiguredObject(binding.getId());
- if (results == 0)
- {
- throw new AMQStoreException("Binding " + binding + " not found");
- }
- }
-
- @Override
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- createQueue(queue, null);
}
@Override
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+ public void remove(UUID id, String type) throws AMQStoreException
{
- getLogger().debug("public void createQueue(AMQQueue queue = " + queue + "): called");
-
- if (_stateManager.isInState(State.ACTIVE))
+ int results = removeConfiguredObject(id);
+ if (results == 0)
{
- ConfiguredObjectRecord queueConfiguredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments);
- insertConfiguredObject(queueConfiguredObject);
+ throw new AMQStoreException(type + " with id " + id + " not found");
}
}
- /**
- * Updates the specified queue in the persistent store, IF it is already present. If the queue
- * is not present in the store, it will not be added.
- *
- * NOTE: Currently only updates the exclusivity.
- *
- * @param queue The queue to update the entry for.
- * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
- */
@Override
- public void updateQueue(final AMQQueue queue) throws AMQStoreException
+ public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
if (_stateManager.isInState(State.ACTIVE))
{
- ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(queue.getId());
+ ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(id);
if (queueConfiguredObject != null)
{
- ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueConfiguredObject);
+ ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
updateConfiguredObject(newQueueRecord);
}
}
}
-
/**
* Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
* isolation and with auto-commit transactions enabled.
@@ -714,18 +650,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected abstract Connection getConnection() throws SQLException;
- @Override
- public void removeQueue(final AMQQueue queue) throws AMQStoreException
- {
- AMQShortString name = queue.getNameShortString();
- getLogger().debug("public void removeQueue(AMQShortString name = " + name + "): called");
- int results = removeConfiguredObject(queue.getId());
- if (results == 0)
- {
- throw new AMQStoreException("Queue " + name + " with id " + queue.getId() + " not found");
- }
- }
-
private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
{
byte[] argumentBytes;
@@ -1825,7 +1749,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
else
{
- byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes);
ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
}
@@ -1852,6 +1777,18 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
conn.close();
}
}
+ catch (JsonMappingException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
catch (SQLException e)
{
throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
@@ -1914,7 +1851,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
stmt2.setString(1, configuredObject.getType());
if (configuredObject.getAttributes() != null)
{
- byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
+ byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
+ configuredObject.getAttributes());
ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
}
@@ -1946,6 +1884,18 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
conn.close();
}
}
+ catch (JsonMappingException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
catch (SQLException e)
{
throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
@@ -1972,7 +1922,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
String type = rs.getString(1);
String attributes = getBlobAsString(rs, 2);
- result = new ConfiguredObjectRecord(id, type, attributes);
+ result = new ConfiguredObjectRecord(id, type,
+ (new ObjectMapper()).readValue(attributes,Map.class));
}
}
finally
@@ -1990,6 +1941,21 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
conn.close();
}
}
+ catch (JsonMappingException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
catch (SQLException e)
{
throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
@@ -1998,10 +1964,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
return result;
}
- private List<ConfiguredObjectRecord> loadConfiguredObjects() throws SQLException
+ private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException, AMQStoreException
{
- ArrayList<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
Connection conn = newAutoCommitConnection();
+
+ final ObjectMapper objectMapper = new ObjectMapper();
try
{
PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
@@ -2015,9 +1982,22 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
String id = rs.getString(1);
String objectType = rs.getString(2);
String attributes = getBlobAsString(rs, 3);
- results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes));
+ recoveryHandler.configuredObject(UUID.fromString(id), objectType,
+ objectMapper.readValue(attributes,Map.class));
}
}
+ catch (JsonMappingException e)
+ {
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
finally
{
rs.close();
@@ -2032,7 +2012,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
conn.close();
}
- return results;
}
protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
index ab7ef3f55b..2a4aed5373 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
@@ -28,25 +28,10 @@ import java.util.UUID;
public interface ConfigurationRecoveryHandler
{
- ExchangeRecoveryHandler begin(MessageStore store);
+ void beginConfigurationRecovery(DurableConfigurationStore store);
- public static interface ExchangeRecoveryHandler
- {
- void exchange(UUID id, String exchangeName, String type, boolean autoDelete);
- QueueRecoveryHandler completeExchangeRecovery();
- }
+ void configuredObject(UUID id, String type, Map<String, Object> attributes);
- public static interface QueueRecoveryHandler
- {
- void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId);
- BindingRecoveryHandler completeQueueRecovery();
- }
-
-
- public static interface BindingRecoveryHandler
- {
- void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf);
- void completeBindingRecovery();
- }
+ void completeConfigurationRecovery();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java
deleted file mode 100644
index 7356e1ae83..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
-import org.apache.qpid.server.util.MapJsonSerializer;
-
-public class ConfiguredObjectHelper
-{
-
-
- private MapJsonSerializer _serializer = new MapJsonSerializer();
-
- public void loadQueue(ConfiguredObjectRecord configuredObject, QueueRecoveryHandler qrh)
- {
- if (Queue.class.getName().equals(configuredObject.getType()))
- {
- Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes());
- String queueName = (String) attributeMap.get(Queue.NAME);
- String owner = (String) attributeMap.get(Queue.OWNER);
- boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
- UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE));
- @SuppressWarnings("unchecked")
- Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS);
- FieldTable arguments = null;
- if (queueArgumentsMap != null)
- {
- arguments = FieldTable.convertToFieldTable(queueArgumentsMap);
- }
- qrh.queue(configuredObject.getId(), queueName, owner, exclusive, arguments, alternateExchangeId);
- }
- }
-
- public ConfiguredObjectRecord updateQueueConfiguredObject(final AMQQueue queue, ConfiguredObjectRecord queueRecord)
- {
- Map<String, Object> attributesMap = _serializer.deserialize(queueRecord.getAttributes());
- attributesMap.put(Queue.NAME, queue.getName());
- attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
- if (queue.getAlternateExchange() != null)
- {
- attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
- }
- else
- {
- attributesMap.remove(Queue.ALTERNATE_EXCHANGE);
- }
- if (attributesMap.containsKey(Queue.ARGUMENTS))
- {
- // We wouldn't need this if createQueueConfiguredObject took only AMQQueue
- Map<String, Object> currentArgs = (Map<String, Object>) attributesMap.get(Queue.ARGUMENTS);
- currentArgs.putAll(queue.getArguments());
- }
- else
- {
- attributesMap.put(Queue.ARGUMENTS, queue.getArguments());
- }
- String newJson = _serializer.serialize(attributesMap);
- ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(queue.getId(), queueRecord.getType(), newJson);
- return newQueueRecord;
- }
-
- public ConfiguredObjectRecord createQueueConfiguredObject(AMQQueue queue, FieldTable arguments)
- {
- Map<String, Object> attributesMap = new HashMap<String, Object>();
- attributesMap.put(Queue.NAME, queue.getName());
- attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
- attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
- if (queue.getAlternateExchange() != null)
- {
- attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
- }
- // TODO KW i think the arguments could come from the queue itself removing the need for the parameter arguments.
- // It would also do away with the need for the if/then/else within updateQueueConfiguredObject
- if (arguments != null)
- {
- attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments));
- }
- String json = _serializer.serialize(attributesMap);
- ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(queue.getId(), Queue.class.getName(), json);
- return configuredObject;
- }
-
- public void loadExchange(ConfiguredObjectRecord configuredObject, ExchangeRecoveryHandler erh)
- {
- if (Exchange.class.getName().equals(configuredObject.getType()))
- {
- Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes());
- String exchangeName = (String) attributeMap.get(Exchange.NAME);
- String exchangeType = (String) attributeMap.get(Exchange.TYPE);
- String lifeTimePolicy = (String) attributeMap.get(Exchange.LIFETIME_POLICY);
- boolean autoDelete = lifeTimePolicy == null
- || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE;
- erh.exchange(configuredObject.getId(), exchangeName, exchangeType, autoDelete);
- }
- }
-
- public ConfiguredObjectRecord createExchangeConfiguredObject(org.apache.qpid.server.exchange.Exchange exchange)
- {
- Map<String, Object> attributesMap = new HashMap<String, Object>();
- attributesMap.put(Exchange.NAME, exchange.getName());
- attributesMap.put(Exchange.TYPE, AMQShortString.toString(exchange.getTypeShortString()));
- attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name()
- : LifetimePolicy.PERMANENT.name());
- String json = _serializer.serialize(attributesMap);
- ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(exchange.getId(), Exchange.class.getName(), json);
- return configuredObject;
- }
-
- public void loadQueueBinding(ConfiguredObjectRecord configuredObject, BindingRecoveryHandler brh)
- {
- if (Binding.class.getName().equals(configuredObject.getType()))
- {
- Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes());
- UUID exchangeId = UUID.fromString((String)attributeMap.get(Binding.EXCHANGE));
- UUID queueId = UUID.fromString((String) attributeMap.get(Binding.QUEUE));
- String bindingName = (String) attributeMap.get(Binding.NAME);
-
- @SuppressWarnings("unchecked")
- Map<String, Object> bindingArgumentsMap = (Map<String, Object>) attributeMap.get(Binding.ARGUMENTS);
- FieldTable arguments = null;
- if (bindingArgumentsMap != null)
- {
- arguments = FieldTable.convertToFieldTable(bindingArgumentsMap);
- }
- ByteBuffer argumentsBB = (arguments == null ? null : ByteBuffer.wrap(arguments.getDataAsBytes()));
-
- brh.binding(configuredObject.getId(), exchangeId, queueId, bindingName, argumentsBB);
- }
- }
-
- public ConfiguredObjectRecord createBindingConfiguredObject(org.apache.qpid.server.binding.Binding binding)
- {
- Map<String, Object> attributesMap = new HashMap<String, Object>();
- attributesMap.put(Binding.NAME, binding.getBindingKey());
- attributesMap.put(Binding.EXCHANGE, binding.getExchange().getId());
- attributesMap.put(Binding.QUEUE, binding.getQueue().getId());
- Map<String, Object> arguments = binding.getArguments();
- if (arguments != null)
- {
- attributesMap.put(Binding.ARGUMENTS, arguments);
- }
- String json = _serializer.serialize(attributesMap);
- ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(binding.getId(), Binding.class.getName(), json);
- return configuredObject;
- }
-
- public void recoverQueues(QueueRecoveryHandler qrh, List<ConfiguredObjectRecord> configuredObjects)
- {
- for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects)
- {
- loadQueue(configuredObjectRecord, qrh);
- }
- }
-
- public void recoverExchanges(ExchangeRecoveryHandler erh, List<ConfiguredObjectRecord> configuredObjects)
- {
- for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects)
- {
- loadExchange(configuredObjectRecord, erh);
- }
- }
-
- public void recoverBindings(BindingRecoveryHandler brh, List<ConfiguredObjectRecord> configuredObjects)
- {
- for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects)
- {
- loadQueueBinding(configuredObjectRecord, brh);
- }
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
index 95e1713d78..5c8f452dc7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
@@ -20,20 +20,23 @@
*/
package org.apache.qpid.server.store;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.UUID;
public class ConfiguredObjectRecord
{
private UUID _id;
private String _type;
- private String _attributes;
+ private Map<String,Object> _attributes;
- public ConfiguredObjectRecord(UUID id, String type, String attributes)
+ public ConfiguredObjectRecord(UUID id, String type, Map<String,Object> attributes)
{
super();
_id = id;
_type = type;
- _attributes = attributes;
+ _attributes = Collections.unmodifiableMap(new LinkedHashMap<String,Object>(attributes));
}
public UUID getId()
@@ -41,17 +44,12 @@ public class ConfiguredObjectRecord
return _id;
}
- public void setId(UUID id)
- {
- _id = id;
- }
-
public String getType()
- {
+ {
return _type;
}
- public String getAttributes()
+ public Map<String,Object> getAttributes()
{
return _attributes;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 7092655e95..fdde21ba89 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.store;
+import java.util.Map;
+import java.util.UUID;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQStoreException;
@@ -51,77 +53,42 @@ public interface DurableConfigurationStore
void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
VirtualHost virtualHost) throws Exception;
- /**
- * Makes the specified exchange persistent.
- *
- * @param exchange The exchange to persist.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- void createExchange(Exchange exchange) throws AMQStoreException;
- /**
- * Removes the specified persistent exchange.
- *
- * @param exchange The exchange to remove.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- void removeExchange(Exchange exchange) throws AMQStoreException;
- /**
- * Store the queue binding.
- *
- * @param binding queue binding
- *
- * @throws AMQStoreException if the operation fails for any reason.
- */
- void bindQueue(Binding binding) throws AMQStoreException;
/**
- * Removes queue binding
+ * Makes the specified object persistent.
*
- * @param binding queue binding to remove
+ * @param id The id of the object to persist.
+ * @param type The type of the object to persist
+ * @param attributes the attributes of the object to persist
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- void unbindQueue(Binding binding) throws AMQStoreException;
+ void create(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException;
/**
- * Makes the specified queue persistent.
+ * Removes the specified persistent configured object.
*
- * @param queue The queue to store.
+ * @param id The id of the object to remove.
+ * @param type The type of the object to remove
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- void createQueue(AMQQueue queue) throws AMQStoreException;
+ void remove(UUID id, String type) throws AMQStoreException;
- /**
- * Makes the specified queue persistent.
- *
- * @param queue The queue to store.
- * @param arguments The additional arguments to the binding
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException;
/**
- * Removes the specified queue from the persistent store.
+ * Updates the specified object in the persistent store, IF it is already present. If the object
+ * is not present in the store, it will not be added.
*
- * @param queue The queue to remove.
+ * @param id The id of the object to update.
+ * @param type The type of the object to update
+ * @param attributes the updated attributes
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- void removeQueue(AMQQueue queue) throws AMQStoreException;
+ void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException;
- /**
- * Updates the specified queue in the persistent store, IF it is already present. If the queue
- * is not present in the store, it will not be added.
- *
- * @param queue The queue to update the entry for.
- * @throws AMQStoreException If the operation fails for any reason.
- */
- void updateQueue(AMQQueue queue) throws AMQStoreException;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
new file mode 100644
index 0000000000..9fab29fea6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class DurableConfigurationStoreHelper
+{
+
+ public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
+ {
+ Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
+ attributesMap.put(Queue.NAME, queue.getName());
+ attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
+ attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+ if (queue.getAlternateExchange() != null)
+ {
+ attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
+ }
+ else
+ {
+ attributesMap.remove(Queue.ALTERNATE_EXCHANGE);
+ }
+ if (attributesMap.containsKey(Queue.ARGUMENTS))
+ {
+ // We wouldn't need this if createQueueConfiguredObject took only AMQQueue
+ Map<String, Object> currentArgs = (Map<String, Object>) attributesMap.get(Queue.ARGUMENTS);
+ currentArgs.putAll(queue.getArguments());
+ }
+ else
+ {
+ attributesMap.put(Queue.ARGUMENTS, queue.getArguments());
+ }
+ store.update(queue.getId(), Queue.class.getName(), attributesMap);
+ }
+
+ public static void createQueue(DurableConfigurationStore store, AMQQueue queue, FieldTable arguments)
+ throws AMQStoreException
+ {
+ Map<String, Object> attributesMap = new HashMap<String, Object>();
+ attributesMap.put(Queue.NAME, queue.getName());
+ attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
+ attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+ if (queue.getAlternateExchange() != null)
+ {
+ attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
+ }
+ // TODO KW i think the arguments could come from the queue itself removing the need for the parameter arguments.
+ // It would also do away with the need for the if/then/else within updateQueueConfiguredObject
+ if (arguments != null)
+ {
+ attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments));
+ }
+ store.create(queue.getId(),Queue.class.getName(),attributesMap);
+ }
+
+ public static void removeQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
+ {
+ store.remove(queue.getId(), Queue.class.getName());
+ }
+
+ public static void createExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange)
+ throws AMQStoreException
+ {
+ Map<String, Object> attributesMap = new HashMap<String, Object>();
+ attributesMap.put(Exchange.NAME, exchange.getName());
+ attributesMap.put(Exchange.TYPE, AMQShortString.toString(exchange.getTypeShortString()));
+ attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name()
+ : LifetimePolicy.PERMANENT.name());
+ store.create(exchange.getId(), Exchange.class.getName(), attributesMap);
+
+ }
+
+
+ public static void removeExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange)
+ throws AMQStoreException
+ {
+ store.remove(exchange.getId(),Exchange.class.getName());
+ }
+
+ public static void createBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding)
+ throws AMQStoreException
+ {
+ Map<String, Object> attributesMap = new HashMap<String, Object>();
+ attributesMap.put(Binding.NAME, binding.getBindingKey());
+ attributesMap.put(Binding.EXCHANGE, binding.getExchange().getId());
+ attributesMap.put(Binding.QUEUE, binding.getQueue().getId());
+ Map<String, Object> arguments = binding.getArguments();
+ if (arguments != null)
+ {
+ attributesMap.put(Binding.ARGUMENTS, arguments);
+ }
+ store.create(binding.getId(), Binding.class.getName(), attributesMap);
+ }
+
+
+ public static void removeBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding)
+ throws AMQStoreException
+ {
+ store.remove(binding.getId(), Binding.class.getName());
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 9b2496f262..b7372828e1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.store;
+import java.util.Map;
+import java.util.UUID;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueableMessage;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 3b57bbfa55..77cde80af9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -19,12 +19,9 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.Exchange;
+import java.util.Map;
+import java.util.UUID;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.queue.AMQQueue;
public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
{
@@ -36,42 +33,17 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
- public void createExchange(Exchange exchange) throws AMQStoreException
+ public void update(UUID id, String type, Map<String, Object> attributes)
{
}
@Override
- public void removeExchange(Exchange exchange) throws AMQStoreException
+ public void remove(UUID id, String type)
{
}
@Override
- public void bindQueue(Binding binding) throws AMQStoreException
- {
- }
-
- @Override
- public void unbindQueue(Binding binding) throws AMQStoreException
- {
- }
-
- @Override
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- }
-
- @Override
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- }
-
- @Override
- public void removeQueue(AMQQueue queue) throws AMQStoreException
- {
- }
-
- @Override
- public void updateQueue(AMQQueue queue) throws AMQStoreException
+ public void create(UUID id, String type, Map<String, Object> attributes)
{
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 0a4bb79ed7..889fe7c5c1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -763,7 +764,7 @@ public class ServerSessionDelegate extends SessionDelegate
if (exchange.isDurable())
{
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- store.createExchange(exchange);
+ DurableConfigurationStoreHelper.createExchange(store, exchange);
}
exchangeRegistry.registerExchange(exchange);
}
@@ -918,7 +919,7 @@ public class ServerSessionDelegate extends SessionDelegate
if (exchange.isDurable() && !exchange.isAutoDelete())
{
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- store.removeExchange(exchange);
+ DurableConfigurationStoreHelper.removeExchange(store, exchange);
}
}
}
@@ -1300,11 +1301,11 @@ public class ServerSessionDelegate extends SessionDelegate
{
ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue());
}
- store.createQueue(queue, ftArgs);
+ DurableConfigurationStoreHelper.createQueue(store, queue, ftArgs);
}
else
{
- store.createQueue(queue);
+ DurableConfigurationStoreHelper.createQueue(store, queue, null);
}
}
queueRegistry.registerQueue(queue);
@@ -1469,7 +1470,7 @@ public class ServerSessionDelegate extends SessionDelegate
if (queue.isDurable() && !queue.isAutoDelete())
{
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- store.removeQueue(queue);
+ DurableConfigurationStoreHelper.removeQueue(store,queue);
}
}
catch (AMQException e)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index a704ca112b..e89fa8b545 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -56,6 +56,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.txn.DtxRegistry;
@@ -294,7 +295,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
if (newExchange.isDurable())
{
- getDurableConfigurationStore().createExchange(newExchange);
+ DurableConfigurationStoreHelper.createExchange(getDurableConfigurationStore(), newExchange);
}
}
}
@@ -306,7 +307,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
if (queue.isDurable())
{
- getDurableConfigurationStore().createQueue(queue);
+ DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue, null);
}
//get the exchange name (returns default exchange name if none was specified)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 640e6ff459..3c7e1395d1 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -35,6 +35,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.AMQMessage;
@@ -42,11 +43,15 @@ import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.Message_1_0;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StoredMessage;
@@ -61,9 +66,6 @@ import org.apache.qpid.transport.util.Functions;
import org.apache.qpid.util.ByteBufferInputStream;
public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
- ConfigurationRecoveryHandler.QueueRecoveryHandler,
- ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
- ConfigurationRecoveryHandler.BindingRecoveryHandler,
MessageStoreRecoveryHandler,
MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
TransactionLogRecoveryHandler,
@@ -78,6 +80,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
+ private final Map<String, Map<UUID, Map<String, Object>>> _configuredObjects = new HashMap<String, Map<UUID, Map<String, Object>>>();
+
private MessageStoreLogSubject _logSubject;
private MessageStore _store;
@@ -86,12 +90,19 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
_virtualHost = virtualHost;
}
+ @Override
+ public void beginConfigurationRecovery(DurableConfigurationStore store)
+ {
+ _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
+
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_START());
+ }
+
public VirtualHostConfigRecoveryHandler begin(MessageStore store)
{
_logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
_store = store;
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
-
return this;
}
@@ -100,7 +111,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
try
{
AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName);
-
+
if (q == null)
{
q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost,
@@ -118,9 +129,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
q.setAlternateExchange(altExchange);
}
}
-
+
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true));
-
+
//Record that we have a queue for recovery
_queueRecoveries.put(queueName, 0);
}
@@ -130,12 +141,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
}
- @Override
- public BindingRecoveryHandler completeQueueRecovery()
- {
- return this;
- }
-
public void exchange(UUID id, String exchangeName, String type, boolean autoDelete)
{
try
@@ -155,11 +160,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
}
- public QueueRecoveryHandler completeExchangeRecovery()
- {
- return this;
- }
-
public StoredMessageRecoveryHandler begin()
{
return this;
@@ -250,7 +250,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
CurrentActor.get().message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
Long.toString(messageId)));
-
+
}
}
@@ -275,9 +275,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
if(message != null)
{
final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
-
+
entry.acquire();
-
+
branch.dequeue(queue, message);
branch.addPostTransactionAcion(new ServerTransaction.Action()
@@ -348,8 +348,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
}
- @Override
- public void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf)
+ private void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf)
{
try
{
@@ -399,14 +398,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
- public void completeBindingRecovery()
- {
- }
-
public void complete()
{
-
-
}
public void queueEntry(final UUID queueId, long messageId)
@@ -486,6 +479,107 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
return this;
}
+ @Override
+ public void configuredObject(UUID id, String type, Map<String, Object> attributes)
+ {
+ Map<UUID, Map<String, Object>> typeMap = _configuredObjects.get(type);
+ if(typeMap == null)
+ {
+ typeMap = new HashMap<UUID, Map<String, Object>>();
+ _configuredObjects.put(type,typeMap);
+ }
+ typeMap.put(id, attributes);
+ }
+
+ @Override
+ public void completeConfigurationRecovery()
+ {
+ Map<UUID, Map<String, Object>> exchangeObjects =
+ _configuredObjects.remove(org.apache.qpid.server.model.Exchange.class.getName());
+
+ if(exchangeObjects != null)
+ {
+ recoverExchanges(exchangeObjects);
+ }
+
+ Map<UUID, Map<String, Object>> queueObjects =
+ _configuredObjects.remove(org.apache.qpid.server.model.Queue.class.getName());
+
+ if(queueObjects != null)
+ {
+ recoverQueues(queueObjects);
+ }
+
+
+ Map<UUID, Map<String, Object>> bindingObjects =
+ _configuredObjects.remove(Binding.class.getName());
+
+ if(bindingObjects != null)
+ {
+ recoverBindings(bindingObjects);
+ }
+
+
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+ }
+
+ private void recoverExchanges(Map<UUID, Map<String, Object>> exchangeObjects)
+ {
+ for(Map.Entry<UUID, Map<String,Object>> entry : exchangeObjects.entrySet())
+ {
+ Map<String,Object> attributeMap = entry.getValue();
+ String exchangeName = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.NAME);
+ String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE);
+ String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY);
+ boolean autoDelete = lifeTimePolicy == null
+ || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE;
+ exchange(entry.getKey(), exchangeName, exchangeType, autoDelete);
+ }
+ }
+
+ private void recoverQueues(Map<UUID, Map<String, Object>> queueObjects)
+ {
+ for(Map.Entry<UUID, Map<String,Object>> entry : queueObjects.entrySet())
+ {
+ Map<String,Object> attributeMap = entry.getValue();
+
+ String queueName = (String) attributeMap.get(Queue.NAME);
+ String owner = (String) attributeMap.get(Queue.OWNER);
+ boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
+ UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE));
+ @SuppressWarnings("unchecked")
+ Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS);
+ FieldTable arguments = null;
+ if (queueArgumentsMap != null)
+ {
+ arguments = FieldTable.convertToFieldTable(queueArgumentsMap);
+ }
+ queue(entry.getKey(), queueName, owner, exclusive, arguments, alternateExchangeId);
+ }
+ }
+
+ private void recoverBindings(Map<UUID, Map<String, Object>> bindingObjects)
+ {
+ for(Map.Entry<UUID, Map<String,Object>> entry : bindingObjects.entrySet())
+ {
+ Map<String,Object> attributeMap = entry.getValue();
+ UUID exchangeId = UUID.fromString((String)attributeMap.get(Binding.EXCHANGE));
+ UUID queueId = UUID.fromString((String) attributeMap.get(Binding.QUEUE));
+ String bindingName = (String) attributeMap.get(Binding.NAME);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> bindingArgumentsMap = (Map<String, Object>) attributeMap.get(Binding.ARGUMENTS);
+ FieldTable arguments = null;
+ if (bindingArgumentsMap != null)
+ {
+ arguments = FieldTable.convertToFieldTable(bindingArgumentsMap);
+ }
+ ByteBuffer argumentsBB = (arguments == null ? null : ByteBuffer.wrap(arguments.getDataAsBytes()));
+
+ binding(entry.getKey(), exchangeId, queueId, bindingName, argumentsBB);
+ }
+ }
+
private static class DummyMessage implements EnqueableMessage
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
index 57e1fcd15c..4a6b3f2cad 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
@@ -21,9 +21,8 @@
package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -32,7 +31,6 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import java.io.File;
-import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -48,13 +46,12 @@ import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockStoredMessage;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.Transaction.Record;
import org.apache.qpid.server.store.derby.DerbyMessageStore;
@@ -71,9 +68,6 @@ public class DurableConfigurationStoreTest extends QpidTestCase
private VirtualHost _virtualHost;
private ConfigurationRecoveryHandler _recoveryHandler;
- private QueueRecoveryHandler _queueRecoveryHandler;
- private ExchangeRecoveryHandler _exchangeRecoveryHandler;
- private BindingRecoveryHandler _bindingRecoveryHandler;
private MessageStoreRecoveryHandler _messageStoreRecoveryHandler;
private StoredMessageRecoveryHandler _storedMessageRecoveryHandler;
private TransactionLogRecoveryHandler _logRecoveryHandler;
@@ -101,9 +95,6 @@ public class DurableConfigurationStoreTest extends QpidTestCase
setTestSystemProperty("QPID_WORK", TMP_FOLDER);
_configuration = mock(Configuration.class);
_recoveryHandler = mock(ConfigurationRecoveryHandler.class);
- _queueRecoveryHandler = mock(QueueRecoveryHandler.class);
- _exchangeRecoveryHandler = mock(ExchangeRecoveryHandler.class);
- _bindingRecoveryHandler = mock(BindingRecoveryHandler.class);
_storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class);
_logRecoveryHandler = mock(TransactionLogRecoveryHandler.class);
_messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
@@ -112,9 +103,6 @@ public class DurableConfigurationStoreTest extends QpidTestCase
_virtualHost = mock(VirtualHost.class);
when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
- when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler);
- when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_queueRecoveryHandler);
- when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_bindingRecoveryHandler);
when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
when(_exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(EXCHANGE_NAME));
@@ -140,21 +128,44 @@ public class DurableConfigurationStoreTest extends QpidTestCase
public void testCreateExchange() throws Exception
{
Exchange exchange = createTestExchange();
- _configStore.createExchange(exchange);
+ DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
reopenStore();
- verify(_exchangeRecoveryHandler).exchange(_exchangeId, getName(), getName() + "Type", true);
+ verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(org.apache.qpid.server.model.Exchange.class.getName()),
+ eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(),
+ org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
+ org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString())));
+ }
+
+ private Map<String,Object> map(Object... vals)
+ {
+ Map<String,Object> map = new HashMap<String, Object>();
+ boolean isValue = false;
+ String key = null;
+ for(Object obj : vals)
+ {
+ if(isValue)
+ {
+ map.put(key,obj);
+ }
+ else
+ {
+ key = (String) obj;
+ }
+ isValue = !isValue;
+ }
+ return map;
}
public void testRemoveExchange() throws Exception
{
Exchange exchange = createTestExchange();
- _configStore.createExchange(exchange);
+ DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
- _configStore.removeExchange(exchange);
+ DurableConfigurationStoreHelper.removeExchange(_configStore, exchange);
reopenStore();
- verify(_exchangeRecoveryHandler, never()).exchange(any(UUID.class), anyString(), anyString(), anyBoolean());
+ verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap());
}
public void testBindQueue() throws Exception
@@ -162,13 +173,18 @@ public class DurableConfigurationStoreTest extends QpidTestCase
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, FieldTable.convertToMap(_bindingArgs));
- _configStore.bindQueue(binding);
+ DurableConfigurationStoreHelper.createBinding(_configStore, binding);
reopenStore();
- ByteBuffer argsAsBytes = ByteBuffer.wrap(_bindingArgs.getDataAsBytes());
+ Map<String,Object> map = new HashMap<String, Object>();
+ map.put(org.apache.qpid.server.model.Binding.EXCHANGE, _exchange.getId().toString());
+ map.put(org.apache.qpid.server.model.Binding.QUEUE, queue.getId().toString());
+ map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY);
+ map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,FieldTable.convertToMap(_bindingArgs));
- verify(_bindingRecoveryHandler).binding(binding.getId(), _exchange.getId(), queue.getId(), ROUTING_KEY, argsAsBytes);
+ verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(org.apache.qpid.server.model.Binding.class.getName()),
+ eq(map));
}
public void testUnbindQueue() throws Exception
@@ -176,22 +192,27 @@ public class DurableConfigurationStoreTest extends QpidTestCase
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, FieldTable.convertToMap(_bindingArgs));
- _configStore.bindQueue(binding);
+ DurableConfigurationStoreHelper.createBinding(_configStore, binding);
- _configStore.unbindQueue(binding);
+ DurableConfigurationStoreHelper.removeBinding(_configStore, binding);
reopenStore();
- verify(_bindingRecoveryHandler, never()).binding(any(UUID.class), any(UUID.class), any(UUID.class), anyString(),
- isA(ByteBuffer.class));
+ verify(_recoveryHandler, never()).configuredObject(any(UUID.class),
+ eq(org.apache.qpid.server.model.Binding.class.getName()),
+ anyMap());
}
public void testCreateQueueAMQQueue() throws Exception
{
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
- _configStore.createQueue(queue);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue, null);
reopenStore();
- verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, null);
+ Map<String, Object> queueAttributes = new HashMap<String, Object>();
+ queueAttributes.put(Queue.NAME, getName());
+ queueAttributes.put(Queue.OWNER, getName()+"Owner");
+ queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
}
public void testCreateQueueAMQQueueFieldTable() throws Exception
@@ -202,10 +223,19 @@ public class DurableConfigurationStoreTest extends QpidTestCase
attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- _configStore.createQueue(queue, arguments);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
reopenStore();
- verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, arguments, null);
+
+
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+
+ queueAttributes.put(Queue.NAME, getName());
+ queueAttributes.put(Queue.OWNER, getName()+"Owner");
+ queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+ queueAttributes.put(Queue.ARGUMENTS, attributes);
+
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
}
public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
@@ -213,10 +243,17 @@ public class DurableConfigurationStoreTest extends QpidTestCase
Exchange alternateExchange = createTestAlternateExchange();
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange);
- _configStore.createQueue(queue);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue, null);
reopenStore();
- verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, alternateExchange.getId());
+
+ Map<String, Object> queueAttributes = new HashMap<String, Object>();
+ queueAttributes.put(Queue.NAME, getName());
+ queueAttributes.put(Queue.OWNER, getName()+"Owner");
+ queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+ queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
+
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
}
private Exchange createTestAlternateExchange()
@@ -235,14 +272,25 @@ public class DurableConfigurationStoreTest extends QpidTestCase
attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- _configStore.createQueue(queue, arguments);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
// update the queue to have exclusive=false
queue = createTestQueue(getName(), getName() + "Owner", false);
- _configStore.updateQueue(queue);
+ when(queue.getArguments()).thenReturn(attributes);
+
+ DurableConfigurationStoreHelper.updateQueue(_configStore, queue);
reopenStore();
- verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, null);
+
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+
+ queueAttributes.put(Queue.NAME, getName());
+ queueAttributes.put(Queue.OWNER, getName()+"Owner");
+ queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
+ queueAttributes.put(Queue.ARGUMENTS, attributes);
+
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
+
}
public void testUpdateQueueAlternateExchange() throws Exception
@@ -253,15 +301,26 @@ public class DurableConfigurationStoreTest extends QpidTestCase
attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- _configStore.createQueue(queue, arguments);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
// update the queue to have exclusive=false
Exchange alternateExchange = createTestAlternateExchange();
queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange);
- _configStore.updateQueue(queue);
+ when(queue.getArguments()).thenReturn(attributes);
+
+ DurableConfigurationStoreHelper.updateQueue(_configStore, queue);
reopenStore();
- verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, alternateExchange.getId());
+
+ Map<String,Object> queueAttributes = new HashMap<String, Object>();
+
+ queueAttributes.put(Queue.NAME, getName());
+ queueAttributes.put(Queue.OWNER, getName()+"Owner");
+ queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
+ queueAttributes.put(Queue.ARGUMENTS, attributes);
+ queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
+
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
}
public void testRemoveQueue() throws Exception
@@ -272,13 +331,14 @@ public class DurableConfigurationStoreTest extends QpidTestCase
attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- _configStore.createQueue(queue, arguments);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
// remove queue
- _configStore.removeQueue(queue);
+ DurableConfigurationStoreHelper.removeQueue(_configStore,queue);
reopenStore();
- verify(_queueRecoveryHandler, never()).queue(any(UUID.class), anyString(), anyString(), anyBoolean(),
- any(FieldTable.class), any(UUID.class));
+ verify(_recoveryHandler, never()).configuredObject(any(UUID.class),
+ eq(org.apache.qpid.server.model.Queue.class.getName()),
+ anyMap());
}
private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 5eea002365..af4bbd1731 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -346,7 +346,8 @@ public class MessageStoreTest extends QpidTestCase
1, queueRegistry.getQueues().size());
//test that removing the queue means it is not recovered next time
- getVirtualHost().getDurableConfigurationStore().removeQueue(queueRegistry.getQueue(durableQueueName));
+ final AMQQueue queue = queueRegistry.getQueue(durableQueueName);
+ DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue);
reloadVirtualHost();
@@ -399,7 +400,8 @@ public class MessageStoreTest extends QpidTestCase
origExchangeCount + 1, exchangeRegistry.getExchangeNames().size());
//test that removing the exchange means it is not recovered next time
- getVirtualHost().getDurableConfigurationStore().removeExchange(exchangeRegistry.getExchange(directExchangeName));
+ final Exchange exchange = exchangeRegistry.getExchange(directExchangeName);
+ DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange);
reloadVirtualHost();
@@ -755,7 +757,9 @@ public class MessageStoreTest extends QpidTestCase
if (queue.isDurable() && !queue.isAutoDelete())
{
- getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments);
+ DurableConfigurationStoreHelper.createQueue(getVirtualHost().getDurableConfigurationStore(),
+ queue,
+ queueArguments);
}
}
catch (AMQException e)
@@ -799,7 +803,8 @@ public class MessageStoreTest extends QpidTestCase
getVirtualHost().getExchangeRegistry().registerExchange(exchange);
if (durable)
{
- getVirtualHost().getDurableConfigurationStore().createExchange(exchange);
+ DurableConfigurationStoreHelper.createExchange(getVirtualHost().getDurableConfigurationStore(),
+ exchange);
}
}
catch (AMQException e)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index a73057ebc1..2d68e94fcd 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -26,18 +26,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.test.utils.QpidTestCase;
public abstract class MessageStoreTestCase extends QpidTestCase
{
private ConfigurationRecoveryHandler _recoveryHandler;
- private QueueRecoveryHandler _queueRecoveryHandler;
- private ExchangeRecoveryHandler _exchangeRecoveryHandler;
- private BindingRecoveryHandler _bindingRecoveryHandler;
private MessageStoreRecoveryHandler _messageStoreRecoveryHandler;
private StoredMessageRecoveryHandler _storedMessageRecoveryHandler;
private TransactionLogRecoveryHandler _logRecoveryHandler;
@@ -52,9 +46,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase
super.setUp();
_recoveryHandler = mock(ConfigurationRecoveryHandler.class);
- _queueRecoveryHandler = mock(QueueRecoveryHandler.class);
- _exchangeRecoveryHandler = mock(ExchangeRecoveryHandler.class);
- _bindingRecoveryHandler = mock(BindingRecoveryHandler.class);
_storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class);
_logRecoveryHandler = mock(TransactionLogRecoveryHandler.class);
_messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
@@ -63,9 +54,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase
_virtualHost = mock(VirtualHost.class);
when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
- when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler);
- when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_queueRecoveryHandler);
- when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_bindingRecoveryHandler);
when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
index 69efb7e310..a1d3d1542e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
@@ -28,7 +28,8 @@ import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.model.VirtualHost;
-public class QuotaMessageStore extends NullMessageStore
+public class
+ QuotaMessageStore extends NullMessageStore
{
private final AtomicLong _messageId = new AtomicLong(1);
private final AtomicBoolean _closed = new AtomicBoolean(false);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 76250e126a..50f2ed655f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -22,16 +22,13 @@ package org.apache.qpid.server.store;
import java.util.Collections;
import java.util.Map;
+import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.queue.AMQQueue;
import java.nio.ByteBuffer;
import java.util.HashMap;
@@ -176,51 +173,28 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
- doPreDelay("createExchange");
- _durableConfigurationStore.createExchange(exchange);
- doPostDelay("createExchange");
- }
-
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
- doPreDelay("removeExchange");
- _durableConfigurationStore.removeExchange(exchange);
- doPostDelay("removeExchange");
- }
-
- public void bindQueue(Binding binding) throws AMQStoreException
- {
- doPreDelay("bindQueue");
- _durableConfigurationStore.bindQueue(binding);
- doPostDelay("bindQueue");
- }
-
- public void unbindQueue(Binding binding) throws AMQStoreException
- {
- doPreDelay("unbindQueue");
- _durableConfigurationStore.unbindQueue(binding);
- doPostDelay("unbindQueue");
- }
-
- public void createQueue(AMQQueue queue) throws AMQStoreException
+ @Override
+ public void create(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
- createQueue(queue, null);
+ doPreDelay("create");
+ _durableConfigurationStore.create(id, type, attributes);
+ doPostDelay("create");
}
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+ @Override
+ public void remove(UUID id, String type) throws AMQStoreException
{
- doPreDelay("createQueue");
- _durableConfigurationStore.createQueue(queue, arguments);
- doPostDelay("createQueue");
+ doPreDelay("remove");
+ _durableConfigurationStore.remove(id, type);
+ doPostDelay("remove");
}
- public void removeQueue(AMQQueue queue) throws AMQStoreException
+ @Override
+ public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
- doPreDelay("removeQueue");
- _durableConfigurationStore.removeQueue(queue);
- doPostDelay("removeQueue");
+ doPreDelay("update");
+ _durableConfigurationStore.update(id, type, attributes);
+ doPostDelay("update");
}
public Transaction newTransaction()
@@ -314,13 +288,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
}
- public void updateQueue(AMQQueue queue) throws AMQStoreException
- {
- doPreDelay("updateQueue");
- _durableConfigurationStore.updateQueue(queue);
- doPostDelay("updateQueue");
- }
-
@Override
public void activate() throws Exception
{