summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-05-11 15:11:31 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-05-11 15:11:31 +0000
commit97577a5105594c5526b4bd9b24fa896fc815bc93 (patch)
treee52d15be5891152bf85c071217ce9943bf7024e1
parent90593b58d192ce16ab9945a279ddf12a09f8e0e2 (diff)
downloadqpid-python-97577a5105594c5526b4bd9b24fa896fc815bc93.tar.gz
QPID-4382 : [Java Broker] Implement the default exchange as a facade on top of the queue registry
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1481350 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java266
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java227
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java356
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java79
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java10
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java5
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java29
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java32
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java62
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java27
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/rest/BindingRestTest.java7
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java14
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/rest/StructureRestTest.java4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java6
38 files changed, 741 insertions, 499 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index fc58c7fb4d..8d88ee902a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -79,6 +79,8 @@ import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
@@ -269,7 +271,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException
{
String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
- if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), routingKey, e.getName()))
+ SecurityManager securityManager = getVirtualHost().getSecurityManager();
+ if (!securityManager.authorisePublish(info.isImmediate(), routingKey, e.getName()))
{
throw new AMQSecurityException("Permission denied: " + e.getName());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
deleted file mode 100644
index 69ff081528..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.binding;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
-import org.apache.qpid.AMQSecurityException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.BindingMessages;
-import org.apache.qpid.server.logging.subjects.BindingLogSubject;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class BindingFactory
-{
- private final VirtualHost _virtualHost;
-
- private final ConcurrentHashMap<BindingImpl, BindingImpl> _bindings = new ConcurrentHashMap<BindingImpl, BindingImpl>();
-
- public BindingFactory(final VirtualHost vhost)
- {
- _virtualHost = vhost;
- }
-
- private final class BindingImpl extends Binding implements AMQQueue.Task, Exchange.Task
- {
- private final BindingLogSubject _logSubject;
- //TODO : persist creation time
- private long _createTime = System.currentTimeMillis();
-
- private BindingImpl(UUID id, String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
- {
- super(id, bindingKey, queue, exchange, arguments);
- _logSubject = new BindingLogSubject(bindingKey,exchange,queue);
-
- }
-
-
- public void doTask(final AMQQueue queue) throws AMQException
- {
- removeBinding(this);
- }
-
- public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException
- {
- removeBinding(this);
- }
-
- void logCreation()
- {
- CurrentActor.get().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()), getArguments() != null && !getArguments().isEmpty()));
- }
-
- void logDestruction()
- {
- CurrentActor.get().message(_logSubject, BindingMessages.DELETED());
- }
-
- public String getOrigin()
- {
- return (String) getArguments().get("qpid.fed.origin");
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public boolean isDurable()
- {
- return getQueue().isDurable() && getExchange().isDurable();
- }
-
- }
-
-
-
- public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
- {
- return makeBinding(null, bindingKey, queue, exchange, arguments, false, false);
- }
-
-
- public boolean replaceBinding(final UUID id, final String bindingKey,
- final AMQQueue queue,
- final Exchange exchange,
- final Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
- {
- return makeBinding(id, bindingKey, queue, exchange, arguments, false, true);
- }
-
- private boolean makeBinding(UUID id, String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException
- {
- assert queue != null;
- final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
-
- if (bindingKey == null)
- {
- bindingKey = "";
- }
- if (exchange == null)
- {
- exchange = defaultExchange;
- }
- if (arguments == null)
- {
- arguments = Collections.emptyMap();
- }
-
- if (exchange == null)
- {
- throw new IllegalArgumentException("exchange cannot be null");
- }
-
- // The default exchange bindings must reflect the existence of queues, allow
- // all operations on it to succeed. It is up to the broker to prevent illegal
- // attempts at binding to this exchange, not the ACLs.
- if(exchange != defaultExchange)
- {
- //Perform ACLs
- if (!_virtualHost.getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey)))
- {
- throw new AMQSecurityException("Permission denied: binding " + bindingKey);
- }
- }
-
- if (id == null)
- {
- id = UUIDGenerator.generateBindingUUID(exchange.getName(), queue.getName(), bindingKey, _virtualHost.getName());
- }
- BindingImpl b = new BindingImpl(id, bindingKey, queue, exchange, arguments);
- BindingImpl existingMapping = _bindings.putIfAbsent(b, b);
- if (existingMapping == null || force)
- {
- if (existingMapping != null)
- {
- removeBinding(existingMapping);
- }
-
- if (b.isDurable() && !restore)
- {
- _virtualHost.getMessageStore().bindQueue(b);
- }
-
- queue.addQueueDeleteTask(b);
- exchange.addCloseTask(b);
- queue.addBinding(b);
- exchange.addBinding(b);
- b.logCreation();
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
- public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException
- {
- makeBinding(id, bindingKey,queue,exchange,argumentMap,true, false);
- }
-
- public void removeBinding(final Binding b) throws AMQSecurityException, AMQInternalException
- {
- removeBinding(b.getBindingKey(), b.getQueue(), b.getExchange(), b.getArguments());
- }
-
-
- public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
- {
- assert queue != null;
- final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
-
- if (bindingKey == null)
- {
- bindingKey = "";
- }
- if (exchange == null)
- {
- exchange = defaultExchange;
- }
- if (arguments == null)
- {
- arguments = Collections.emptyMap();
- }
-
- // The default exchange bindings must reflect the existence of queues, allow
- // all operations on it to succeed. It is up to the broker to prevent illegal
- // attempts at binding to this exchange, not the ACLs.
- if(exchange != defaultExchange)
- {
- // Check access
- if (!_virtualHost.getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue))
- {
- throw new AMQSecurityException("Permission denied: unbinding " + bindingKey);
- }
- }
-
- BindingImpl b = _bindings.remove(new BindingImpl(null, bindingKey,queue,exchange,arguments));
-
- if (b != null)
- {
- exchange.removeBinding(b);
- queue.removeBinding(b);
- exchange.removeCloseTask(b);
- queue.removeQueueDeleteTask(b);
-
- if (b.isDurable())
- {
- _virtualHost.getMessageStore().unbindQueue(b);
- }
- b.logDestruction();
- }
-
- return b;
- }
-
- public Binding getBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments)
- {
- assert queue != null;
- final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
-
- if(bindingKey == null)
- {
- bindingKey = "";
- }
- if(exchange == null)
- {
- exchange = defaultExchange;
- }
- if(arguments == null)
- {
- arguments = Collections.emptyMap();
- }
-
- BindingImpl b = new BindingImpl(null, bindingKey,queue,exchange,arguments);
- return _bindings.get(b);
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 7a3367d215..0d05307cb4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -23,13 +23,18 @@ package org.apache.qpid.server.exchange;
import java.util.ArrayList;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.BindingMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
+import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -60,7 +65,7 @@ public abstract class AbstractExchange implements Exchange
private VirtualHost _virtualHost;
- private final List<Exchange.Task> _closeTaskList = new CopyOnWriteArrayList<Exchange.Task>();
+ private final List<Task> _closeTaskList = new CopyOnWriteArrayList<Task>();
/**
* Whether the exchange is automatically deleted once all queues have detached from it
@@ -228,7 +233,7 @@ public abstract class AbstractExchange implements Exchange
_closeTaskList.remove(task);
}
- public final void addBinding(final Binding binding)
+ public final void doAddBinding(final Binding binding)
{
_bindings.add(binding);
int bindingCountSize = _bindings.size();
@@ -249,7 +254,7 @@ public abstract class AbstractExchange implements Exchange
return _bindingCountHigh.get();
}
- public final void removeBinding(final Binding binding)
+ public final void doRemoveBinding(final Binding binding)
{
onUnbind(binding);
for(BindingListener listener : _listeners)
@@ -380,4 +385,220 @@ public abstract class AbstractExchange implements Exchange
{
_listeners.remove(listener);
}
+
+ @Override
+ public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ throws AMQSecurityException, AMQInternalException
+ {
+ return makeBinding(null, bindingKey, queue, arguments, false, false);
+ }
+
+ @Override
+ public boolean replaceBinding(final UUID id, final String bindingKey,
+ final AMQQueue queue,
+ final Map<String, Object> arguments)
+ throws AMQSecurityException, AMQInternalException
+ {
+ return makeBinding(id, bindingKey, queue, arguments, false, true);
+ }
+
+ @Override
+ public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue,
+ final Map<String, Object> argumentMap)
+ throws AMQSecurityException, AMQInternalException
+ {
+ makeBinding(id, bindingKey,queue, argumentMap,true, false);
+ }
+
+ @Override
+ public void removeBinding(final Binding b) throws AMQSecurityException, AMQInternalException
+ {
+ removeBinding(b.getBindingKey(), b.getQueue(), b.getArguments());
+ }
+
+ @Override
+ public Binding removeBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ throws AMQSecurityException, AMQInternalException
+ {
+ assert queue != null;
+
+ if (bindingKey == null)
+ {
+ bindingKey = "";
+ }
+ if (arguments == null)
+ {
+ arguments = Collections.emptyMap();
+ }
+
+ // The default exchange bindings must reflect the existence of queues, allow
+ // all operations on it to succeed. It is up to the broker to prevent illegal
+ // attempts at binding to this exchange, not the ACLs.
+ // Check access
+ if (!_virtualHost.getSecurityManager().authoriseUnbind(this, new AMQShortString(bindingKey), queue))
+ {
+ throw new AMQSecurityException("Permission denied: unbinding " + bindingKey);
+ }
+
+ BindingImpl b = _bindingsMap.remove(new BindingImpl(null, bindingKey,queue,arguments));
+
+ if (b != null)
+ {
+ doRemoveBinding(b);
+ queue.removeBinding(b);
+ removeCloseTask(b);
+ queue.removeQueueDeleteTask(b);
+
+ if (b.isDurable())
+ {
+ _virtualHost.getMessageStore().unbindQueue(b);
+ }
+ b.logDestruction();
+ }
+
+ return b;
+ }
+
+
+ @Override
+ public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ {
+ assert queue != null;
+
+ if(bindingKey == null)
+ {
+ bindingKey = "";
+ }
+
+ if(arguments == null)
+ {
+ arguments = Collections.emptyMap();
+ }
+
+ BindingImpl b = new BindingImpl(null, bindingKey,queue,arguments);
+ return _bindingsMap.get(b);
+ }
+
+ private final ConcurrentHashMap<BindingImpl, BindingImpl> _bindingsMap = new ConcurrentHashMap<BindingImpl, BindingImpl>();
+
+ private boolean makeBinding(UUID id,
+ String bindingKey,
+ AMQQueue queue,
+ Map<String, Object> arguments,
+ boolean restore,
+ boolean force) throws AMQSecurityException, AMQInternalException
+ {
+ assert queue != null;
+
+ if (bindingKey == null)
+ {
+ bindingKey = "";
+ }
+ if (arguments == null)
+ {
+ arguments = Collections.emptyMap();
+ }
+
+ //Perform ACLs
+ if (!_virtualHost.getSecurityManager().authoriseBind(AbstractExchange.this, queue, new AMQShortString(bindingKey)))
+ {
+ throw new AMQSecurityException("Permission denied: binding " + bindingKey);
+ }
+
+ if (id == null)
+ {
+ id = UUIDGenerator.generateBindingUUID(getName(),
+ queue.getName(),
+ bindingKey,
+ _virtualHost.getName());
+ }
+ BindingImpl b = new BindingImpl(id, bindingKey, queue, arguments);
+ BindingImpl existingMapping = _bindingsMap.putIfAbsent(b, b);
+ if (existingMapping == null || force)
+ {
+ if (existingMapping != null)
+ {
+ removeBinding(existingMapping);
+ }
+
+ if (b.isDurable() && !restore)
+ {
+ _virtualHost.getMessageStore().bindQueue(b);
+ }
+
+ queue.addQueueDeleteTask(b);
+ addCloseTask(b);
+ queue.addBinding(b);
+ doAddBinding(b);
+ b.logCreation();
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ private final class BindingImpl extends Binding implements AMQQueue.Task, Task
+ {
+ private final BindingLogSubject _logSubject;
+ //TODO : persist creation time
+ private long _createTime = System.currentTimeMillis();
+
+ private BindingImpl(UUID id,
+ String bindingKey,
+ final AMQQueue queue,
+ final Map<String, Object> arguments)
+ {
+ super(id, bindingKey, queue, AbstractExchange.this, arguments);
+ _logSubject = new BindingLogSubject(bindingKey,AbstractExchange.this,queue);
+
+ }
+
+
+ public void doTask(final AMQQueue queue) throws AMQException
+ {
+ removeBinding(this);
+ }
+
+ public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException
+ {
+ removeBinding(this);
+ }
+
+ void logCreation()
+ {
+ CurrentActor.get().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()),
+ getArguments() != null
+ && !getArguments().isEmpty()));
+ }
+
+ void logDestruction()
+ {
+ CurrentActor.get().message(_logSubject, BindingMessages.DELETED());
+ }
+
+ public String getOrigin()
+ {
+ return (String) getArguments().get("qpid.fed.origin");
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ public boolean isDurable()
+ {
+ return getQueue().isDurable() && getExchange().isDurable();
+ }
+
+ }
+
+ public static interface Task
+ {
+ public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
new file mode 100644
index 0000000000..4e136965a1
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exchange;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ExchangeMessages;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class DefaultExchange implements Exchange
+{
+
+ private UUID _id;
+ private VirtualHost _virtualHost;
+ private int _ticket;
+ private static final Logger _logger = Logger.getLogger(DefaultExchange.class);
+ private final AtomicBoolean _closed = new AtomicBoolean();
+
+ private LogSubject _logSubject;
+ private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>();
+
+
+ @Override
+ public void initialise(UUID id,
+ VirtualHost host,
+ AMQShortString name,
+ boolean durable,
+ int ticket,
+ boolean autoDelete) throws AMQException
+ {
+ _id = id;
+ _virtualHost = host;
+ _ticket = ticket;
+ }
+
+ @Override
+ public String getName()
+ {
+ return ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString();
+ }
+
+ @Override
+ public ExchangeType getType()
+ {
+ return DirectExchange.TYPE;
+ }
+
+ @Override
+ public long getBindingCount()
+ {
+ return _virtualHost.getQueueRegistry().getQueues().size();
+ }
+
+ @Override
+ public long getByteDrops()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public long getByteReceives()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public long getMsgDrops()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public long getMsgReceives()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ throws AMQSecurityException, AMQInternalException
+ {
+ throw new AMQSecurityException("Cannot add bindings to the default exchange");
+ }
+
+ @Override
+ public boolean replaceBinding(UUID id, String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ throws AMQSecurityException, AMQInternalException
+ {
+ throw new AMQSecurityException("Cannot replace bindings on the default exchange");
+ }
+
+ @Override
+ public void restoreBinding(UUID id, String bindingKey, AMQQueue queue, Map<String, Object> argumentMap)
+ throws AMQSecurityException, AMQInternalException
+ {
+ _logger.warn("Bindings to the default exchange should not be stored in the configuration store");
+ }
+
+ @Override
+ public void removeBinding(Binding b) throws AMQSecurityException, AMQInternalException
+ {
+ throw new AMQSecurityException("Cannot remove bindings to the default exchange");
+ }
+
+ @Override
+ public Binding removeBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ throws AMQSecurityException, AMQInternalException
+ {
+ throw new AMQSecurityException("Cannot remove bindings to the default exchange");
+ }
+
+ @Override
+ public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ {
+ if(_virtualHost.getQueueRegistry().getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty()))
+ {
+ return convertToBinding(queue);
+ }
+ else
+ {
+ return null;
+ }
+
+ }
+
+ private Binding convertToBinding(AMQQueue queue)
+ {
+ String queueName = queue.getName();
+
+ UUID exchangeId = UUIDGenerator.generateBindingUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(),
+ queueName,
+ queueName,
+ _virtualHost.getName());
+
+ return new Binding(exchangeId, queueName, queue, this, Collections.EMPTY_MAP);
+ }
+
+ @Override
+ public AMQShortString getNameShortString()
+ {
+ return AMQShortString.EMPTY_STRING;
+ }
+
+ @Override
+ public AMQShortString getTypeShortString()
+ {
+ return getType().getName();
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isAutoDelete()
+ {
+ return false;
+ }
+
+ @Override
+ public int getTicket()
+ {
+ return _ticket;
+ }
+
+ @Override
+ public void close() throws AMQException
+ {
+ if(_closed.compareAndSet(false,true))
+ {
+
+ CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED());
+
+ }
+ }
+
+ @Override
+ public List<AMQQueue> route(InboundMessage message)
+ {
+ AMQQueue q = _virtualHost.getQueueRegistry().getQueue(message.getRoutingKey());
+ if(q == null)
+ {
+ List<AMQQueue> noQueues = Collections.emptyList();
+ return noQueues;
+ }
+ else
+ {
+ return Collections.singletonList(q);
+ }
+
+ }
+
+ @Override
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ return isBound(routingKey, queue) && (arguments == null || arguments.isEmpty());
+ }
+
+ @Override
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
+ {
+ return isBound(routingKey) && isBound(queue) && queue.getNameShortString().equals(routingKey); //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public boolean isBound(AMQShortString routingKey)
+ {
+ return _virtualHost.getQueueRegistry().getQueue(routingKey) != null;
+ }
+
+ @Override
+ public boolean isBound(AMQQueue queue)
+ {
+ return _virtualHost.getQueueRegistry().getQueue(queue.getName()) == queue;
+ }
+
+ @Override
+ public boolean hasBindings()
+ {
+ return getBindingCount() != 0;
+ }
+
+ @Override
+ public boolean isBound(String bindingKey, AMQQueue queue)
+ {
+ return isBound(queue) && queue.getName().equals(bindingKey);
+ }
+
+ @Override
+ public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
+ {
+ return isBound(bindingKey, queue) && (arguments == null || arguments.isEmpty());
+ }
+
+ @Override
+ public boolean isBound(String bindingKey)
+ {
+ return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null;
+ }
+
+ @Override
+ public Exchange getAlternateExchange()
+ {
+ return null;
+ }
+
+ @Override
+ public void setAlternateExchange(Exchange exchange)
+ {
+ _logger.warn("Cannot set the alternate exchange for the default exchange");
+ }
+
+ @Override
+ public void removeReference(ExchangeReferrer exchange)
+ {
+ _referrers.remove(exchange);
+ }
+
+ @Override
+ public void addReference(ExchangeReferrer exchange)
+ {
+ _referrers.put(exchange, Boolean.TRUE);
+ }
+
+ @Override
+ public boolean hasReferrers()
+ {
+ return !_referrers.isEmpty();
+ }
+
+ @Override
+ public Collection<Binding> getBindings()
+ {
+ List<Binding> bindings = new ArrayList<Binding>();
+ for(AMQQueue q : _virtualHost.getQueueRegistry().getQueues())
+ {
+ bindings.add(convertToBinding(q));
+ }
+ return bindings;
+ }
+
+ @Override
+ public void addBindingListener(BindingListener listener)
+ {
+ _virtualHost.getQueueRegistry().addRegistryChangeListener(convertListener(listener));//To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ private QueueRegistry.RegistryChangeListener convertListener(final BindingListener listener)
+ {
+ return new QueueRegistry.RegistryChangeListener()
+ {
+ @Override
+ public void queueRegistered(AMQQueue queue)
+ {
+ listener.bindingAdded(DefaultExchange.this, convertToBinding(queue));
+ }
+
+ @Override
+ public void queueUnregistered(AMQQueue queue)
+ {
+ listener.bindingRemoved(DefaultExchange.this, convertToBinding(queue));
+ }
+ };
+ }
+
+ @Override
+ public void removeBindingListener(BindingListener listener)
+ {
+ // TODO
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 9cce8d640b..450e74bfec 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -54,14 +55,21 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public DefaultExchangeRegistry(VirtualHost host)
{
- //create 'standard' exchanges:
_host = host;
-
}
public void initialise() throws AMQException
{
+ //create 'standard' exchanges:
new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this, getDurableConfigurationStore());
+
+ _defaultExchange = new DefaultExchange();
+
+ UUID defaultExchangeId =
+ UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), _host.getName());
+
+ _defaultExchange.initialise(defaultExchangeId, _host, ExchangeDefaults.DEFAULT_EXCHANGE_NAME,false,0,false);
+
}
public DurableConfigurationStore getDurableConfigurationStore()
@@ -106,11 +114,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
throw new AMQException(AMQConstant.NOT_FOUND, "Unknown exchange " + name, null);
}
- if (ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(name))
- {
- throw new AMQException(AMQConstant.NOT_ALLOWED, "Cannot unregister the default exchange", null);
- }
-
if (!_host.getSecurityManager().authoriseDelete(exchange))
{
throw new AMQSecurityException();
@@ -228,7 +231,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public boolean isReservedExchangeName(String name)
{
- if (name == null || "".equals(name) || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString().equals(name)
+ if (name == null || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString().equals(name)
|| name.startsWith("amq.") || name.startsWith("qpid."))
{
return true;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 4bafb04c33..a5a1d7f912 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -32,7 +32,6 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import javax.management.JMException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -40,11 +39,33 @@ import java.util.UUID;
public interface Exchange extends ExchangeReferrer
{
+ void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
+ throws AMQException;
+
+
+ UUID getId();
String getName();
+ AMQShortString getNameShortString();
+
ExchangeType getType();
+ AMQShortString getTypeShortString();
+
+ boolean isDurable();
+
+ /**
+ * @return true if the exchange will be deleted after all queues have been detached
+ */
+ boolean isAutoDelete();
+
+ int getTicket();
+
+ Exchange getAlternateExchange();
+
+ void setAlternateExchange(Exchange exchange);
+
long getBindingCount();
long getByteDrops();
@@ -55,27 +76,25 @@ public interface Exchange extends ExchangeReferrer
long getMsgReceives();
- public interface BindingListener
- {
- void bindingAdded(Exchange exchange, Binding binding);
- void bindingRemoved(Exchange exchange, Binding binding);
- }
- AMQShortString getNameShortString();
+ boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ throws AMQSecurityException, AMQInternalException;
- AMQShortString getTypeShortString();
+ boolean replaceBinding(UUID id, String bindingKey,
+ AMQQueue queue,
+ Map<String, Object> arguments)
+ throws AMQSecurityException, AMQInternalException;
- void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
- throws AMQException, JMException;
+ void restoreBinding(UUID id, String bindingKey, AMQQueue queue,
+ Map<String, Object> argumentMap)
+ throws AMQSecurityException, AMQInternalException;
- boolean isDurable();
+ void removeBinding(Binding b) throws AMQSecurityException, AMQInternalException;
- /**
- * @return true if the exchange will be deleted after all queues have been detached
- */
- boolean isAutoDelete();
+ Binding removeBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ throws AMQSecurityException, AMQInternalException;
- int getTicket();
+ Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments);
void close() throws AMQException;
@@ -124,6 +143,8 @@ public interface Exchange extends ExchangeReferrer
*/
boolean hasBindings();
+ Collection<Binding> getBindings();
+
boolean isBound(String bindingKey, AMQQueue queue);
@@ -131,36 +152,20 @@ public interface Exchange extends ExchangeReferrer
boolean isBound(String bindingKey);
- void addCloseTask(Task task);
-
- void removeCloseTask(Task task);
-
-
- Exchange getAlternateExchange();
-
- void setAlternateExchange(Exchange exchange);
-
void removeReference(ExchangeReferrer exchange);
void addReference(ExchangeReferrer exchange);
boolean hasReferrers();
- void addBinding(Binding binding);
-
- void removeBinding(Binding binding);
-
- Collection<Binding> getBindings();
+ public interface BindingListener
+ {
+ void bindingAdded(Exchange exchange, Binding binding);
+ void bindingRemoved(Exchange exchange, Binding binding);
+ }
public void addBindingListener(BindingListener listener);
public void removeBindingListener(BindingListener listener);
-
- public static interface Task
- {
- public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
- }
-
- UUID getId();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
index edb476f3aa..313b5eefff 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
@@ -21,9 +21,11 @@
package org.apache.qpid.server.exchange;
+import java.util.UUID;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -36,8 +38,6 @@ public class ExchangeInitialiser
define (registry, factory, type.getDefaultExchangeName(), type.getName(), store);
}
- define(registry, factory, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, store);
- registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME));
}
private void define(ExchangeRegistry r, ExchangeFactory f,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 49ca934966..0501443efa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -127,14 +127,14 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
String bindingKey = String.valueOf(routingKey);
Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments());
- if(!virtualHost.getBindingFactory().addBinding(bindingKey, queue, exch, arguments))
+ if(!exch.addBinding(bindingKey, queue, arguments))
{
- Binding oldBinding = virtualHost.getBindingFactory().getBinding(bindingKey, queue, exch, arguments);
+ Binding oldBinding = exch.getBinding(bindingKey, queue, arguments);
Map<String, Object> oldArgs = oldBinding.getArguments();
if((oldArgs == null && !arguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(arguments)))
{
- virtualHost.getBindingFactory().replaceBinding(oldBinding.getId(), bindingKey, queue, exch, arguments);
+ exch.replaceBinding(oldBinding.getId(), bindingKey, queue, arguments);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 194c3d6351..c889f5660d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -145,11 +145,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
});
}
}
- Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
-
- virtualHost.getBindingFactory().addBinding(String.valueOf(queueName), queue, defaultExchange,
- Collections.<String, Object> emptyMap());
- _logger.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getNameShortString() + ")");
}
}
else if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
index 66a6ff6527..523c7acd88 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
@@ -103,13 +103,13 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
}
- if(virtualHost.getBindingFactory().getBinding(String.valueOf(routingKey), queue, exch, FieldTable.convertToMap(body.getArguments())) == null)
+ if(exch.getBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments())) == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding");
}
else
{
- virtualHost.getBindingFactory().removeBinding(String.valueOf(routingKey), queue, exch, FieldTable.convertToMap(body.getArguments()));
+ exch.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments()));
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
index 39e979174a..92b8f55f23 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
@@ -142,7 +142,7 @@ final class BindingAdapter extends AbstractAdapter implements Binding
{
try
{
- _queue.getAMQQueue().getVirtualHost().getBindingFactory().removeBinding(_binding);
+ _exchange.getExchange().removeBinding(_binding);
}
catch(AMQSecurityException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
index a081f03f09..5d3d507fff 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
@@ -134,18 +134,17 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa
try
{
- if(!virtualHost.getBindingFactory().addBinding(bindingKey, amqQueue, _exchange, bindingArguments))
+ if(!_exchange.addBinding(bindingKey, amqQueue, bindingArguments))
{
- Binding oldBinding = virtualHost.getBindingFactory().getBinding(bindingKey, amqQueue, _exchange,
- bindingArguments);
+ Binding oldBinding = _exchange.getBinding(bindingKey, amqQueue, bindingArguments);
Map<String, Object> oldArgs = oldBinding.getArguments();
if((oldArgs == null && !bindingArguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(bindingArguments)))
{
- virtualHost.getBindingFactory().replaceBinding(oldBinding.getId(), bindingKey, amqQueue, _exchange, bindingArguments);
+ _exchange.replaceBinding(oldBinding.getId(), bindingKey, amqQueue, bindingArguments);
}
}
- Binding binding = virtualHost.getBindingFactory().getBinding(bindingKey, amqQueue, _exchange, bindingArguments);
+ Binding binding = _exchange.getBinding(bindingKey, amqQueue, bindingArguments);
synchronized (_bindingAdapters)
{
@@ -311,8 +310,12 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa
}
if(adapter != null)
{
- _vhost.getQueueAdapter(binding.getQueue()).bindingUnregistered(binding);
- childRemoved(adapter);
+ QueueAdapter queueAdapter = _vhost.getQueueAdapter(binding.getQueue());
+ if(queueAdapter != null)
+ {
+ queueAdapter.bindingUnregistered(binding);
+ childRemoved(adapter);
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index df171d61f6..b5c34a9f8b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -396,7 +396,6 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name,
durable, owner, lifetime == LifetimePolicy.AUTO_DELETE,
exclusive, _virtualHost, attributes);
- _virtualHost.getBindingFactory().addBinding(name, queue, _virtualHost.getExchangeRegistry().getDefaultExchange(), null);
if(durable)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 589c3a0892..a72671762c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -296,7 +296,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- vhost.getBindingFactory().addBinding(binding,queue,exchange,null);
+ exchange.addBinding(binding,queue,null);
source.setDistributionMode(StdDistMode.COPY);
if(!isDurable)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index a65a6a8eb2..872a936462 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -329,7 +329,7 @@ public class AMQQueueFactory
{
//actual routing key used does not matter due to use of fanout exchange,
//but we will make the key 'dlq' as it can be logged at creation.
- virtualHost.getBindingFactory().addBinding(DLQ_ROUTING_KEY, dlQueue, dlExchange, null);
+ dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null);
}
q.setAlternateExchange(dlExchange);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b16af05883..980145a83e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1321,9 +1321,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (!_deleted.getAndSet(true))
{
- for (Binding b : getBindings())
+ for (Binding b : _bindings)
{
- _virtualHost.getBindingFactory().removeBinding(b);
+ b.getExchange().removeBinding(b);
}
SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 7a6c0c42be..d8d245e255 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -1020,7 +1020,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
try
{
- virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments());
+ exchange.addBinding(method.getBindingKey(), queue, method.getArguments());
}
catch (AMQException e)
{
@@ -1075,7 +1075,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
try
{
- virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null);
+ exchange.removeBinding(method.getBindingKey(), queue, null);
}
catch (AMQException e)
{
@@ -1266,12 +1266,6 @@ public class ServerSessionDelegate extends SessionDelegate
}
queueRegistry.registerQueue(queue);
- ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
-
- Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
-
- virtualHost.getBindingFactory().addBinding(queueName, queue, defaultExchange, null);
-
if (method.hasAutoDelete()
&& method.getAutoDelete()
&& method.hasExclusive()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index d24f79c56c..eb1481b719 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -20,18 +20,15 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -74,8 +71,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
VirtualHostRegistry getVirtualHostRegistry();
- BindingFactory getBindingFactory();
-
DtxRegistry getDtxRegistry();
LinkRegistry getLinkRegistry(String remoteContainerId);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index ae88e3e9f7..640e6ff459 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -33,13 +33,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageTransferMessage;
@@ -382,17 +380,15 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
}
- BindingFactory bf = _virtualHost.getBindingFactory();
-
Map<String, Object> argumentMap = FieldTable.convertToMap(argumentsFT);
- if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null)
+ if(exchange.getBinding(bindingKey, queue, argumentMap) == null)
{
_logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queue.getName()
+ ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
- bf.restoreBinding(bindingId, bindingKey, queue, exchange, argumentMap);
+ exchange.restoreBinding(bindingId, bindingKey, queue, argumentMap);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index b39f975c28..8559a0263a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -33,7 +33,6 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -96,8 +95,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
private final ConnectionRegistry _connectionRegistry;
- private final BindingFactory _bindingFactory;
-
private final DtxRegistry _dtxRegistry;
private final MessageStore _messageStore;
@@ -144,8 +141,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
_exchangeRegistry = new DefaultExchangeRegistry(this);
- _bindingFactory = new BindingFactory(this);
-
_messageStore = initialiseMessageStore(hostConfig);
configureMessageStore(hostConfig);
@@ -385,13 +380,18 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
{
String routingKey = String.valueOf(routingKeyNameObj);
- if (exchange.equals(defaultExchange) && !queueName.equals(routingKey))
+ if (exchange.equals(defaultExchange))
{
- throw new ConfigurationException("Illegal attempt to bind queue '" + queueName +
- "' to the default exchange with a key other than the queue name: " + routingKey);
+ if(!queueName.equals(routingKey))
+ {
+ throw new ConfigurationException("Illegal attempt to bind queue '" + queueName +
+ "' to the default exchange with a key other than the queue name: " + routingKey);
+ }
+ }
+ else
+ {
+ configureBinding(queue, exchange, routingKey);
}
-
- configureBinding(queue, exchange, routingKey);
}
if (!exchange.equals(defaultExchange))
@@ -400,8 +400,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
configureBinding(queue, exchange, queueName);
}
- //ensure the queue is bound to the default exchange using its name
- configureBinding(queue, defaultExchange, queueName);
}
private void configureBinding(AMQQueue queue, Exchange exchange, String routingKey) throws AMQException
@@ -410,7 +408,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
{
_logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + exchange.getName());
}
- _bindingFactory.addBinding(routingKey, queue, exchange, null);
+ exchange.addBinding(routingKey, queue, null);
}
public String getName()
@@ -479,11 +477,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
return _virtualHostRegistry;
}
- public BindingFactory getBindingFactory()
- {
- return _bindingFactory;
- }
-
public void registerMessageDelivered(long messageSize)
{
_messagesDelivered.registerEvent(1L);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
index 4d6d60906d..26fc51c400 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
@@ -25,7 +25,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.store.MessageStore;
@@ -53,8 +52,6 @@ public class AcknowledgeTest extends QpidTestCase
_queueName = getTestName();
_queue = BrokerTestHelper.createQueue(_queueName, virtualHost);
_messageStore = virtualHost.getMessageStore();
- Exchange defaultExchange = virtualHost.getExchangeRegistry().getDefaultExchange();
- virtualHost.getBindingFactory().addBinding(_queueName, _queue, defaultExchange, null);
}
@Override
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index 121a8764ec..2ddb417d5d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -20,24 +20,39 @@
*/
package org.apache.qpid.server.exchange;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.UUID;
-
import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class FanoutExchangeTest extends TestCase
{
private FanoutExchange _exchange;
+ private VirtualHost _virtualHost;
- public void setUp()
+ public void setUp() throws AMQException
{
+ CurrentActor.setDefault(mock(LogActor.class));
+
_exchange = new FanoutExchange();
+ _virtualHost = mock(VirtualHost.class);
+ SecurityManager securityManager = mock(SecurityManager.class);
+ when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
+ when(securityManager.authoriseBind(any(Exchange.class),any(AMQQueue.class),any(AMQShortString.class))).thenReturn(true);
+ _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false);
}
public void testIsBoundAMQShortStringFieldTableAMQQueueWhenQueueIsNull()
@@ -57,31 +72,32 @@ public class FanoutExchangeTest extends TestCase
assertFalse("calling isBound(AMQQueue) with null queue should return false", _exchange.isBound((AMQQueue) null));
}
- public void testIsBoundAMQShortStringFieldTableAMQQueue()
+ public void testIsBoundAMQShortStringFieldTableAMQQueue() throws AMQSecurityException, AMQInternalException
{
AMQQueue queue = bindQueue();
assertTrue("Should return true for a bound queue",
_exchange.isBound((AMQShortString) null, (FieldTable) null, queue));
}
- public void testIsBoundAMQShortStringAMQQueue()
+ public void testIsBoundAMQShortStringAMQQueue() throws AMQSecurityException, AMQInternalException
{
AMQQueue queue = bindQueue();
assertTrue("Should return true for a bound queue",
_exchange.isBound((AMQShortString) null, queue));
}
- public void testIsBoundAMQQueue()
+ public void testIsBoundAMQQueue() throws AMQSecurityException, AMQInternalException
{
AMQQueue queue = bindQueue();
assertTrue("Should return true for a bound queue",
_exchange.isBound(queue));
}
- private AMQQueue bindQueue()
+ private AMQQueue bindQueue() throws AMQSecurityException, AMQInternalException
{
AMQQueue queue = mock(AMQQueue.class);
- _exchange.addBinding(new Binding(UUID.randomUUID(), "does not matter", queue, _exchange, null));
+ when(queue.getVirtualHost()).thenReturn(_virtualHost);
+ _exchange.addBinding("does not matter", queue, null);
return queue;
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
index dd8d28e836..a33c85dfd1 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
@@ -45,7 +45,7 @@ public class BindingLogSubjectTest extends AbstractTestLogSubject
_testVhost = BrokerTestHelper.createVirtualHost("test");
_routingKey = new AMQShortString("RoutingKey");
- _exchange = _testVhost.getExchangeRegistry().getDefaultExchange();
+ _exchange = _testVhost.getExchangeRegistry().getExchange("amq.direct");
_queue = new MockAMQQueue("BindingLogSubjectTest");
((MockAMQQueue) _queue).setVirtualHost(_testVhost);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
index 8d1b89bf3c..775a306bd3 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
@@ -40,7 +40,7 @@ public class ExchangeLogSubjectTest extends AbstractTestLogSubject
_testVhost = BrokerTestHelper.createVirtualHost("test");
- _exchange = _testVhost.getExchangeRegistry().getDefaultExchange();
+ _exchange = _testVhost.getExchangeRegistry().getExchange("amq.direct");
_subject = new ExchangeLogSubject(_exchange,_testVhost);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index ece42f7de3..505c47a69b 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -37,6 +37,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
@@ -164,7 +165,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
public void testBinding() throws AMQSecurityException, AMQInternalException
{
- _virtualHost.getBindingFactory().addBinding(String.valueOf(_routingKey), _queue, _exchange, Collections.EMPTY_MAP);
+ _exchange.addBinding(String.valueOf(_routingKey), _queue, Collections.EMPTY_MAP);
assertTrue("Routing key was not bound",
_exchange.isBound(_routingKey));
@@ -177,7 +178,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
assertEquals("Wrong exchange bound", _exchange,
_queue.getBindings().get(0).getExchange());
- _virtualHost.getBindingFactory().removeBinding(String.valueOf(_routingKey), _queue, _exchange, Collections.EMPTY_MAP);
+ _exchange.removeBinding(String.valueOf(_routingKey), _queue, Collections.EMPTY_MAP);
assertFalse("Routing key was still bound",
_exchange.isBound(_routingKey));
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index ffd777243b..809ae72b89 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -829,7 +829,7 @@ public class MessageStoreTest extends QpidTestCase
try
{
- getVirtualHost().getBindingFactory().addBinding(String.valueOf(routingKey), queue, exchange, FieldTable.convertToMap(bindArguments));
+ exchange.addBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(bindArguments));
}
catch (Exception e)
{
@@ -849,7 +849,7 @@ public class MessageStoreTest extends QpidTestCase
try
{
- getVirtualHost().getBindingFactory().removeBinding(String.valueOf(routingKey), queue, exchange, FieldTable.convertToMap(bindArguments));
+ exchange.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(bindArguments));
}
catch (Exception e)
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java
index d35a90e3c8..25b86eb73f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java
@@ -55,8 +55,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
_queueName = getTestName();
_queue = BrokerTestHelper.createQueue(_queueName, virtualHost);
_messageStore = virtualHost.getMessageStore();
- Exchange defaultExchange = virtualHost.getExchangeRegistry().getDefaultExchange();
- virtualHost.getBindingFactory().addBinding(_queueName, _queue, defaultExchange, null);
}
@Override
diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 1d99d99820..324e36e132 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -21,9 +21,9 @@
package org.apache.qpid.server.virtualhost;
import java.util.concurrent.ScheduledFuture;
-import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
@@ -61,11 +61,6 @@ public class MockVirtualHost implements VirtualHost
return null;
}
- public BindingFactory getBindingFactory()
- {
- return null;
- }
-
public DtxRegistry getDtxRegistry()
{
return null;
diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
index 975ec4daca..c9bf67c11d 100644
--- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
+++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
@@ -41,7 +41,7 @@ public class ExchangeDefaults
}
/** The default direct exchange, which is a special internal exchange that cannot be explicitly bound to. */
- public static final AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString("<<default>>");
+ public static final AMQShortString DEFAULT_EXCHANGE_NAME = AMQShortString.EMPTY_STRING;
/** The pre-defined topic exchange, the broker SHOULD provide this. */
public static final AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic");
diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
index 2c7288de14..dfd26b474a 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
@@ -106,19 +106,13 @@ public class BindingLoggingTest extends AbstractTestLogging
List<String> results = waitAndFindMatches(BND_PREFIX);
- // We will have two binds as we bind all queues to the default exchange
- assertEquals("Result set larger than expected.", 2, results.size());
+ assertEquals("Result set larger than expected.", 1, results.size());
- String exchange = "direct/<<default>>";
String messageID = "BND-1001";
- String message = "Create";
String queueName = _queue.getQueueName();
-
+ String exchange = "direct/amq.direct";
+ String message = "Create : Arguments : {x-filter-jms-selector=}";
validateLogMessage(getLogMessage(results, 0), messageID, message, exchange, queueName, queueName);
-
- exchange = "direct/amq.direct";
- message = "Create : Arguments : {x-filter-jms-selector=}";
- validateLogMessage(getLogMessage(results, 1), messageID, message, exchange, queueName, queueName);
}
/**
@@ -145,23 +139,14 @@ public class BindingLoggingTest extends AbstractTestLogging
List<String> results = waitAndFindMatches(BND_PREFIX);
- // We will have two binds as we bind all queues to the default exchange
- assertEquals("Result set larger than expected.", 2, results.size());
+ assertEquals("Result set larger than expected.", 1, results.size());
- //Verify the first entry is the default binding
String messageID = "BND-1001";
- String message = "Create";
-
- validateLogMessage(getLogMessage(results, 0), messageID, message,
- "direct/<<default>>", "clientid:" + getName(), "clientid:" + getName());
- //Default binding will be without the selector
- assertTrue("JMSSelector identified in binding:"+message, !message.contains("jms-selector"));
-
- // Perform full testing on the second non default binding
- message = getMessageString(fromMessage(getLogMessage(results, 1)));
+ // Perform full testing on the binding
+ String message = getMessageString(fromMessage(getLogMessage(results, 0)));
- validateLogMessage(getLogMessage(results, 1), messageID, message,
+ validateLogMessage(getLogMessage(results, 0), messageID, message,
"topic/amq.topic", "topic", "clientid:" + getName());
assertTrue("JMSSelector not identified in binding:"+message, message.contains("jms-selector"));
@@ -204,7 +189,7 @@ public class BindingLoggingTest extends AbstractTestLogging
List<String> results = waitAndFindMatches(BND_PREFIX);
// We will have two binds as we bind all queues to the default exchange
- assertEquals("Result not as expected." + results, 4, results.size());
+ assertEquals("Result not as expected." + results, 2, results.size());
String messageID = "BND-1001";
@@ -214,49 +199,20 @@ public class BindingLoggingTest extends AbstractTestLogging
validateMessageID(messageID, log);
assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log)));
- log = getLogMessage(results, 1);
- validateMessageID(messageID, log);
- assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log)));
-
-
- String DEFAULT = "direct/<<default>>";
String DIRECT = "direct/amq.direct";
messageID = "BND-1002";
message = "Deleted";
- log = getLogMessage(results, 2);
+ log = getLogMessage(results, 1);
validateMessageID(messageID, log);
String subject = fromSubject(log);
validateBindingDeleteArguments(subject, "/test");
- boolean defaultFirst = DEFAULT.equals(AbstractTestLogSubject.getSlice("ex", subject));
- boolean directFirst = DIRECT.equals(AbstractTestLogSubject.getSlice("ex", subject));
-
assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log)));
- log = getLogMessage(results, 3);
-
- validateMessageID(messageID, log);
-
- subject = fromSubject(log);
-
- validateBindingDeleteArguments(subject, "/test");
-
- if (!defaultFirst)
- {
- assertEquals(DEFAULT, AbstractTestLogSubject.getSlice("ex", subject));
- assertTrue("First Exchange Log was not a direct exchange delete",directFirst);
- }
- else
- {
- assertEquals(DIRECT, AbstractTestLogSubject.getSlice("ex", subject));
- assertTrue("First Exchange Log was not a default exchange delete",defaultFirst);
- }
-
- assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log)));
}
private void validateBindingDeleteArguments(String subject, String vhostName)
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java
index 3f979bea27..f6b56f64ce 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java
@@ -64,11 +64,7 @@ public class BrokerManagementTest extends QpidBrokerTestCase
public void testCreateQueueAndDeletion() throws Exception
{
final String queueName = getTestQueueName();
- final ManagedExchange defaultExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString());
- // Check that bind does not exist before queue creation
- assertFalse("Binding to " + queueName + " should not exist in default exchange before queue creation",
- defaultExchange.bindings().containsKey(new String[] {queueName}));
_managedBroker.createNewQueue(queueName, "testowner", true);
@@ -76,16 +72,11 @@ public class BrokerManagementTest extends QpidBrokerTestCase
assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName));
assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName));
- // Now verify that the default exchange has been bound.
- assertTrue("Binding to " + queueName + " should exist in default exchange after queue creation",
- defaultExchange.bindings().containsKey(new String[] {queueName}));
// Now delete the queue
_managedBroker.deleteQueue(queueName);
- // Finally ensure that the binding has been removed.
- assertFalse("Binding to " + queueName + " should not exist in default exchange after queue deletion",
- defaultExchange.bindings().containsKey(new String[] {queueName}));
+
}
/**
@@ -105,24 +96,24 @@ public class BrokerManagementTest extends QpidBrokerTestCase
/**
* Tests that it is disallowed to unregister the default exchange.
*/
- public void testUnregisterOfDefaultExchangeDisallowed() throws Exception
+ public void testUnregisterOfAmqDirectExchangeDisallowed() throws Exception
{
- String defaultExchangeName = ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString();
+ String amqDirectExchangeName = "amq.direct";
- ManagedExchange defaultExchange = _jmxUtils.getManagedExchange(defaultExchangeName);
- assertNotNull("Exchange should exist", defaultExchange);
+ ManagedExchange amqDirectExchange = _jmxUtils.getManagedExchange(amqDirectExchangeName);
+ assertNotNull("Exchange should exist", amqDirectExchange);
try
{
- _managedBroker.unregisterExchange(defaultExchangeName);
+ _managedBroker.unregisterExchange(amqDirectExchangeName);
fail("Exception not thrown");
}
catch (UnsupportedOperationException e)
{
// PASS
- assertEquals("'<<default>>' is a reserved exchange and can't be deleted", e.getMessage());
+ assertEquals("'"+amqDirectExchangeName+"' is a reserved exchange and can't be deleted", e.getMessage());
}
- defaultExchange = _jmxUtils.getManagedExchange(defaultExchangeName);
- assertNotNull("Exchange should exist", defaultExchange);
+ amqDirectExchange = _jmxUtils.getManagedExchange(amqDirectExchangeName);
+ assertNotNull("Exchange should exist", amqDirectExchange);
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java
index 69e81cf85d..4358b4b450 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java
@@ -365,7 +365,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging
List<String> results = waitAndFindMatches("BND-1001");
- assertEquals("Unexpected number of bindings logged", 2, results.size());
+ assertEquals("Unexpected number of bindings logged", 1, results.size());
String log = getLogMessage(results, 0);
@@ -392,7 +392,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging
List<String> results = waitAndFindMatches("BND-1001");
- assertEquals("Unexpected number of bindings logged", 2, results.size());
+ assertEquals("Unexpected number of bindings logged", 1, results.size());
String log = getLogMessage(results, 0);
@@ -419,7 +419,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging
List<String> results = waitAndFindMatches("BND-1001");
- assertEquals("Unexpected number of bindings logged", 2, results.size());
+ assertEquals("Unexpected number of bindings logged", 1, results.size());
String log = getLogMessage(results, 0);
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/BindingRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/BindingRestTest.java
index a113cd734e..666c78f070 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/BindingRestTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/BindingRestTest.java
@@ -46,7 +46,7 @@ public class BindingRestTest extends QpidRestTestCase
{
List<Map<String, Object>> bindings = getRestTestHelper().getJsonAsList("/rest/binding/test");
assertNotNull("Bindings cannot be null", bindings);
- assertEquals("Unexpected number of bindings", EXPECTED_QUEUES.length * 2, bindings.size());
+ assertEquals("Unexpected number of bindings", EXPECTED_QUEUES.length, bindings.size());
for (String queueName : EXPECTED_QUEUES)
{
Map<String, Object> searchAttributes = new HashMap<String, Object>();
@@ -55,11 +55,6 @@ public class BindingRestTest extends QpidRestTestCase
Map<String, Object> binding = getRestTestHelper().find(searchAttributes, bindings);
Asserts.assertBinding(queueName, "amq.direct", binding);
-
- searchAttributes.put(Binding.EXCHANGE, "<<default>>");
-
- binding = getRestTestHelper().find(searchAttributes, bindings);
- Asserts.assertBinding(queueName, "<<default>>", binding);
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
index 743ba00cdd..57398ea929 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
@@ -41,7 +41,7 @@ public class QpidRestTestCase extends QpidBrokerTestCase
public static final String[] EXPECTED_VIRTUALHOSTS = { TEST1_VIRTUALHOST, TEST2_VIRTUALHOST, TEST3_VIRTUALHOST};
public static final String[] EXPECTED_QUEUES = { "queue", "ping" };
- public static final String[] EXPECTED_EXCHANGES = { "amq.fanout", "amq.match", "amq.direct","amq.topic","<<default>>" };
+ public static final String[] EXPECTED_EXCHANGES = { "amq.fanout", "amq.match", "amq.direct","amq.topic" };
private RestTestHelper _restTestHelper = new RestTestHelper(findFreePort());
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java
index d6eae154cf..fec516bc2b 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java
@@ -94,11 +94,9 @@ public class QueueRestTest extends QpidRestTestCase
@SuppressWarnings("unchecked")
List<Map<String, Object>> bindings = (List<Map<String, Object>>) queueDetails.get(QUEUE_ATTRIBUTE_BINDINGS);
assertNotNull("Queue bindings are not found", bindings);
- assertEquals("Unexpected number of bindings", 2, bindings.size());
+ assertEquals("Unexpected number of bindings", 1, bindings.size());
- Map<String, Object> defaultExchangeBinding = getRestTestHelper().find(Binding.EXCHANGE, "<<default>>", bindings);
Map<String, Object> directExchangeBinding = getRestTestHelper().find(Binding.EXCHANGE, "amq.direct", bindings);
- Asserts.assertBinding(name, "<<default>>", defaultExchangeBinding);
Asserts.assertBinding(name, "amq.direct", directExchangeBinding);
}
}
@@ -113,11 +111,9 @@ public class QueueRestTest extends QpidRestTestCase
@SuppressWarnings("unchecked")
List<Map<String, Object>> bindings = (List<Map<String, Object>>) queueDetails.get(QUEUE_ATTRIBUTE_BINDINGS);
assertNotNull("Queue bindings are not found", bindings);
- assertEquals("Unexpected number of bindings", 2, bindings.size());
+ assertEquals("Unexpected number of bindings", 1, bindings.size());
- Map<String, Object> defaultExchangeBinding = getRestTestHelper().find(Binding.EXCHANGE, "<<default>>", bindings);
Map<String, Object> directExchangeBinding = getRestTestHelper().find(Binding.EXCHANGE, "amq.direct", bindings);
- Asserts.assertBinding(queueName, "<<default>>", defaultExchangeBinding);
Asserts.assertBinding(queueName, "amq.direct", directExchangeBinding);
@SuppressWarnings("unchecked")
@@ -169,7 +165,7 @@ public class QueueRestTest extends QpidRestTestCase
{
String queueName = getTestQueueName();
String bindingName = queueName + 2;
- String[] exchanges = { "amq.direct", "amq.fanout", "amq.topic", "amq.match", "<<default>>" };
+ String[] exchanges = { "amq.direct", "amq.fanout", "amq.topic", "amq.match" };
for (int i = 0; i < exchanges.length; i++)
{
@@ -182,7 +178,7 @@ public class QueueRestTest extends QpidRestTestCase
@SuppressWarnings("unchecked")
List<Map<String, Object>> bindings = (List<Map<String, Object>>) queueDetails.get(QUEUE_ATTRIBUTE_BINDINGS);
assertNotNull("Queue bindings are not found", bindings);
- assertEquals("Unexpected number of bindings", exchanges.length + 2, bindings.size());
+ assertEquals("Unexpected number of bindings", exchanges.length + 1, bindings.size());
Map<String, Object> searchAttributes = new HashMap<String, Object>();
searchAttributes.put(Binding.NAME, bindingName);
@@ -244,7 +240,7 @@ public class QueueRestTest extends QpidRestTestCase
statistics.get(Queue.CONSUMER_COUNT));
assertEquals("Unexpected queue statistics attribute " + Queue.CONSUMER_COUNT_WITH_CREDIT, 1,
statistics.get(Queue.CONSUMER_COUNT_WITH_CREDIT));
- assertEquals("Unexpected queue statistics attribute " + Queue.BINDING_COUNT, 2, statistics.get(Queue.BINDING_COUNT));
+ assertEquals("Unexpected queue statistics attribute " + Queue.BINDING_COUNT, 1, statistics.get(Queue.BINDING_COUNT));
assertEquals("Unexpected queue statistics attribute " + Queue.PERSISTENT_DEQUEUED_MESSAGES, DEQUEUED_MESSAGES,
statistics.get(Queue.PERSISTENT_DEQUEUED_MESSAGES));
assertEquals("Unexpected queue statistics attribute " + Queue.TOTAL_DEQUEUED_MESSAGES, DEQUEUED_MESSAGES,
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/StructureRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/StructureRestTest.java
index cb4fd1ad5b..664b8fffa4 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/StructureRestTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/StructureRestTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.systest.rest;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -79,7 +80,8 @@ public class StructureRestTest extends QpidRestTestCase
Map<String, Object> exchange = getRestTestHelper().find("name", exchangeName, exchanges);
assertNotNull("Exchange " + exchangeName + " is not found ", exchange);
assertNode(exchange, exchangeName);
- if ("amq.direct".equalsIgnoreCase(exchangeName) || "<<default>>".equalsIgnoreCase(exchangeName))
+ if (ExchangeDefaults.DIRECT_EXCHANGE_NAME.asString().equalsIgnoreCase(exchangeName) ||
+ ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString().equalsIgnoreCase(exchangeName))
{
@SuppressWarnings("unchecked")
List<Map<String, Object>> bindings = (List<Map<String, Object>>) exchange.get("bindings");
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
index c65f8bbd08..1e49351323 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
@@ -84,7 +84,6 @@ public class VirtualHostRestTest extends QpidRestTestCase
Asserts.assertDurableExchange("amq.topic", "topic", getRestTestHelper().find(Exchange.NAME, "amq.topic", exchanges));
Asserts.assertDurableExchange("amq.direct", "direct", getRestTestHelper().find(Exchange.NAME, "amq.direct", exchanges));
Asserts.assertDurableExchange("amq.match", "headers", getRestTestHelper().find(Exchange.NAME, "amq.match", exchanges));
- Asserts.assertDurableExchange("<<default>>", "direct", getRestTestHelper().find(Exchange.NAME, "<<default>>", exchanges));
@SuppressWarnings("unchecked")
List<Map<String, Object>> queues = (List<Map<String, Object>>) hostDetails.get(VIRTUALHOST_QUEUES_ATTRIBUTE);
@@ -600,7 +599,6 @@ public class VirtualHostRestTest extends QpidRestTestCase
Asserts.assertDurableExchange("amq.topic", "topic", restTestHelper.find(Exchange.NAME, "amq.topic", exchanges));
Asserts.assertDurableExchange("amq.direct", "direct", restTestHelper.find(Exchange.NAME, "amq.direct", exchanges));
Asserts.assertDurableExchange("amq.match", "headers", restTestHelper.find(Exchange.NAME, "amq.match", exchanges));
- Asserts.assertDurableExchange("<<default>>", "direct", restTestHelper.find(Exchange.NAME, "<<default>>", exchanges));
assertNull("Unexpected queues", hostDetails.get(VIRTUALHOST_QUEUES_ATTRIBUTE));
assertNull("Unexpected connections", hostDetails.get(VIRTUALHOST_CONNECTIONS_ATTRIBUTE));
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 22a98b6f42..3783b0bd02 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -108,7 +108,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
(AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs()));
+ dest.getAddressName(),dest.getAddressName(), null));
// create receiver -----------------------------------------
addr1 = "ADDR:testQueue2; { create: receiver }";
@@ -133,7 +133,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
(AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs()));
+ dest.getAddressName(),dest.getAddressName(), null));
// create never --------------------------------------------
addr1 = "ADDR:testQueue3; { create: never }";
@@ -182,7 +182,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
(AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs()));
+ dest.getAddressName(),dest.getAddressName(), null));
}