diff options
38 files changed, 741 insertions, 499 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index fc58c7fb4d..8d88ee902a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -79,6 +79,8 @@ import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.InboundMessageAdapter; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.security.*; +import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; @@ -269,7 +271,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException { String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString(); - if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), routingKey, e.getName())) + SecurityManager securityManager = getVirtualHost().getSecurityManager(); + if (!securityManager.authorisePublish(info.isImmediate(), routingKey, e.getName())) { throw new AMQSecurityException("Permission denied: " + e.getName()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java deleted file mode 100644 index 69ff081528..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.binding; - -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; -import org.apache.qpid.AMQSecurityException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.BindingMessages; -import org.apache.qpid.server.logging.subjects.BindingLogSubject; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.Collections; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -public class BindingFactory -{ - private final VirtualHost _virtualHost; - - private final ConcurrentHashMap<BindingImpl, BindingImpl> _bindings = new ConcurrentHashMap<BindingImpl, BindingImpl>(); - - public BindingFactory(final VirtualHost vhost) - { - _virtualHost = vhost; - } - - private final class BindingImpl extends Binding implements AMQQueue.Task, Exchange.Task - { - private final BindingLogSubject _logSubject; - //TODO : persist creation time - private long _createTime = System.currentTimeMillis(); - - private BindingImpl(UUID id, String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments) - { - super(id, bindingKey, queue, exchange, arguments); - _logSubject = new BindingLogSubject(bindingKey,exchange,queue); - - } - - - public void doTask(final AMQQueue queue) throws AMQException - { - removeBinding(this); - } - - public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException - { - removeBinding(this); - } - - void logCreation() - { - CurrentActor.get().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()), getArguments() != null && !getArguments().isEmpty())); - } - - void logDestruction() - { - CurrentActor.get().message(_logSubject, BindingMessages.DELETED()); - } - - public String getOrigin() - { - return (String) getArguments().get("qpid.fed.origin"); - } - - public long getCreateTime() - { - return _createTime; - } - - public boolean isDurable() - { - return getQueue().isDurable() && getExchange().isDurable(); - } - - } - - - - public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException - { - return makeBinding(null, bindingKey, queue, exchange, arguments, false, false); - } - - - public boolean replaceBinding(final UUID id, final String bindingKey, - final AMQQueue queue, - final Exchange exchange, - final Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException - { - return makeBinding(id, bindingKey, queue, exchange, arguments, false, true); - } - - private boolean makeBinding(UUID id, String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException - { - assert queue != null; - final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); - - if (bindingKey == null) - { - bindingKey = ""; - } - if (exchange == null) - { - exchange = defaultExchange; - } - if (arguments == null) - { - arguments = Collections.emptyMap(); - } - - if (exchange == null) - { - throw new IllegalArgumentException("exchange cannot be null"); - } - - // The default exchange bindings must reflect the existence of queues, allow - // all operations on it to succeed. It is up to the broker to prevent illegal - // attempts at binding to this exchange, not the ACLs. - if(exchange != defaultExchange) - { - //Perform ACLs - if (!_virtualHost.getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey))) - { - throw new AMQSecurityException("Permission denied: binding " + bindingKey); - } - } - - if (id == null) - { - id = UUIDGenerator.generateBindingUUID(exchange.getName(), queue.getName(), bindingKey, _virtualHost.getName()); - } - BindingImpl b = new BindingImpl(id, bindingKey, queue, exchange, arguments); - BindingImpl existingMapping = _bindings.putIfAbsent(b, b); - if (existingMapping == null || force) - { - if (existingMapping != null) - { - removeBinding(existingMapping); - } - - if (b.isDurable() && !restore) - { - _virtualHost.getMessageStore().bindQueue(b); - } - - queue.addQueueDeleteTask(b); - exchange.addCloseTask(b); - queue.addBinding(b); - exchange.addBinding(b); - b.logCreation(); - - return true; - } - else - { - return false; - } - } - - public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException - { - makeBinding(id, bindingKey,queue,exchange,argumentMap,true, false); - } - - public void removeBinding(final Binding b) throws AMQSecurityException, AMQInternalException - { - removeBinding(b.getBindingKey(), b.getQueue(), b.getExchange(), b.getArguments()); - } - - - public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException - { - assert queue != null; - final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); - - if (bindingKey == null) - { - bindingKey = ""; - } - if (exchange == null) - { - exchange = defaultExchange; - } - if (arguments == null) - { - arguments = Collections.emptyMap(); - } - - // The default exchange bindings must reflect the existence of queues, allow - // all operations on it to succeed. It is up to the broker to prevent illegal - // attempts at binding to this exchange, not the ACLs. - if(exchange != defaultExchange) - { - // Check access - if (!_virtualHost.getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue)) - { - throw new AMQSecurityException("Permission denied: unbinding " + bindingKey); - } - } - - BindingImpl b = _bindings.remove(new BindingImpl(null, bindingKey,queue,exchange,arguments)); - - if (b != null) - { - exchange.removeBinding(b); - queue.removeBinding(b); - exchange.removeCloseTask(b); - queue.removeQueueDeleteTask(b); - - if (b.isDurable()) - { - _virtualHost.getMessageStore().unbindQueue(b); - } - b.logDestruction(); - } - - return b; - } - - public Binding getBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) - { - assert queue != null; - final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); - - if(bindingKey == null) - { - bindingKey = ""; - } - if(exchange == null) - { - exchange = defaultExchange; - } - if(arguments == null) - { - arguments = Collections.emptyMap(); - } - - BindingImpl b = new BindingImpl(null, bindingKey,queue,exchange,arguments); - return _bindings.get(b); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 7a3367d215..0d05307cb4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -23,13 +23,18 @@ package org.apache.qpid.server.exchange; import java.util.ArrayList; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.BindingMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; +import org.apache.qpid.server.logging.subjects.BindingLogSubject; import org.apache.qpid.server.logging.subjects.ExchangeLogSubject; import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -60,7 +65,7 @@ public abstract class AbstractExchange implements Exchange private VirtualHost _virtualHost; - private final List<Exchange.Task> _closeTaskList = new CopyOnWriteArrayList<Exchange.Task>(); + private final List<Task> _closeTaskList = new CopyOnWriteArrayList<Task>(); /** * Whether the exchange is automatically deleted once all queues have detached from it @@ -228,7 +233,7 @@ public abstract class AbstractExchange implements Exchange _closeTaskList.remove(task); } - public final void addBinding(final Binding binding) + public final void doAddBinding(final Binding binding) { _bindings.add(binding); int bindingCountSize = _bindings.size(); @@ -249,7 +254,7 @@ public abstract class AbstractExchange implements Exchange return _bindingCountHigh.get(); } - public final void removeBinding(final Binding binding) + public final void doRemoveBinding(final Binding binding) { onUnbind(binding); for(BindingListener listener : _listeners) @@ -380,4 +385,220 @@ public abstract class AbstractExchange implements Exchange { _listeners.remove(listener); } + + @Override + public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) + throws AMQSecurityException, AMQInternalException + { + return makeBinding(null, bindingKey, queue, arguments, false, false); + } + + @Override + public boolean replaceBinding(final UUID id, final String bindingKey, + final AMQQueue queue, + final Map<String, Object> arguments) + throws AMQSecurityException, AMQInternalException + { + return makeBinding(id, bindingKey, queue, arguments, false, true); + } + + @Override + public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue, + final Map<String, Object> argumentMap) + throws AMQSecurityException, AMQInternalException + { + makeBinding(id, bindingKey,queue, argumentMap,true, false); + } + + @Override + public void removeBinding(final Binding b) throws AMQSecurityException, AMQInternalException + { + removeBinding(b.getBindingKey(), b.getQueue(), b.getArguments()); + } + + @Override + public Binding removeBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) + throws AMQSecurityException, AMQInternalException + { + assert queue != null; + + if (bindingKey == null) + { + bindingKey = ""; + } + if (arguments == null) + { + arguments = Collections.emptyMap(); + } + + // The default exchange bindings must reflect the existence of queues, allow + // all operations on it to succeed. It is up to the broker to prevent illegal + // attempts at binding to this exchange, not the ACLs. + // Check access + if (!_virtualHost.getSecurityManager().authoriseUnbind(this, new AMQShortString(bindingKey), queue)) + { + throw new AMQSecurityException("Permission denied: unbinding " + bindingKey); + } + + BindingImpl b = _bindingsMap.remove(new BindingImpl(null, bindingKey,queue,arguments)); + + if (b != null) + { + doRemoveBinding(b); + queue.removeBinding(b); + removeCloseTask(b); + queue.removeQueueDeleteTask(b); + + if (b.isDurable()) + { + _virtualHost.getMessageStore().unbindQueue(b); + } + b.logDestruction(); + } + + return b; + } + + + @Override + public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) + { + assert queue != null; + + if(bindingKey == null) + { + bindingKey = ""; + } + + if(arguments == null) + { + arguments = Collections.emptyMap(); + } + + BindingImpl b = new BindingImpl(null, bindingKey,queue,arguments); + return _bindingsMap.get(b); + } + + private final ConcurrentHashMap<BindingImpl, BindingImpl> _bindingsMap = new ConcurrentHashMap<BindingImpl, BindingImpl>(); + + private boolean makeBinding(UUID id, + String bindingKey, + AMQQueue queue, + Map<String, Object> arguments, + boolean restore, + boolean force) throws AMQSecurityException, AMQInternalException + { + assert queue != null; + + if (bindingKey == null) + { + bindingKey = ""; + } + if (arguments == null) + { + arguments = Collections.emptyMap(); + } + + //Perform ACLs + if (!_virtualHost.getSecurityManager().authoriseBind(AbstractExchange.this, queue, new AMQShortString(bindingKey))) + { + throw new AMQSecurityException("Permission denied: binding " + bindingKey); + } + + if (id == null) + { + id = UUIDGenerator.generateBindingUUID(getName(), + queue.getName(), + bindingKey, + _virtualHost.getName()); + } + BindingImpl b = new BindingImpl(id, bindingKey, queue, arguments); + BindingImpl existingMapping = _bindingsMap.putIfAbsent(b, b); + if (existingMapping == null || force) + { + if (existingMapping != null) + { + removeBinding(existingMapping); + } + + if (b.isDurable() && !restore) + { + _virtualHost.getMessageStore().bindQueue(b); + } + + queue.addQueueDeleteTask(b); + addCloseTask(b); + queue.addBinding(b); + doAddBinding(b); + b.logCreation(); + + return true; + } + else + { + return false; + } + } + + private final class BindingImpl extends Binding implements AMQQueue.Task, Task + { + private final BindingLogSubject _logSubject; + //TODO : persist creation time + private long _createTime = System.currentTimeMillis(); + + private BindingImpl(UUID id, + String bindingKey, + final AMQQueue queue, + final Map<String, Object> arguments) + { + super(id, bindingKey, queue, AbstractExchange.this, arguments); + _logSubject = new BindingLogSubject(bindingKey,AbstractExchange.this,queue); + + } + + + public void doTask(final AMQQueue queue) throws AMQException + { + removeBinding(this); + } + + public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException + { + removeBinding(this); + } + + void logCreation() + { + CurrentActor.get().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()), + getArguments() != null + && !getArguments().isEmpty())); + } + + void logDestruction() + { + CurrentActor.get().message(_logSubject, BindingMessages.DELETED()); + } + + public String getOrigin() + { + return (String) getArguments().get("qpid.fed.origin"); + } + + public long getCreateTime() + { + return _createTime; + } + + public boolean isDurable() + { + return getQueue().isDurable() && getExchange().isDurable(); + } + + } + + public static interface Task + { + public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java new file mode 100644 index 0000000000..4e136965a1 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.exchange; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ExchangeMessages; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class DefaultExchange implements Exchange +{ + + private UUID _id; + private VirtualHost _virtualHost; + private int _ticket; + private static final Logger _logger = Logger.getLogger(DefaultExchange.class); + private final AtomicBoolean _closed = new AtomicBoolean(); + + private LogSubject _logSubject; + private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>(); + + + @Override + public void initialise(UUID id, + VirtualHost host, + AMQShortString name, + boolean durable, + int ticket, + boolean autoDelete) throws AMQException + { + _id = id; + _virtualHost = host; + _ticket = ticket; + } + + @Override + public String getName() + { + return ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(); + } + + @Override + public ExchangeType getType() + { + return DirectExchange.TYPE; + } + + @Override + public long getBindingCount() + { + return _virtualHost.getQueueRegistry().getQueues().size(); + } + + @Override + public long getByteDrops() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public long getByteReceives() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public long getMsgDrops() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public long getMsgReceives() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) + throws AMQSecurityException, AMQInternalException + { + throw new AMQSecurityException("Cannot add bindings to the default exchange"); + } + + @Override + public boolean replaceBinding(UUID id, String bindingKey, AMQQueue queue, Map<String, Object> arguments) + throws AMQSecurityException, AMQInternalException + { + throw new AMQSecurityException("Cannot replace bindings on the default exchange"); + } + + @Override + public void restoreBinding(UUID id, String bindingKey, AMQQueue queue, Map<String, Object> argumentMap) + throws AMQSecurityException, AMQInternalException + { + _logger.warn("Bindings to the default exchange should not be stored in the configuration store"); + } + + @Override + public void removeBinding(Binding b) throws AMQSecurityException, AMQInternalException + { + throw new AMQSecurityException("Cannot remove bindings to the default exchange"); + } + + @Override + public Binding removeBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) + throws AMQSecurityException, AMQInternalException + { + throw new AMQSecurityException("Cannot remove bindings to the default exchange"); + } + + @Override + public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) + { + if(_virtualHost.getQueueRegistry().getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty())) + { + return convertToBinding(queue); + } + else + { + return null; + } + + } + + private Binding convertToBinding(AMQQueue queue) + { + String queueName = queue.getName(); + + UUID exchangeId = UUIDGenerator.generateBindingUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), + queueName, + queueName, + _virtualHost.getName()); + + return new Binding(exchangeId, queueName, queue, this, Collections.EMPTY_MAP); + } + + @Override + public AMQShortString getNameShortString() + { + return AMQShortString.EMPTY_STRING; + } + + @Override + public AMQShortString getTypeShortString() + { + return getType().getName(); + } + + @Override + public boolean isDurable() + { + return false; + } + + @Override + public boolean isAutoDelete() + { + return false; + } + + @Override + public int getTicket() + { + return _ticket; + } + + @Override + public void close() throws AMQException + { + if(_closed.compareAndSet(false,true)) + { + + CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED()); + + } + } + + @Override + public List<AMQQueue> route(InboundMessage message) + { + AMQQueue q = _virtualHost.getQueueRegistry().getQueue(message.getRoutingKey()); + if(q == null) + { + List<AMQQueue> noQueues = Collections.emptyList(); + return noQueues; + } + else + { + return Collections.singletonList(q); + } + + } + + @Override + public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) + { + return isBound(routingKey, queue) && (arguments == null || arguments.isEmpty()); + } + + @Override + public boolean isBound(AMQShortString routingKey, AMQQueue queue) + { + return isBound(routingKey) && isBound(queue) && queue.getNameShortString().equals(routingKey); //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public boolean isBound(AMQShortString routingKey) + { + return _virtualHost.getQueueRegistry().getQueue(routingKey) != null; + } + + @Override + public boolean isBound(AMQQueue queue) + { + return _virtualHost.getQueueRegistry().getQueue(queue.getName()) == queue; + } + + @Override + public boolean hasBindings() + { + return getBindingCount() != 0; + } + + @Override + public boolean isBound(String bindingKey, AMQQueue queue) + { + return isBound(queue) && queue.getName().equals(bindingKey); + } + + @Override + public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) + { + return isBound(bindingKey, queue) && (arguments == null || arguments.isEmpty()); + } + + @Override + public boolean isBound(String bindingKey) + { + return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null; + } + + @Override + public Exchange getAlternateExchange() + { + return null; + } + + @Override + public void setAlternateExchange(Exchange exchange) + { + _logger.warn("Cannot set the alternate exchange for the default exchange"); + } + + @Override + public void removeReference(ExchangeReferrer exchange) + { + _referrers.remove(exchange); + } + + @Override + public void addReference(ExchangeReferrer exchange) + { + _referrers.put(exchange, Boolean.TRUE); + } + + @Override + public boolean hasReferrers() + { + return !_referrers.isEmpty(); + } + + @Override + public Collection<Binding> getBindings() + { + List<Binding> bindings = new ArrayList<Binding>(); + for(AMQQueue q : _virtualHost.getQueueRegistry().getQueues()) + { + bindings.add(convertToBinding(q)); + } + return bindings; + } + + @Override + public void addBindingListener(BindingListener listener) + { + _virtualHost.getQueueRegistry().addRegistryChangeListener(convertListener(listener));//To change body of implemented methods use File | Settings | File Templates. + } + + private QueueRegistry.RegistryChangeListener convertListener(final BindingListener listener) + { + return new QueueRegistry.RegistryChangeListener() + { + @Override + public void queueRegistered(AMQQueue queue) + { + listener.bindingAdded(DefaultExchange.this, convertToBinding(queue)); + } + + @Override + public void queueUnregistered(AMQQueue queue) + { + listener.bindingRemoved(DefaultExchange.this, convertToBinding(queue)); + } + }; + } + + @Override + public void removeBindingListener(BindingListener listener) + { + // TODO + } + + @Override + public UUID getId() + { + return _id; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 9cce8d640b..450e74bfec 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -54,14 +55,21 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public DefaultExchangeRegistry(VirtualHost host) { - //create 'standard' exchanges: _host = host; - } public void initialise() throws AMQException { + //create 'standard' exchanges: new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this, getDurableConfigurationStore()); + + _defaultExchange = new DefaultExchange(); + + UUID defaultExchangeId = + UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), _host.getName()); + + _defaultExchange.initialise(defaultExchangeId, _host, ExchangeDefaults.DEFAULT_EXCHANGE_NAME,false,0,false); + } public DurableConfigurationStore getDurableConfigurationStore() @@ -106,11 +114,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry throw new AMQException(AMQConstant.NOT_FOUND, "Unknown exchange " + name, null); } - if (ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(name)) - { - throw new AMQException(AMQConstant.NOT_ALLOWED, "Cannot unregister the default exchange", null); - } - if (!_host.getSecurityManager().authoriseDelete(exchange)) { throw new AMQSecurityException(); @@ -228,7 +231,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public boolean isReservedExchangeName(String name) { - if (name == null || "".equals(name) || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString().equals(name) + if (name == null || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString().equals(name) || name.startsWith("amq.") || name.startsWith("qpid.")) { return true; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 4bafb04c33..a5a1d7f912 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -32,7 +32,6 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.virtualhost.VirtualHost; -import javax.management.JMException; import java.util.Collection; import java.util.List; import java.util.Map; @@ -40,11 +39,33 @@ import java.util.UUID; public interface Exchange extends ExchangeReferrer { + void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) + throws AMQException; + + + UUID getId(); String getName(); + AMQShortString getNameShortString(); + ExchangeType getType(); + AMQShortString getTypeShortString(); + + boolean isDurable(); + + /** + * @return true if the exchange will be deleted after all queues have been detached + */ + boolean isAutoDelete(); + + int getTicket(); + + Exchange getAlternateExchange(); + + void setAlternateExchange(Exchange exchange); + long getBindingCount(); long getByteDrops(); @@ -55,27 +76,25 @@ public interface Exchange extends ExchangeReferrer long getMsgReceives(); - public interface BindingListener - { - void bindingAdded(Exchange exchange, Binding binding); - void bindingRemoved(Exchange exchange, Binding binding); - } - AMQShortString getNameShortString(); + boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) + throws AMQSecurityException, AMQInternalException; - AMQShortString getTypeShortString(); + boolean replaceBinding(UUID id, String bindingKey, + AMQQueue queue, + Map<String, Object> arguments) + throws AMQSecurityException, AMQInternalException; - void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) - throws AMQException, JMException; + void restoreBinding(UUID id, String bindingKey, AMQQueue queue, + Map<String, Object> argumentMap) + throws AMQSecurityException, AMQInternalException; - boolean isDurable(); + void removeBinding(Binding b) throws AMQSecurityException, AMQInternalException; - /** - * @return true if the exchange will be deleted after all queues have been detached - */ - boolean isAutoDelete(); + Binding removeBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) + throws AMQSecurityException, AMQInternalException; - int getTicket(); + Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments); void close() throws AMQException; @@ -124,6 +143,8 @@ public interface Exchange extends ExchangeReferrer */ boolean hasBindings(); + Collection<Binding> getBindings(); + boolean isBound(String bindingKey, AMQQueue queue); @@ -131,36 +152,20 @@ public interface Exchange extends ExchangeReferrer boolean isBound(String bindingKey); - void addCloseTask(Task task); - - void removeCloseTask(Task task); - - - Exchange getAlternateExchange(); - - void setAlternateExchange(Exchange exchange); - void removeReference(ExchangeReferrer exchange); void addReference(ExchangeReferrer exchange); boolean hasReferrers(); - void addBinding(Binding binding); - - void removeBinding(Binding binding); - - Collection<Binding> getBindings(); + public interface BindingListener + { + void bindingAdded(Exchange exchange, Binding binding); + void bindingRemoved(Exchange exchange, Binding binding); + } public void addBindingListener(BindingListener listener); public void removeBindingListener(BindingListener listener); - - public static interface Task - { - public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException; - } - - UUID getId(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java index edb476f3aa..313b5eefff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java @@ -21,9 +21,11 @@ package org.apache.qpid.server.exchange; +import java.util.UUID; import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -36,8 +38,6 @@ public class ExchangeInitialiser define (registry, factory, type.getDefaultExchangeName(), type.getName(), store); } - define(registry, factory, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, store); - registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME)); } private void define(ExchangeRegistry r, ExchangeFactory f, diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 49ca934966..0501443efa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -127,14 +127,14 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> String bindingKey = String.valueOf(routingKey); Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments()); - if(!virtualHost.getBindingFactory().addBinding(bindingKey, queue, exch, arguments)) + if(!exch.addBinding(bindingKey, queue, arguments)) { - Binding oldBinding = virtualHost.getBindingFactory().getBinding(bindingKey, queue, exch, arguments); + Binding oldBinding = exch.getBinding(bindingKey, queue, arguments); Map<String, Object> oldArgs = oldBinding.getArguments(); if((oldArgs == null && !arguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(arguments))) { - virtualHost.getBindingFactory().replaceBinding(oldBinding.getId(), bindingKey, queue, exch, arguments); + exch.replaceBinding(oldBinding.getId(), bindingKey, queue, arguments); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 194c3d6351..c889f5660d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -145,11 +145,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar }); } } - Exchange defaultExchange = exchangeRegistry.getDefaultExchange(); - - virtualHost.getBindingFactory().addBinding(String.valueOf(queueName), queue, defaultExchange, - Collections.<String, Object> emptyMap()); - _logger.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getNameShortString() + ")"); } } else if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java index 66a6ff6527..523c7acd88 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java @@ -103,13 +103,13 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist."); } - if(virtualHost.getBindingFactory().getBinding(String.valueOf(routingKey), queue, exch, FieldTable.convertToMap(body.getArguments())) == null) + if(exch.getBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments())) == null) { throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding"); } else { - virtualHost.getBindingFactory().removeBinding(String.valueOf(routingKey), queue, exch, FieldTable.convertToMap(body.getArguments())); + exch.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments())); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java index 39e979174a..92b8f55f23 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java @@ -142,7 +142,7 @@ final class BindingAdapter extends AbstractAdapter implements Binding { try { - _queue.getAMQQueue().getVirtualHost().getBindingFactory().removeBinding(_binding); + _exchange.getExchange().removeBinding(_binding); } catch(AMQSecurityException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java index a081f03f09..5d3d507fff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java @@ -134,18 +134,17 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa try { - if(!virtualHost.getBindingFactory().addBinding(bindingKey, amqQueue, _exchange, bindingArguments)) + if(!_exchange.addBinding(bindingKey, amqQueue, bindingArguments)) { - Binding oldBinding = virtualHost.getBindingFactory().getBinding(bindingKey, amqQueue, _exchange, - bindingArguments); + Binding oldBinding = _exchange.getBinding(bindingKey, amqQueue, bindingArguments); Map<String, Object> oldArgs = oldBinding.getArguments(); if((oldArgs == null && !bindingArguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(bindingArguments))) { - virtualHost.getBindingFactory().replaceBinding(oldBinding.getId(), bindingKey, amqQueue, _exchange, bindingArguments); + _exchange.replaceBinding(oldBinding.getId(), bindingKey, amqQueue, bindingArguments); } } - Binding binding = virtualHost.getBindingFactory().getBinding(bindingKey, amqQueue, _exchange, bindingArguments); + Binding binding = _exchange.getBinding(bindingKey, amqQueue, bindingArguments); synchronized (_bindingAdapters) { @@ -311,8 +310,12 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa } if(adapter != null) { - _vhost.getQueueAdapter(binding.getQueue()).bindingUnregistered(binding); - childRemoved(adapter); + QueueAdapter queueAdapter = _vhost.getQueueAdapter(binding.getQueue()); + if(queueAdapter != null) + { + queueAdapter.bindingUnregistered(binding); + childRemoved(adapter); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index df171d61f6..b5c34a9f8b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -396,7 +396,6 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name, durable, owner, lifetime == LifetimePolicy.AUTO_DELETE, exclusive, _virtualHost, attributes); - _virtualHost.getBindingFactory().addBinding(name, queue, _virtualHost.getExchangeRegistry().getDefaultExchange(), null); if(durable) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 589c3a0892..a72671762c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -296,7 +296,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } source.setFilter(actualFilters.isEmpty() ? null : actualFilters); - vhost.getBindingFactory().addBinding(binding,queue,exchange,null); + exchange.addBinding(binding,queue,null); source.setDistributionMode(StdDistMode.COPY); if(!isDurable) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index a65a6a8eb2..872a936462 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -329,7 +329,7 @@ public class AMQQueueFactory { //actual routing key used does not matter due to use of fanout exchange, //but we will make the key 'dlq' as it can be logged at creation. - virtualHost.getBindingFactory().addBinding(DLQ_ROUTING_KEY, dlQueue, dlExchange, null); + dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null); } q.setAlternateExchange(dlExchange); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b16af05883..980145a83e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1321,9 +1321,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (!_deleted.getAndSet(true)) { - for (Binding b : getBindings()) + for (Binding b : _bindings) { - _virtualHost.getBindingFactory().removeBinding(b); + b.getExchange().removeBinding(b); } SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 7a6c0c42be..d8d245e255 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -1020,7 +1020,7 @@ public class ServerSessionDelegate extends SessionDelegate { try { - virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments()); + exchange.addBinding(method.getBindingKey(), queue, method.getArguments()); } catch (AMQException e) { @@ -1075,7 +1075,7 @@ public class ServerSessionDelegate extends SessionDelegate { try { - virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null); + exchange.removeBinding(method.getBindingKey(), queue, null); } catch (AMQException e) { @@ -1266,12 +1266,6 @@ public class ServerSessionDelegate extends SessionDelegate } queueRegistry.registerQueue(queue); - ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); - - Exchange defaultExchange = exchangeRegistry.getDefaultExchange(); - - virtualHost.getBindingFactory().addBinding(queueName, queue, defaultExchange, null); - if (method.hasAutoDelete() && method.getAutoDelete() && method.hasExclusive() diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index d24f79c56c..eb1481b719 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -20,18 +20,15 @@ */ package org.apache.qpid.server.virtualhost; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ScheduledFuture; import org.apache.qpid.common.Closeable; -import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.v1_0.LinkRegistry; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -74,8 +71,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable VirtualHostRegistry getVirtualHostRegistry(); - BindingFactory getBindingFactory(); - DtxRegistry getDtxRegistry(); LinkRegistry getLinkRegistry(String remoteContainerId); diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index ae88e3e9f7..640e6ff459 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -33,13 +33,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageTransferMessage; @@ -382,17 +380,15 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } } - BindingFactory bf = _virtualHost.getBindingFactory(); - Map<String, Object> argumentMap = FieldTable.convertToMap(argumentsFT); - if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null) + if(exchange.getBinding(bindingKey, queue, argumentMap) == null) { _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queue.getName() + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")"); - bf.restoreBinding(bindingId, bindingKey, queue, exchange, argumentMap); + exchange.restoreBinding(bindingId, bindingKey, queue, argumentMap); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index b39f975c28..8559a0263a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -33,7 +33,6 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -96,8 +95,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr private final ConnectionRegistry _connectionRegistry; - private final BindingFactory _bindingFactory; - private final DtxRegistry _dtxRegistry; private final MessageStore _messageStore; @@ -144,8 +141,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr _exchangeRegistry = new DefaultExchangeRegistry(this); - _bindingFactory = new BindingFactory(this); - _messageStore = initialiseMessageStore(hostConfig); configureMessageStore(hostConfig); @@ -385,13 +380,18 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr { String routingKey = String.valueOf(routingKeyNameObj); - if (exchange.equals(defaultExchange) && !queueName.equals(routingKey)) + if (exchange.equals(defaultExchange)) { - throw new ConfigurationException("Illegal attempt to bind queue '" + queueName + - "' to the default exchange with a key other than the queue name: " + routingKey); + if(!queueName.equals(routingKey)) + { + throw new ConfigurationException("Illegal attempt to bind queue '" + queueName + + "' to the default exchange with a key other than the queue name: " + routingKey); + } + } + else + { + configureBinding(queue, exchange, routingKey); } - - configureBinding(queue, exchange, routingKey); } if (!exchange.equals(defaultExchange)) @@ -400,8 +400,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr configureBinding(queue, exchange, queueName); } - //ensure the queue is bound to the default exchange using its name - configureBinding(queue, defaultExchange, queueName); } private void configureBinding(AMQQueue queue, Exchange exchange, String routingKey) throws AMQException @@ -410,7 +408,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr { _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + exchange.getName()); } - _bindingFactory.addBinding(routingKey, queue, exchange, null); + exchange.addBinding(routingKey, queue, null); } public String getName() @@ -479,11 +477,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr return _virtualHostRegistry; } - public BindingFactory getBindingFactory() - { - return _bindingFactory; - } - public void registerMessageDelivered(long messageSize) { _messagesDelivered.registerEvent(1L); diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java index 4d6d60906d..26fc51c400 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java @@ -25,7 +25,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.store.MessageStore; @@ -53,8 +52,6 @@ public class AcknowledgeTest extends QpidTestCase _queueName = getTestName(); _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); _messageStore = virtualHost.getMessageStore(); - Exchange defaultExchange = virtualHost.getExchangeRegistry().getDefaultExchange(); - virtualHost.getBindingFactory().addBinding(_queueName, _queue, defaultExchange, null); } @Override diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index 121a8764ec..2ddb417d5d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -20,24 +20,39 @@ */ package org.apache.qpid.server.exchange; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.UUID; - import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.virtualhost.VirtualHost; public class FanoutExchangeTest extends TestCase { private FanoutExchange _exchange; + private VirtualHost _virtualHost; - public void setUp() + public void setUp() throws AMQException { + CurrentActor.setDefault(mock(LogActor.class)); + _exchange = new FanoutExchange(); + _virtualHost = mock(VirtualHost.class); + SecurityManager securityManager = mock(SecurityManager.class); + when(_virtualHost.getSecurityManager()).thenReturn(securityManager); + when(securityManager.authoriseBind(any(Exchange.class),any(AMQQueue.class),any(AMQShortString.class))).thenReturn(true); + _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false); } public void testIsBoundAMQShortStringFieldTableAMQQueueWhenQueueIsNull() @@ -57,31 +72,32 @@ public class FanoutExchangeTest extends TestCase assertFalse("calling isBound(AMQQueue) with null queue should return false", _exchange.isBound((AMQQueue) null)); } - public void testIsBoundAMQShortStringFieldTableAMQQueue() + public void testIsBoundAMQShortStringFieldTableAMQQueue() throws AMQSecurityException, AMQInternalException { AMQQueue queue = bindQueue(); assertTrue("Should return true for a bound queue", _exchange.isBound((AMQShortString) null, (FieldTable) null, queue)); } - public void testIsBoundAMQShortStringAMQQueue() + public void testIsBoundAMQShortStringAMQQueue() throws AMQSecurityException, AMQInternalException { AMQQueue queue = bindQueue(); assertTrue("Should return true for a bound queue", _exchange.isBound((AMQShortString) null, queue)); } - public void testIsBoundAMQQueue() + public void testIsBoundAMQQueue() throws AMQSecurityException, AMQInternalException { AMQQueue queue = bindQueue(); assertTrue("Should return true for a bound queue", _exchange.isBound(queue)); } - private AMQQueue bindQueue() + private AMQQueue bindQueue() throws AMQSecurityException, AMQInternalException { AMQQueue queue = mock(AMQQueue.class); - _exchange.addBinding(new Binding(UUID.randomUUID(), "does not matter", queue, _exchange, null)); + when(queue.getVirtualHost()).thenReturn(_virtualHost); + _exchange.addBinding("does not matter", queue, null); return queue; } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java index dd8d28e836..a33c85dfd1 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java @@ -45,7 +45,7 @@ public class BindingLogSubjectTest extends AbstractTestLogSubject _testVhost = BrokerTestHelper.createVirtualHost("test"); _routingKey = new AMQShortString("RoutingKey"); - _exchange = _testVhost.getExchangeRegistry().getDefaultExchange(); + _exchange = _testVhost.getExchangeRegistry().getExchange("amq.direct"); _queue = new MockAMQQueue("BindingLogSubjectTest"); ((MockAMQQueue) _queue).setVirtualHost(_testVhost); diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java index 8d1b89bf3c..775a306bd3 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java @@ -40,7 +40,7 @@ public class ExchangeLogSubjectTest extends AbstractTestLogSubject _testVhost = BrokerTestHelper.createVirtualHost("test"); - _exchange = _testVhost.getExchangeRegistry().getDefaultExchange(); + _exchange = _testVhost.getExchangeRegistry().getExchange("amq.direct"); _subject = new ExchangeLogSubject(_exchange,_testVhost); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index ece42f7de3..505c47a69b 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -37,6 +37,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; @@ -164,7 +165,7 @@ public class SimpleAMQQueueTest extends QpidTestCase public void testBinding() throws AMQSecurityException, AMQInternalException { - _virtualHost.getBindingFactory().addBinding(String.valueOf(_routingKey), _queue, _exchange, Collections.EMPTY_MAP); + _exchange.addBinding(String.valueOf(_routingKey), _queue, Collections.EMPTY_MAP); assertTrue("Routing key was not bound", _exchange.isBound(_routingKey)); @@ -177,7 +178,7 @@ public class SimpleAMQQueueTest extends QpidTestCase assertEquals("Wrong exchange bound", _exchange, _queue.getBindings().get(0).getExchange()); - _virtualHost.getBindingFactory().removeBinding(String.valueOf(_routingKey), _queue, _exchange, Collections.EMPTY_MAP); + _exchange.removeBinding(String.valueOf(_routingKey), _queue, Collections.EMPTY_MAP); assertFalse("Routing key was still bound", _exchange.isBound(_routingKey)); diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index ffd777243b..809ae72b89 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -829,7 +829,7 @@ public class MessageStoreTest extends QpidTestCase try { - getVirtualHost().getBindingFactory().addBinding(String.valueOf(routingKey), queue, exchange, FieldTable.convertToMap(bindArguments)); + exchange.addBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(bindArguments)); } catch (Exception e) { @@ -849,7 +849,7 @@ public class MessageStoreTest extends QpidTestCase try { - getVirtualHost().getBindingFactory().removeBinding(String.valueOf(routingKey), queue, exchange, FieldTable.convertToMap(bindArguments)); + exchange.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(bindArguments)); } catch (Exception e) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java index d35a90e3c8..25b86eb73f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java @@ -55,8 +55,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase _queueName = getTestName(); _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); _messageStore = virtualHost.getMessageStore(); - Exchange defaultExchange = virtualHost.getExchangeRegistry().getDefaultExchange(); - virtualHost.getBindingFactory().addBinding(_queueName, _queue, defaultExchange, null); } @Override diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 1d99d99820..324e36e132 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -21,9 +21,9 @@ package org.apache.qpid.server.virtualhost; import java.util.concurrent.ScheduledFuture; -import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.exchange.AbstractExchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.v1_0.LinkRegistry; @@ -61,11 +61,6 @@ public class MockVirtualHost implements VirtualHost return null; } - public BindingFactory getBindingFactory() - { - return null; - } - public DtxRegistry getDtxRegistry() { return null; diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java index 975ec4daca..c9bf67c11d 100644 --- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java +++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -41,7 +41,7 @@ public class ExchangeDefaults } /** The default direct exchange, which is a special internal exchange that cannot be explicitly bound to. */ - public static final AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString("<<default>>"); + public static final AMQShortString DEFAULT_EXCHANGE_NAME = AMQShortString.EMPTY_STRING; /** The pre-defined topic exchange, the broker SHOULD provide this. */ public static final AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic"); diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java index 2c7288de14..dfd26b474a 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java @@ -106,19 +106,13 @@ public class BindingLoggingTest extends AbstractTestLogging List<String> results = waitAndFindMatches(BND_PREFIX); - // We will have two binds as we bind all queues to the default exchange - assertEquals("Result set larger than expected.", 2, results.size()); + assertEquals("Result set larger than expected.", 1, results.size()); - String exchange = "direct/<<default>>"; String messageID = "BND-1001"; - String message = "Create"; String queueName = _queue.getQueueName(); - + String exchange = "direct/amq.direct"; + String message = "Create : Arguments : {x-filter-jms-selector=}"; validateLogMessage(getLogMessage(results, 0), messageID, message, exchange, queueName, queueName); - - exchange = "direct/amq.direct"; - message = "Create : Arguments : {x-filter-jms-selector=}"; - validateLogMessage(getLogMessage(results, 1), messageID, message, exchange, queueName, queueName); } /** @@ -145,23 +139,14 @@ public class BindingLoggingTest extends AbstractTestLogging List<String> results = waitAndFindMatches(BND_PREFIX); - // We will have two binds as we bind all queues to the default exchange - assertEquals("Result set larger than expected.", 2, results.size()); + assertEquals("Result set larger than expected.", 1, results.size()); - //Verify the first entry is the default binding String messageID = "BND-1001"; - String message = "Create"; - - validateLogMessage(getLogMessage(results, 0), messageID, message, - "direct/<<default>>", "clientid:" + getName(), "clientid:" + getName()); - //Default binding will be without the selector - assertTrue("JMSSelector identified in binding:"+message, !message.contains("jms-selector")); - - // Perform full testing on the second non default binding - message = getMessageString(fromMessage(getLogMessage(results, 1))); + // Perform full testing on the binding + String message = getMessageString(fromMessage(getLogMessage(results, 0))); - validateLogMessage(getLogMessage(results, 1), messageID, message, + validateLogMessage(getLogMessage(results, 0), messageID, message, "topic/amq.topic", "topic", "clientid:" + getName()); assertTrue("JMSSelector not identified in binding:"+message, message.contains("jms-selector")); @@ -204,7 +189,7 @@ public class BindingLoggingTest extends AbstractTestLogging List<String> results = waitAndFindMatches(BND_PREFIX); // We will have two binds as we bind all queues to the default exchange - assertEquals("Result not as expected." + results, 4, results.size()); + assertEquals("Result not as expected." + results, 2, results.size()); String messageID = "BND-1001"; @@ -214,49 +199,20 @@ public class BindingLoggingTest extends AbstractTestLogging validateMessageID(messageID, log); assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log))); - log = getLogMessage(results, 1); - validateMessageID(messageID, log); - assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log))); - - - String DEFAULT = "direct/<<default>>"; String DIRECT = "direct/amq.direct"; messageID = "BND-1002"; message = "Deleted"; - log = getLogMessage(results, 2); + log = getLogMessage(results, 1); validateMessageID(messageID, log); String subject = fromSubject(log); validateBindingDeleteArguments(subject, "/test"); - boolean defaultFirst = DEFAULT.equals(AbstractTestLogSubject.getSlice("ex", subject)); - boolean directFirst = DIRECT.equals(AbstractTestLogSubject.getSlice("ex", subject)); - assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log))); - log = getLogMessage(results, 3); - - validateMessageID(messageID, log); - - subject = fromSubject(log); - - validateBindingDeleteArguments(subject, "/test"); - - if (!defaultFirst) - { - assertEquals(DEFAULT, AbstractTestLogSubject.getSlice("ex", subject)); - assertTrue("First Exchange Log was not a direct exchange delete",directFirst); - } - else - { - assertEquals(DIRECT, AbstractTestLogSubject.getSlice("ex", subject)); - assertTrue("First Exchange Log was not a default exchange delete",defaultFirst); - } - - assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log))); } private void validateBindingDeleteArguments(String subject, String vhostName) diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java index 3f979bea27..f6b56f64ce 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java @@ -64,11 +64,7 @@ public class BrokerManagementTest extends QpidBrokerTestCase public void testCreateQueueAndDeletion() throws Exception { final String queueName = getTestQueueName(); - final ManagedExchange defaultExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); - // Check that bind does not exist before queue creation - assertFalse("Binding to " + queueName + " should not exist in default exchange before queue creation", - defaultExchange.bindings().containsKey(new String[] {queueName})); _managedBroker.createNewQueue(queueName, "testowner", true); @@ -76,16 +72,11 @@ public class BrokerManagementTest extends QpidBrokerTestCase assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName)); assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName)); - // Now verify that the default exchange has been bound. - assertTrue("Binding to " + queueName + " should exist in default exchange after queue creation", - defaultExchange.bindings().containsKey(new String[] {queueName})); // Now delete the queue _managedBroker.deleteQueue(queueName); - // Finally ensure that the binding has been removed. - assertFalse("Binding to " + queueName + " should not exist in default exchange after queue deletion", - defaultExchange.bindings().containsKey(new String[] {queueName})); + } /** @@ -105,24 +96,24 @@ public class BrokerManagementTest extends QpidBrokerTestCase /** * Tests that it is disallowed to unregister the default exchange. */ - public void testUnregisterOfDefaultExchangeDisallowed() throws Exception + public void testUnregisterOfAmqDirectExchangeDisallowed() throws Exception { - String defaultExchangeName = ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(); + String amqDirectExchangeName = "amq.direct"; - ManagedExchange defaultExchange = _jmxUtils.getManagedExchange(defaultExchangeName); - assertNotNull("Exchange should exist", defaultExchange); + ManagedExchange amqDirectExchange = _jmxUtils.getManagedExchange(amqDirectExchangeName); + assertNotNull("Exchange should exist", amqDirectExchange); try { - _managedBroker.unregisterExchange(defaultExchangeName); + _managedBroker.unregisterExchange(amqDirectExchangeName); fail("Exception not thrown"); } catch (UnsupportedOperationException e) { // PASS - assertEquals("'<<default>>' is a reserved exchange and can't be deleted", e.getMessage()); + assertEquals("'"+amqDirectExchangeName+"' is a reserved exchange and can't be deleted", e.getMessage()); } - defaultExchange = _jmxUtils.getManagedExchange(defaultExchangeName); - assertNotNull("Exchange should exist", defaultExchange); + amqDirectExchange = _jmxUtils.getManagedExchange(amqDirectExchangeName); + assertNotNull("Exchange should exist", amqDirectExchange); } } diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java index 69e81cf85d..4358b4b450 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java @@ -365,7 +365,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging List<String> results = waitAndFindMatches("BND-1001"); - assertEquals("Unexpected number of bindings logged", 2, results.size()); + assertEquals("Unexpected number of bindings logged", 1, results.size()); String log = getLogMessage(results, 0); @@ -392,7 +392,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging List<String> results = waitAndFindMatches("BND-1001"); - assertEquals("Unexpected number of bindings logged", 2, results.size()); + assertEquals("Unexpected number of bindings logged", 1, results.size()); String log = getLogMessage(results, 0); @@ -419,7 +419,7 @@ public class ManagementActorLoggingTest extends AbstractTestLogging List<String> results = waitAndFindMatches("BND-1001"); - assertEquals("Unexpected number of bindings logged", 2, results.size()); + assertEquals("Unexpected number of bindings logged", 1, results.size()); String log = getLogMessage(results, 0); diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/BindingRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/BindingRestTest.java index a113cd734e..666c78f070 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/BindingRestTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/BindingRestTest.java @@ -46,7 +46,7 @@ public class BindingRestTest extends QpidRestTestCase { List<Map<String, Object>> bindings = getRestTestHelper().getJsonAsList("/rest/binding/test"); assertNotNull("Bindings cannot be null", bindings); - assertEquals("Unexpected number of bindings", EXPECTED_QUEUES.length * 2, bindings.size()); + assertEquals("Unexpected number of bindings", EXPECTED_QUEUES.length, bindings.size()); for (String queueName : EXPECTED_QUEUES) { Map<String, Object> searchAttributes = new HashMap<String, Object>(); @@ -55,11 +55,6 @@ public class BindingRestTest extends QpidRestTestCase Map<String, Object> binding = getRestTestHelper().find(searchAttributes, bindings); Asserts.assertBinding(queueName, "amq.direct", binding); - - searchAttributes.put(Binding.EXCHANGE, "<<default>>"); - - binding = getRestTestHelper().find(searchAttributes, bindings); - Asserts.assertBinding(queueName, "<<default>>", binding); } } diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java index 743ba00cdd..57398ea929 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java @@ -41,7 +41,7 @@ public class QpidRestTestCase extends QpidBrokerTestCase public static final String[] EXPECTED_VIRTUALHOSTS = { TEST1_VIRTUALHOST, TEST2_VIRTUALHOST, TEST3_VIRTUALHOST}; public static final String[] EXPECTED_QUEUES = { "queue", "ping" }; - public static final String[] EXPECTED_EXCHANGES = { "amq.fanout", "amq.match", "amq.direct","amq.topic","<<default>>" }; + public static final String[] EXPECTED_EXCHANGES = { "amq.fanout", "amq.match", "amq.direct","amq.topic" }; private RestTestHelper _restTestHelper = new RestTestHelper(findFreePort()); diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java index d6eae154cf..fec516bc2b 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java @@ -94,11 +94,9 @@ public class QueueRestTest extends QpidRestTestCase @SuppressWarnings("unchecked") List<Map<String, Object>> bindings = (List<Map<String, Object>>) queueDetails.get(QUEUE_ATTRIBUTE_BINDINGS); assertNotNull("Queue bindings are not found", bindings); - assertEquals("Unexpected number of bindings", 2, bindings.size()); + assertEquals("Unexpected number of bindings", 1, bindings.size()); - Map<String, Object> defaultExchangeBinding = getRestTestHelper().find(Binding.EXCHANGE, "<<default>>", bindings); Map<String, Object> directExchangeBinding = getRestTestHelper().find(Binding.EXCHANGE, "amq.direct", bindings); - Asserts.assertBinding(name, "<<default>>", defaultExchangeBinding); Asserts.assertBinding(name, "amq.direct", directExchangeBinding); } } @@ -113,11 +111,9 @@ public class QueueRestTest extends QpidRestTestCase @SuppressWarnings("unchecked") List<Map<String, Object>> bindings = (List<Map<String, Object>>) queueDetails.get(QUEUE_ATTRIBUTE_BINDINGS); assertNotNull("Queue bindings are not found", bindings); - assertEquals("Unexpected number of bindings", 2, bindings.size()); + assertEquals("Unexpected number of bindings", 1, bindings.size()); - Map<String, Object> defaultExchangeBinding = getRestTestHelper().find(Binding.EXCHANGE, "<<default>>", bindings); Map<String, Object> directExchangeBinding = getRestTestHelper().find(Binding.EXCHANGE, "amq.direct", bindings); - Asserts.assertBinding(queueName, "<<default>>", defaultExchangeBinding); Asserts.assertBinding(queueName, "amq.direct", directExchangeBinding); @SuppressWarnings("unchecked") @@ -169,7 +165,7 @@ public class QueueRestTest extends QpidRestTestCase { String queueName = getTestQueueName(); String bindingName = queueName + 2; - String[] exchanges = { "amq.direct", "amq.fanout", "amq.topic", "amq.match", "<<default>>" }; + String[] exchanges = { "amq.direct", "amq.fanout", "amq.topic", "amq.match" }; for (int i = 0; i < exchanges.length; i++) { @@ -182,7 +178,7 @@ public class QueueRestTest extends QpidRestTestCase @SuppressWarnings("unchecked") List<Map<String, Object>> bindings = (List<Map<String, Object>>) queueDetails.get(QUEUE_ATTRIBUTE_BINDINGS); assertNotNull("Queue bindings are not found", bindings); - assertEquals("Unexpected number of bindings", exchanges.length + 2, bindings.size()); + assertEquals("Unexpected number of bindings", exchanges.length + 1, bindings.size()); Map<String, Object> searchAttributes = new HashMap<String, Object>(); searchAttributes.put(Binding.NAME, bindingName); @@ -244,7 +240,7 @@ public class QueueRestTest extends QpidRestTestCase statistics.get(Queue.CONSUMER_COUNT)); assertEquals("Unexpected queue statistics attribute " + Queue.CONSUMER_COUNT_WITH_CREDIT, 1, statistics.get(Queue.CONSUMER_COUNT_WITH_CREDIT)); - assertEquals("Unexpected queue statistics attribute " + Queue.BINDING_COUNT, 2, statistics.get(Queue.BINDING_COUNT)); + assertEquals("Unexpected queue statistics attribute " + Queue.BINDING_COUNT, 1, statistics.get(Queue.BINDING_COUNT)); assertEquals("Unexpected queue statistics attribute " + Queue.PERSISTENT_DEQUEUED_MESSAGES, DEQUEUED_MESSAGES, statistics.get(Queue.PERSISTENT_DEQUEUED_MESSAGES)); assertEquals("Unexpected queue statistics attribute " + Queue.TOTAL_DEQUEUED_MESSAGES, DEQUEUED_MESSAGES, diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/StructureRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/StructureRestTest.java index cb4fd1ad5b..664b8fffa4 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/StructureRestTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/StructureRestTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.systest.rest; import java.util.List; import java.util.Map; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.model.Port; import org.apache.qpid.test.utils.TestBrokerConfiguration; @@ -79,7 +80,8 @@ public class StructureRestTest extends QpidRestTestCase Map<String, Object> exchange = getRestTestHelper().find("name", exchangeName, exchanges); assertNotNull("Exchange " + exchangeName + " is not found ", exchange); assertNode(exchange, exchangeName); - if ("amq.direct".equalsIgnoreCase(exchangeName) || "<<default>>".equalsIgnoreCase(exchangeName)) + if (ExchangeDefaults.DIRECT_EXCHANGE_NAME.asString().equalsIgnoreCase(exchangeName) || + ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString().equalsIgnoreCase(exchangeName)) { @SuppressWarnings("unchecked") List<Map<String, Object>> bindings = (List<Map<String, Object>>) exchange.get("bindings"); diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index c65f8bbd08..1e49351323 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -84,7 +84,6 @@ public class VirtualHostRestTest extends QpidRestTestCase Asserts.assertDurableExchange("amq.topic", "topic", getRestTestHelper().find(Exchange.NAME, "amq.topic", exchanges)); Asserts.assertDurableExchange("amq.direct", "direct", getRestTestHelper().find(Exchange.NAME, "amq.direct", exchanges)); Asserts.assertDurableExchange("amq.match", "headers", getRestTestHelper().find(Exchange.NAME, "amq.match", exchanges)); - Asserts.assertDurableExchange("<<default>>", "direct", getRestTestHelper().find(Exchange.NAME, "<<default>>", exchanges)); @SuppressWarnings("unchecked") List<Map<String, Object>> queues = (List<Map<String, Object>>) hostDetails.get(VIRTUALHOST_QUEUES_ATTRIBUTE); @@ -600,7 +599,6 @@ public class VirtualHostRestTest extends QpidRestTestCase Asserts.assertDurableExchange("amq.topic", "topic", restTestHelper.find(Exchange.NAME, "amq.topic", exchanges)); Asserts.assertDurableExchange("amq.direct", "direct", restTestHelper.find(Exchange.NAME, "amq.direct", exchanges)); Asserts.assertDurableExchange("amq.match", "headers", restTestHelper.find(Exchange.NAME, "amq.match", exchanges)); - Asserts.assertDurableExchange("<<default>>", "direct", restTestHelper.find(Exchange.NAME, "<<default>>", exchanges)); assertNull("Unexpected queues", hostDetails.get(VIRTUALHOST_QUEUES_ATTRIBUTE)); assertNull("Unexpected connections", hostDetails.get(VIRTUALHOST_CONNECTIONS_ATTRIBUTE)); diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 22a98b6f42..3783b0bd02 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -108,7 +108,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs())); + dest.getAddressName(),dest.getAddressName(), null)); // create receiver ----------------------------------------- addr1 = "ADDR:testQueue2; { create: receiver }"; @@ -133,7 +133,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs())); + dest.getAddressName(),dest.getAddressName(), null)); // create never -------------------------------------------- addr1 = "ADDR:testQueue3; { create: never }"; @@ -182,7 +182,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs())); + dest.getAddressName(),dest.getAddressName(), null)); } |