summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-03-12 17:23:22 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-03-12 17:23:22 +0000
commit80037617aeadef0a693a51b5ee784f299fb9008c (patch)
tree59c7cf31134c09f2e39f0e03b8cb46b7003d7f18
parent64eb3a8c90b2367b334a33217204714deb95fef0 (diff)
downloadqpid-python-80037617aeadef0a693a51b5ee784f299fb9008c.tar.gz
QPID-5625 : [Java Broker] ensure common configured object values are saved to the store
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1576826 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java34
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java22
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java43
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java3
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java25
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java47
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java62
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java40
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java9
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java8
15 files changed, 219 insertions, 96 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 37fb77f547..7d7ebd32fe 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -888,11 +888,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
update(false, id, type, attributes, null);
}
- public void update(ConfiguredObjectRecord... records) throws StoreException
- {
- update(false, records);
- }
-
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
index a408732e2f..bc3beeb78b 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.UUID;
+
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import com.sleepycat.bind.tuple.TupleBinding;
@@ -31,14 +33,42 @@ import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.server.store.StoreException;
import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.Version;
import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.Module;
import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializerProvider;
+import org.codehaus.jackson.map.module.SimpleModule;
public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord>
{
private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding(null);
+
private final UUID _uuid;
+ private static final Module _module;
+ static
+ {
+ SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null));
+
+ final JsonSerializer<ConfiguredObject> serializer = new JsonSerializer<ConfiguredObject>()
+ {
+ @Override
+ public void serialize(final ConfiguredObject value,
+ final JsonGenerator jgen,
+ final SerializerProvider provider)
+ throws IOException, JsonProcessingException
+ {
+ jgen.writeString(value.getId().toString());
+ }
+ };
+ module.addSerializer(ConfiguredObject.class, serializer);
+
+ _module = module;
+ }
public static ConfiguredObjectBinding getInstance()
{
@@ -74,7 +104,9 @@ public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord
try
{
StringWriter writer = new StringWriter();
- new ObjectMapper().writeValue(writer, object.getAttributes());
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+ objectMapper.writeValue(writer, object.getAttributes());
tupleOutput.writeString(object.getType());
tupleOutput.writeString(writer.toString());
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index e34646d66f..a30806d810 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -55,6 +55,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -582,7 +583,13 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
final AMQQueue queue,
final Map<String, Object> arguments)
{
- return makeBinding(getBinding(bindingKey,queue).getId(), bindingKey, queue, arguments, false, true);
+ final BindingImpl existingBinding = getBinding(bindingKey, queue);
+ return makeBinding(existingBinding == null ? null : existingBinding.getId(),
+ bindingKey,
+ queue,
+ arguments,
+ false,
+ true);
}
@Override
@@ -642,8 +649,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
boolean restore,
boolean force)
{
- assert queue != null;
-
if (bindingKey == null)
{
bindingKey = "";
@@ -660,7 +665,16 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
bindingKey,
_virtualHost.getName());
}
- BindingImpl b = new BindingImpl(id, bindingKey, queue, this, arguments);
+
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(org.apache.qpid.server.model.Binding.NAME,bindingKey);
+ if(arguments != null)
+ {
+ attributes.put(org.apache.qpid.server.model.Binding.ARGUMENTS, arguments);
+ }
+
+ BindingImpl b = new BindingImpl(id, attributes, queue, this);
+
BindingImpl existingMapping = _bindingsMap.putIfAbsent(new BindingIdentifier(bindingKey,queue), b);
if (existingMapping == null || force)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java
index eda61f92b0..ec48aa7936 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java
@@ -84,8 +84,18 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
private final Map<String, Object> _defaultAttributes = new HashMap<String, Object>();
private final TaskExecutor _taskExecutor;
- private final long _createdTime;
- private final String _createdBy;
+
+ @ManagedAttributeField
+ private long _createdTime;
+
+ @ManagedAttributeField
+ private String _createdBy;
+
+ @ManagedAttributeField
+ private long _lastUpdatedTime;
+
+ @ManagedAttributeField
+ private String _lastUpdatedBy;
@ManagedAttributeField
private String _name;
@@ -197,8 +207,18 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
}
- _createdTime = MapValueConverter.getLongAttribute(CREATED_TIME, attributes, System.currentTimeMillis());
- _createdBy = MapValueConverter.getStringAttribute(CREATED_BY, attributes, getCurrentUserName());
+ if(!_attributes.containsKey(CREATED_BY))
+ {
+ final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
+ if(currentUser != null)
+ {
+ _attributes.put(CREATED_BY, currentUser);
+ }
+ }
+ if(!_attributes.containsKey(CREATED_TIME))
+ {
+ _attributes.put(CREATED_TIME, System.currentTimeMillis());
+ }
for(Attribute<?,?> attr : _attributeTypes.values())
{
if(attr.getAnnotation().mandatory() && !(attributes.containsKey(attr.getName())|| defaults.containsKey(attr.getName())))
@@ -334,6 +354,17 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
protected void attributeSet(String attributeName, Object oldAttributeValue, Object newAttributeValue)
{
+
+ final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
+ if(currentUser != null)
+ {
+ _attributes.put(LAST_UPDATED_BY, currentUser.getName());
+ _lastUpdatedBy = currentUser.getName();
+ }
+ final long currentTime = System.currentTimeMillis();
+ _attributes.put(LAST_UPDATED_TIME, currentTime);
+ _lastUpdatedTime = currentTime;
+
synchronized (_changeListeners)
{
List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners);
@@ -659,13 +690,13 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public String getLastUpdatedBy()
{
- return null;
+ return _lastUpdatedBy;
}
@Override
public long getLastUpdatedTime()
{
- return 0;
+ return _lastUpdatedTime;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java
index 06ca0d2a24..12de8142da 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java
@@ -91,11 +91,6 @@ public class KeyStoreAdapter extends AbstractKeyStoreAdapter<KeyStoreAdapter> im
_certificateAlias, _keyManagerFactoryAlgorithm);
}
- @Override
- public String getDescription()
- {
- return null;
- }
@Override
public Collection<String> getAttributeNames()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index ab3da6224e..e5f587ea94 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -2871,7 +2871,8 @@ public abstract class AbstractQueue
if(childClass == Binding.class && otherParents.length == 1 && otherParents[0] instanceof Exchange)
{
final String bindingKey = (String) attributes.get("name");
- ((ExchangeImpl)otherParents[0]).addBinding(bindingKey, this, attributes);
+ ((ExchangeImpl)otherParents[0]).addBinding(bindingKey, this,
+ (Map<String,Object>) attributes.get(Binding.ARGUMENTS));
for(Binding binding : _bindings)
{
if(binding.getExchange() == otherParents[0] && binding.getName().equals(bindingKey))
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
index dca3576827..77886e9030 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.security.access.ObjectProperties;
import org.apache.qpid.server.security.access.ObjectType;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.access.OperationLoggingDetails;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.TaskPrincipal;
import javax.security.auth.Subject;
@@ -64,6 +65,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class SecurityManager implements ConfigurationChangeListener
@@ -183,6 +185,29 @@ public class SecurityManager implements ConfigurationChangeListener
return !(subject == null || subject.getPrincipals(SystemPrincipal.class).isEmpty());
}
+ public static AuthenticatedPrincipal getCurrentUser()
+ {
+ Subject subject = Subject.getSubject(AccessController.getContext());
+ final AuthenticatedPrincipal user;
+ if(subject != null)
+ {
+ Set<AuthenticatedPrincipal> principals = subject.getPrincipals(AuthenticatedPrincipal.class);
+ if(principals != null && !principals.isEmpty())
+ {
+ user = principals.iterator().next();
+ }
+ else
+ {
+ user = null;
+ }
+ }
+ else
+ {
+ user = null;
+ }
+ return user;
+ }
+
private static final class SystemPrincipal implements Principal
{
private SystemPrincipal()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 740d57f603..81c74fe8f5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -44,13 +44,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.Version;
import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.Module;
import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializerProvider;
+import org.codehaus.jackson.map.module.SimpleModule;
abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
{
@@ -160,6 +168,28 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+ private static final Module _module;
+ static
+ {
+ SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null));
+
+ final JsonSerializer<ConfiguredObject> serializer = new JsonSerializer<ConfiguredObject>()
+ {
+ @Override
+ public void serialize(final ConfiguredObject value,
+ final JsonGenerator jgen,
+ final SerializerProvider provider)
+ throws IOException, JsonProcessingException
+ {
+ jgen.writeString(value.getId().toString());
+ }
+ };
+ module.addSerializer(ConfiguredObject.class, serializer);
+
+ _module = module;
+ }
+
protected final EventManager _eventManager = new EventManager();
protected final StateManager _stateManager;
@@ -1994,7 +2024,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
else
{
final Map<String, Object> attributes = configuredObject.getAttributes();
- byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes);
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+
ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
}
@@ -2129,12 +2162,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- @Override
- public void update(ConfiguredObjectRecord... records) throws StoreException
- {
- update(false, records);
- }
-
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING))
@@ -2176,6 +2203,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
ResultSet rs = stmt.executeQuery();
try
{
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
if (rs.next())
{
PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
@@ -2184,7 +2213,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
stmt2.setString(1, configuredObject.getType());
if (configuredObject.getAttributes() != null)
{
- byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
configuredObject.getAttributes());
ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
@@ -2215,7 +2244,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
else
{
final Map<String, Object> attributes = configuredObject.getAttributes();
- byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes);
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index e552b3e073..589eca1600 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -84,7 +84,6 @@ public interface DurableConfigurationStore
void update(UUID id, String type, Map<String, Object> attributes) throws StoreException;
- public void update(ConfiguredObjectRecord... records) throws StoreException;
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
index 77af1f92fd..8cecbc4d0b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.store;
+import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -32,10 +33,14 @@ import java.util.Set;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.*;
+
+import javax.security.auth.Subject;
public class DurableConfigurationStoreHelper
{
@@ -47,41 +52,18 @@ public class DurableConfigurationStoreHelper
public static void updateQueue(DurableConfigurationStore store, AMQQueue queue)
{
- Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
-
- if (queue.getAlternateExchange() != null)
- {
- attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
- }
-
- Collection<String> availableAttrs = queue.getAvailableAttributes();
-
- for(String attrName : availableAttrs)
- {
- if(!QUEUE_ARGUMENTS_EXCLUDES.contains(attrName))
- {
- attributesMap.put(attrName, queue.getAttribute(attrName));
- }
- }
+ Map<String, Object> attributesMap = queue.getActualAttributes();
+ attributesMap.remove(ConfiguredObject.ID);
store.update(queue.getId(), QUEUE, attributesMap);
}
public static void createQueue(DurableConfigurationStore store, AMQQueue<?> queue)
{
- Map<String, Object> attributesMap = new HashMap<String, Object>();
- if (queue.getAlternateExchange() != null)
- {
- attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
- }
- for(String attrName : queue.getAvailableAttributes())
- {
- if(!QUEUE_ARGUMENTS_EXCLUDES.contains(attrName))
- {
- attributesMap.put(attrName, queue.getAttribute(attrName));
- }
- }
+ Map<String, Object> attributesMap = queue.getActualAttributes();
+ attributesMap.remove(ConfiguredObject.ID);
+
store.create(queue.getId(), QUEUE, attributesMap);
}
@@ -92,11 +74,8 @@ public class DurableConfigurationStoreHelper
public static void createExchange(DurableConfigurationStore store, ExchangeImpl exchange)
{
- Map<String, Object> attributesMap = new HashMap<String, Object>();
- attributesMap.put(Exchange.NAME, exchange.getName());
- attributesMap.put(Exchange.TYPE, exchange.getTypeName());
- attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name()
- : LifetimePolicy.PERMANENT.name());
+ Map<String, Object> attributesMap = exchange.getActualAttributes();
+ attributesMap.remove(ConfiguredObject.ID);
store.create(exchange.getId(), EXCHANGE, attributesMap);
@@ -108,16 +87,17 @@ public class DurableConfigurationStoreHelper
store.remove(exchange.getId(), EXCHANGE);
}
- public static void createBinding(DurableConfigurationStore store, BindingImpl binding)
+ public static void createBinding(DurableConfigurationStore store, final BindingImpl binding)
{
- Map<String, Object> attributesMap = new HashMap<String, Object>();
- attributesMap.put(Binding.NAME, binding.getBindingKey());
- attributesMap.put(Binding.EXCHANGE, binding.getExchange().getId());
- attributesMap.put(Binding.QUEUE, binding.getAMQQueue().getId());
- Map<String, Object> arguments = binding.getArguments();
- if (arguments != null)
+ Map<String, Object> attributesMap = binding.getActualAttributes();
+ attributesMap.remove(ConfiguredObject.ID);
+ if(!attributesMap.containsKey(Binding.EXCHANGE))
+ {
+ attributesMap.put(Binding.EXCHANGE, binding.getExchange());
+ }
+ if(!attributesMap.containsKey(Binding.QUEUE))
{
- attributesMap.put(Binding.ARGUMENTS, arguments);
+ attributesMap.put(Binding.QUEUE, binding.getQueue());
}
store.create(binding.getId(), BINDING, attributesMap);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
index ac9bfdcaae..3de601d741 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
@@ -461,12 +461,6 @@ public class JsonFileConfigStore implements DurableConfigurationStore
}
@Override
- public void update(final ConfiguredObjectRecord... records) throws StoreException
- {
- update(false, records);
- }
-
- @Override
public void update(final boolean createIfNecessary, final ConfiguredObjectRecord... records)
throws StoreException
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 9100d623cd..c579a27731 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -37,11 +37,6 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
- public void update(ConfiguredObjectRecord... records)
- {
- }
-
- @Override
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
{
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 11c2451118..650b22ff51 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -55,6 +56,7 @@ import org.apache.qpid.server.store.Transaction.Record;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -201,7 +203,31 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,_bindingArgs);
verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(BINDING),
- eq(map));
+ argThat(new IgnoreCreatedByMatcher(map)));
+ }
+
+ private static class IgnoreCreatedByMatcher extends ArgumentMatcher<Map<String,Object>>
+ {
+ private final Map<String,Object> _matchingMap;
+
+ private IgnoreCreatedByMatcher(final Map<String, Object> matchingMap)
+ {
+ _matchingMap = matchingMap;
+ }
+
+ @Override
+ public boolean matches(final Object argument)
+ {
+ if(argument instanceof Map)
+ {
+ Map<String,Object> arg = new HashMap<String, Object>((Map<String,Object>) argument);
+ arg.remove("createdBy");
+ arg.remove("createdTime");
+ return arg.equals(_matchingMap);
+
+ }
+ return false;
+ }
}
public void testUnbindQueue() throws Exception
@@ -373,6 +399,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
when(queue.getVirtualHost()).thenReturn(vh);
final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments);
attributes.put(Queue.NAME, queueName);
+ if(alternateExchange != null)
+ {
+ attributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange);
+ }
if(exclusive)
{
when(queue.getOwner()).thenReturn(queueOwner);
@@ -394,16 +424,24 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
}
});
+ when(queue.getActualAttributes()).thenReturn(attributes);
return queue;
}
private ExchangeImpl createTestExchange()
{
ExchangeImpl exchange = mock(ExchangeImpl.class);
+ Map<String,Object> actualAttributes = new HashMap<String, Object>();
+ actualAttributes.put("id", _exchangeId);
+ actualAttributes.put("name", getName());
+ actualAttributes.put("type", getName() + "Type");
+ actualAttributes.put("lifetimePolicy", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+ when(exchange.getActualAttributes()).thenReturn(actualAttributes);
when(exchange.getName()).thenReturn(getName());
when(exchange.getTypeName()).thenReturn(getName() + "Type");
when(exchange.isAutoDelete()).thenReturn(true);
when(exchange.getId()).thenReturn(_exchangeId);
+
return exchange;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 928ed6be74..9f1f6f48c0 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -53,6 +53,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
@@ -338,7 +339,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
"x-filter-jms-selector",
"wibble"));
- doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class));
+ doThrow(new RuntimeException("Update Should not be called"))
+ .when(_store).update(anyBoolean(), any(ConfiguredObjectRecord[].class));
_durableConfigurationRecoverer.completeConfigurationRecovery();
}
@@ -442,12 +444,13 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
public Object answer(InvocationOnMock invocation) throws Throwable
{
Object[] args = invocation.getArguments();
+ final HashSet actual = new HashSet(Arrays.asList(args[1]));
assertEquals("Updated records are not as expected", new HashSet(Arrays.asList(
- expected)), new HashSet(Arrays.asList(args)));
+ expected)), actual);
return null;
}
- }).when(_store).update(any(ConfiguredObjectRecord[].class));
+ }).when(_store).update(anyBoolean(), any(ConfiguredObjectRecord[].class));
}
private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args)
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index db1d5d9327..9407ef2d6c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -219,14 +219,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void update(ConfiguredObjectRecord... records) throws StoreException
- {
- doPreDelay("update");
- _durableConfigurationStore.update(records);
- doPostDelay("update");
- }
-
- @Override
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
doPreDelay("update");