summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-8-protocol/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-26 23:27:39 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-26 23:27:39 +0000
commit4ae118bb7a81155a9f3d22af3a4a3f2191799c83 (patch)
tree8bebb59700d6825f997cb501c9043513e0f785c8 /qpid/java/broker-plugins/amqp-0-8-protocol/src/main
parent6339e3b1ad22e74508510e08384c4d484bd9666c (diff)
downloadqpid-python-4ae118bb7a81155a9f3d22af3a4a3f2191799c83.tar.gz
QPID-5577 : [Java Broker] Change Exchange,Queue,Binding,Consumer to implement ConfiguredObject and remove adapter classes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1572343 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java12
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java13
8 files changed, 21 insertions, 34 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 453f3035e1..9e0c5b6be6 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -32,6 +32,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
@@ -48,7 +49,6 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.SimpleFilterManager;
@@ -1267,14 +1267,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
- private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>>
+ private class ImmediateAction implements Action<MessageInstance>
{
public ImmediateAction()
{
}
- public void performAction(MessageInstance<?,C> entry)
+ public void performAction(MessageInstance entry)
{
TransactionLogResource queue = entry.getOwningResource();
@@ -1332,10 +1332,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
}
- private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<?,C>>
+ private final class CapacityCheckAction implements Action<MessageInstance>
{
@Override
- public void performAction(final MessageInstance<?,C> entry)
+ public void performAction(final MessageInstance entry)
{
TransactionLogResource queue = entry.getOwningResource();
if(queue instanceof CapacityChecker)
@@ -1569,7 +1569,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
final AMQQueue queue = (AMQQueue) owningResource;
- final Exchange altExchange = queue.getAlternateExchange();
+ final ExchangeImpl altExchange = queue.getAlternateExchange();
if (altExchange == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index 611999d8c6..f620abf30f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -210,7 +210,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
props,
_channel.getChannelId(),
deliveryTag,
- _queue.getMessageCount());
+ _queue.getQueueDepthMessages());
_deliveredMessage = true;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
index 4ebddb0f68..27837844ff 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
@@ -25,8 +25,8 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeBoundBody;
import org.apache.qpid.framing.ExchangeBoundOkBody;
import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -82,7 +82,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
AMQShortString exchangeName = body.getExchange() == null ? AMQShortString.EMPTY_STRING : body.getExchange();
AMQShortString queueName = body.getQueue();
AMQShortString routingKey = body.getRoutingKey();
- Exchange exchange = virtualHost.getExchange(exchangeName.toString());
+ ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
ExchangeBoundOkBody response;
if (exchange == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
index 9446f53188..3b630c684c 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
@@ -30,13 +30,12 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
@@ -77,7 +76,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
_logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName);
}
- Exchange exchange;
+ ExchangeImpl exchange;
if (body.getPassive())
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
index bbe6028a63..720677064b 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
@@ -24,8 +24,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.ExchangeDeleteOkBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
@@ -62,7 +62,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
{
final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
- final Exchange exchange = virtualHost.getExchange(exchangeName);
+ final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
if(exchange == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange());
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
index 7ac71babf3..1e0382f456 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
@@ -29,10 +29,9 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -103,7 +102,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
}
final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
- final Exchange exch = virtualHost.getExchange(exchangeName);
+ final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
if (exch == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.");
@@ -121,13 +120,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
if(!exch.addBinding(bindingKey, queue, arguments) && TopicExchange.TYPE.equals(exch.getExchangeType()))
{
- Binding oldBinding = exch.getBinding(bindingKey, queue);
-
- Map<String, Object> oldArgs = oldBinding.getArguments();
- if((oldArgs == null && !arguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(arguments)))
- {
- exch.replaceBinding(oldBinding.getId(), bindingKey, queue, arguments);
- }
+ exch.replaceBinding(bindingKey, queue, arguments);
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index 4a3bd1e921..97aac8424a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -163,7 +163,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
QueueDeclareOkBody responseBody =
methodRegistry.createQueueDeclareOkBody(queueName,
- queue.getMessageCount(),
+ queue.getQueueDepthMessages(),
queue.getConsumerCount());
protocolConnection.writeFrame(responseBody.generateFrame(channelId));
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
index f6dbd0cee0..a828ca323d 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
@@ -30,9 +30,8 @@ import org.apache.qpid.framing.QueueUnbindBody;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -94,13 +93,13 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
- final Exchange exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString());
+ final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString());
if (exch == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
}
- if(exch.getBinding(String.valueOf(routingKey), queue) == null)
+ if(!exch.hasBinding(String.valueOf(routingKey), queue))
{
throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding");
}
@@ -108,11 +107,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
{
try
{
- Binding binding = exch.getBinding(String.valueOf(routingKey), queue);
- if(binding != null)
- {
- binding.delete();
- }
+ exch.deleteBinding(String.valueOf(routingKey), queue);
}
catch (AccessControlException e)
{