diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-26 23:27:39 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-26 23:27:39 +0000 |
commit | 4ae118bb7a81155a9f3d22af3a4a3f2191799c83 (patch) | |
tree | 8bebb59700d6825f997cb501c9043513e0f785c8 /qpid/java/broker-plugins/amqp-0-8-protocol/src/main | |
parent | 6339e3b1ad22e74508510e08384c4d484bd9666c (diff) | |
download | qpid-python-4ae118bb7a81155a9f3d22af3a4a3f2191799c83.tar.gz |
QPID-5577 : [Java Broker] Change Exchange,Queue,Binding,Consumer to implement ConfiguredObject and remove adapter classes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1572343 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main')
8 files changed, 21 insertions, 34 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 453f3035e1..9e0c5b6be6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -32,6 +32,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; @@ -48,7 +49,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.filter.SimpleFilterManager; @@ -1267,14 +1267,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } - private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>> + private class ImmediateAction implements Action<MessageInstance> { public ImmediateAction() { } - public void performAction(MessageInstance<?,C> entry) + public void performAction(MessageInstance entry) { TransactionLogResource queue = entry.getOwningResource(); @@ -1332,10 +1332,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } } - private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<?,C>> + private final class CapacityCheckAction implements Action<MessageInstance> { @Override - public void performAction(final MessageInstance<?,C> entry) + public void performAction(final MessageInstance entry) { TransactionLogResource queue = entry.getOwningResource(); if(queue instanceof CapacityChecker) @@ -1569,7 +1569,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { final AMQQueue queue = (AMQQueue) owningResource; - final Exchange altExchange = queue.getAlternateExchange(); + final ExchangeImpl altExchange = queue.getAlternateExchange(); if (altExchange == null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 611999d8c6..f620abf30f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -210,7 +210,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB props, _channel.getChannelId(), deliveryTag, - _queue.getMessageCount()); + _queue.getQueueDepthMessages()); _deliveredMessage = true; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java index 4ebddb0f68..27837844ff 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java @@ -25,8 +25,8 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeBoundBody; import org.apache.qpid.framing.ExchangeBoundOkBody; import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; @@ -82,7 +82,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo AMQShortString exchangeName = body.getExchange() == null ? AMQShortString.EMPTY_STRING : body.getExchange(); AMQShortString queueName = body.getQueue(); AMQShortString routingKey = body.getRoutingKey(); - Exchange exchange = virtualHost.getExchange(exchangeName.toString()); + ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString()); ExchangeBoundOkBody response; if (exchange == null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java index 9446f53188..3b630c684c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java @@ -30,13 +30,12 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.UnknownExchangeException; @@ -77,7 +76,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName); } - Exchange exchange; + ExchangeImpl exchange; if (body.getPassive()) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java index bbe6028a63..720677064b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java @@ -24,8 +24,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.ExchangeDeleteOkBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; @@ -62,7 +62,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD { final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString(); - final Exchange exchange = virtualHost.getExchange(exchangeName); + final ExchangeImpl exchange = virtualHost.getExchange(exchangeName); if(exchange == null) { throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java index 7ac71babf3..1e0382f456 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java @@ -29,10 +29,9 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; @@ -103,7 +102,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist."); } final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString(); - final Exchange exch = virtualHost.getExchange(exchangeName); + final ExchangeImpl exch = virtualHost.getExchange(exchangeName); if (exch == null) { throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist."); @@ -121,13 +120,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> if(!exch.addBinding(bindingKey, queue, arguments) && TopicExchange.TYPE.equals(exch.getExchangeType())) { - Binding oldBinding = exch.getBinding(bindingKey, queue); - - Map<String, Object> oldArgs = oldBinding.getArguments(); - if((oldArgs == null && !arguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(arguments))) - { - exch.replaceBinding(oldBinding.getId(), bindingKey, queue, arguments); - } + exch.replaceBinding(bindingKey, queue, arguments); } } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 4a3bd1e921..97aac8424a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -163,7 +163,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queueName, - queue.getMessageCount(), + queue.getQueueDepthMessages(), queue.getConsumerCount()); protocolConnection.writeFrame(responseBody.generateFrame(channelId)); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java index f6dbd0cee0..a828ca323d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java @@ -30,9 +30,8 @@ import org.apache.qpid.framing.QueueUnbindBody; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; @@ -94,13 +93,13 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB { throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); } - final Exchange exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString()); + final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString()); if (exch == null) { throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist."); } - if(exch.getBinding(String.valueOf(routingKey), queue) == null) + if(!exch.hasBinding(String.valueOf(routingKey), queue)) { throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding"); } @@ -108,11 +107,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB { try { - Binding binding = exch.getBinding(String.valueOf(routingKey), queue); - if(binding != null) - { - binding.delete(); - } + exch.deleteBinding(String.valueOf(routingKey), queue); } catch (AccessControlException e) { |