diff options
author | Kim van der Riet <kpvdr@apache.org> | 2007-01-18 14:28:25 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-18 14:28:25 +0000 |
commit | cfe7469f1c49728d391587f2d4be7ce68085e01d (patch) | |
tree | fdd6a2617120ccd9191a47bd55a0d543ad469799 /java/cluster | |
parent | cdf7469e2688f9f52487b7968664ced2db560980 (diff) | |
download | qpid-python-cfe7469f1c49728d391587f2d4be7ce68085e01d.tar.gz |
Cleared all the cluster compile errors. This now opens the way to testing...
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497445 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster')
16 files changed, 102 insertions, 88 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java index e95cf3406a..2bde8f70e9 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java @@ -49,9 +49,9 @@ class ClientAdapter implements MethodHandler _stateMgr = stateMgr; } - public void handle(int channel, AMQMethodBody method) throws AMQException + public void handle(int channel, AMQMethodBody method, long requestId) throws AMQException { - AMQMethodEvent evt = new AMQMethodEvent(channel, method); + AMQMethodEvent evt = new AMQMethodEvent(channel, method, requestId); _stateMgr.methodReceived(evt); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java index 0c72dee984..7d9a50e354 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java @@ -128,9 +128,9 @@ public class ClientHandlerRegistry extends AMQStateManager class ConnectionTuneHandler extends ConnectionTuneMethodHandler { - protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist) + protected AMQMethodBody createConnectionOpenMethodBody(String path, String capabilities, boolean insist) { - return super.createConnectionOpenFrame(channel, path, ClusterCapability.add(capabilities, _identity), insist); + return super.createConnectionOpenMethodBody(path, ClusterCapability.add(capabilities, _identity), insist); } } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java index 1763bcd03f..01da4ab357 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java @@ -32,6 +32,8 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.protocol.AMQProtocolWriter; public class ClusteredProtocolSession extends AMQMinaProtocolSession { @@ -64,7 +66,7 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession AMQChannel channel = super.getChannel(channelId); if (isPeerSession() && channel == null) { - channel = new OneUseChannel(channelId); + channel = new OneUseChannel(channelId, this, getStateManager()); addChannel(channel); } return channel; @@ -100,25 +102,29 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession */ private class OneUseChannel extends AMQChannel { - public OneUseChannel(int channelId) + public OneUseChannel(int channelId, AMQProtocolWriter protocolWriter, + AMQMethodListener methodListener) throws AMQException { - this(channelId, ApplicationRegistry.getInstance()); + this(channelId, ApplicationRegistry.getInstance(), protocolWriter, methodListener); } - public OneUseChannel(int channelId, IApplicationRegistry registry) + public OneUseChannel(int channelId, IApplicationRegistry registry, + AMQProtocolWriter protocolWriter, AMQMethodListener methodListener) throws AMQException { super(channelId, registry.getMessageStore(), - registry.getExchangeRegistry()); + registry.getExchangeRegistry(), + protocolWriter, + methodListener); } - protected void routeCurrentMessage() throws AMQException - { - super.routeCurrentMessage(); - removeChannel(getChannelId()); - } +// protected void routeCurrentMessage() throws AMQException +// { +// super.routeCurrentMessage(); +// removeChannel(getChannelId()); +// } } public static boolean isPayloadFromPeer(AMQMessage payload) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java index a83f034021..a3309f84d0 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java @@ -25,5 +25,5 @@ import org.apache.qpid.framing.AMQMethodBody; interface MethodHandler { - public void handle(int channel, AMQMethodBody method) throws AMQException; + public void handle(int channel, AMQMethodBody method, long requestId) throws AMQException; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java index 69fee079cf..011ce6bafb 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java @@ -37,6 +37,8 @@ import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; import org.apache.qpid.framing.ConnectionRedirectBody; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersionList; @@ -166,7 +168,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler } } - public void handle(int channel, AMQMethodBody method) throws AMQException + public void handle(int channel, AMQMethodBody method, long requestId) throws AMQException { _logger.info(new LogMessage("Handling method: {0} for channel {1}", method, channel)); if (!handleResponse(channel, method)) @@ -175,7 +177,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler } } - private void handleMethod(int channel, AMQMethodBody method) throws AMQException + private void handleMethod(int channel, AMQMethodBody method, long requestId) throws AMQException { if (method instanceof ConnectionRedirectBody) { @@ -186,7 +188,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler } else { - _handler.handle(channel, method); + _handler.handle(channel, method, requestId); if (AMQState.CONNECTION_OPEN.equals(_legacyHandler.getCurrentState()) && _handler != this) { _handler = this; @@ -202,9 +204,15 @@ public class MinaBrokerProxy extends Broker implements MethodHandler private void handleFrame(AMQFrame frame) throws AMQException { AMQBody body = frame.bodyFrame; - if (body instanceof AMQMethodBody) + if (body instanceof AMQRequestBody) { - handleMethod(frame.channel, (AMQMethodBody) body); + handleMethod(frame.channel, ((AMQRequestBody)body).getMethodPayload(), + ((AMQRequestBody)body).getRequestId()); + } + else if (body instanceof AMQResponseBody) + { + handleMethod(frame.channel, ((AMQResponseBody)body).getMethodPayload(), + ((AMQRequestBody)body).getRequestId()); } else { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java index cab020b448..27d5629f27 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java @@ -44,20 +44,20 @@ class ServerHandlerRegistry extends AMQStateManager private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>(); ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, -+ AMQProtocolSession protocolSession) + AMQProtocolSession protocolSession) { super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession); } ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry, -+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { this(queueRegistry, exchangeRegistry, protocolSession); _handlers.putAll(s._handlers); } ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry, -+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { this(queueRegistry, exchangeRegistry, protocolSession); init(factory); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java index 8e6d133600..9612efbedf 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java @@ -32,7 +32,7 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.MessageConsumeBody; import org.apache.log4j.Logger; import java.util.Map; @@ -63,9 +63,9 @@ class ChannelQueueManager return new QueueBindHandler(); } - ClusterMethodHandler<BasicConsumeBody> createBasicConsumeHandler() + ClusterMethodHandler<MessageConsumeBody> createBasicConsumeHandler() { - return new BasicConsumeHandler(); + return new MessageConsumeHandler(); } private void set(int channel, String queue) @@ -121,13 +121,13 @@ class ChannelQueueManager } } - private class BasicConsumeHandler extends ClusterMethodHandler<BasicConsumeBody> + private class MessageConsumeHandler extends ClusterMethodHandler<MessageConsumeBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException { if(evt.getMethod().queue == null) { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java index 97174d782c..6aaa66dbfa 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java @@ -23,9 +23,9 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.framing.MessageTransferBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelFlowBody; import org.apache.qpid.framing.ChannelOpenBody; @@ -45,7 +45,7 @@ import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.ClusterSynchBody; -import org.apache.qpid.framing.BasicQosBody; +import org.apache.qpid.framing.MessageQosBody; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxRollbackBody; @@ -69,11 +69,11 @@ import org.apache.qpid.server.handler.ConnectionStartOkMethodHandler; import org.apache.qpid.server.handler.ConnectionTuneOkMethodHandler; import org.apache.qpid.server.handler.ExchangeDeclareHandler; import org.apache.qpid.server.handler.ExchangeDeleteHandler; -import org.apache.qpid.server.handler.BasicCancelMethodHandler; -import org.apache.qpid.server.handler.BasicPublishMethodHandler; +import org.apache.qpid.server.handler.MessageCancelHandler; +import org.apache.qpid.server.handler.MessageTransferHandler; import org.apache.qpid.server.handler.QueueBindHandler; import org.apache.qpid.server.handler.QueueDeleteHandler; -import org.apache.qpid.server.handler.BasicQosHandler; +import org.apache.qpid.server.handler.MessageQosHandler; import org.apache.qpid.server.handler.TxSelectHandler; import org.apache.qpid.server.handler.TxCommitHandler; import org.apache.qpid.server.handler.TxRollbackHandler; @@ -141,14 +141,14 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory registry.addHandler(QueueBindBody.class, chain(channelQueueMgr.createQueueBindHandler(), replicated(QueueBindHandler.getInstance()))); registry.addHandler(QueueDeleteBody.class, chain(channelQueueMgr.createQueueDeleteHandler(), replicated(alternate(new QueueDeleteHandler(false), new QueueDeleteHandler(true))))); - registry.addHandler(BasicConsumeBody.class, chain(channelQueueMgr.createBasicConsumeHandler(), new ReplicatingConsumeHandler(_groupMgr))); + registry.addHandler(MessageConsumeBody.class, chain(channelQueueMgr.createBasicConsumeHandler(), new ReplicatingConsumeHandler(_groupMgr))); //other modified handlers: - registry.addHandler(BasicCancelBody.class, alternate(new RemoteCancelHandler(), BasicCancelMethodHandler.getInstance())); + registry.addHandler(MessageCancelBody.class, alternate(new RemoteCancelHandler(), MessageCancelHandler.getInstance())); //other unaffected handlers: - registry.addHandler(BasicPublishBody.class, BasicPublishMethodHandler.getInstance()); - registry.addHandler(BasicQosBody.class, BasicQosHandler.getInstance()); + registry.addHandler(MessageTransferBody.class, MessageTransferHandler.getInstance()); + registry.addHandler(MessageQosBody.class, MessageQosHandler.getInstance()); registry.addHandler(ChannelOpenBody.class, ChannelOpenHandler.getInstance()); registry.addHandler(ChannelCloseBody.class, ChannelCloseHandler.getInstance()); registry.addHandler(ChannelFlowBody.class, ChannelFlowHandler.getInstance()); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java index e9f039ed09..f17a329c44 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.cluster.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.framing.MessageCancelBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -33,14 +33,14 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; -public class RemoteCancelHandler implements StateAwareMethodListener<BasicCancelBody> +public class RemoteCancelHandler implements StateAwareMethodListener<MessageCancelBody> { private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class); - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicCancelBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageCancelBody> evt) throws AMQException { //By convention, consumers setup between brokers use the queue name as the consumer tag: - AMQQueue queue = queues.getQueue(evt.getMethod().consumerTag); + AMQQueue queue = queues.getQueue(evt.getMethod().getDestination()); if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session)); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java index 693c70b780..a3b2b42bc9 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.cluster.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicConsumeOkBody; +import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -38,11 +38,11 @@ import org.apache.qpid.server.state.StateAwareMethodListener; * Handles consume requests from other cluster members. * */ -public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsumeBody> +public class RemoteConsumeHandler implements StateAwareMethodListener<MessageConsumeBody> { private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class); - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException { AMQQueue queue = queues.getQueue(evt.getMethod().queue); if (queue instanceof ClusteredQueue) @@ -51,10 +51,8 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsu // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), - (byte)0, (byte)9, // AMQP version (major, minor) - evt.getMethod().queue // consumerTag - )); + session.writeResponse(evt.getChannelId(), evt.getRequestId(), + MessageOkBody.createMethodBody((byte)0, (byte)9)); } else { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java index ea42db3ded..5d25bab23e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java @@ -21,20 +21,20 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.MessageConsumeBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.BroadcastPolicy; import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.handler.BasicConsumeMethodHandler; +import org.apache.qpid.server.handler.MessageConsumeHandler; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; -public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBody> +public class ReplicatingConsumeHandler extends ReplicatingHandler<MessageConsumeBody> { ReplicatingConsumeHandler(GroupManager groupMgr) { @@ -46,7 +46,7 @@ public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBo super(groupMgr, base(), policy); } - protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException { //only replicate if the queue in question is a shared queue if (isShared(queues.getQueue(evt.getMethod().queue))) @@ -67,18 +67,18 @@ public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBo return queue != null && queue.isShared(); } - static StateAwareMethodListener<BasicConsumeBody> base() + static StateAwareMethodListener<MessageConsumeBody> base() { - return new PeerHandler<BasicConsumeBody>(peer(), client()); + return new PeerHandler<MessageConsumeBody>(peer(), client()); } - static StateAwareMethodListener<BasicConsumeBody> peer() + static StateAwareMethodListener<MessageConsumeBody> peer() { return new RemoteConsumeHandler(); } - static StateAwareMethodListener<BasicConsumeBody> client() + static StateAwareMethodListener<MessageConsumeBody> client() { - return BasicConsumeMethodHandler.getInstance(); + return MessageConsumeHandler.getInstance(); } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java index a601021bf1..b4249d70aa 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.cluster.replay; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.MessageConsumeBody; import java.util.Map; import java.util.HashMap; @@ -36,7 +36,7 @@ class ConsumerCounts _counts.put(queue, get(queue) + 1); } - synchronized void decrement(String queue) + synchronized void decrement(String queue) { _counts.put(queue, get(queue) - 1); } @@ -53,14 +53,14 @@ class ConsumerCounts { // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - BasicConsumeBody m = new BasicConsumeBody((byte)0, (byte)9); + MessageConsumeBody m = new MessageConsumeBody((byte)0, (byte)9); m.queue = queue; - m.consumerTag = queue; + m.destination = queue; replay(m, messages); } } - private void replay(BasicConsumeBody msg, List<AMQMethodBody> messages) + private void replay(MessageConsumeBody msg, List<AMQMethodBody> messages) { int count = _counts.get(msg.queue); for(int i = 0; i < count; i++) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java index c927c0cebf..2675fbb1f4 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.cluster.replay; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageConsumeBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.QueueBindBody; @@ -55,8 +55,8 @@ public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor)), new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor)), new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor)), - new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor)), - new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor)) + new FrameDescriptor(MessageConsumeBody.class, new MessageConsumeBody(major, minor)), + new FrameDescriptor(MessageCancelBody.class, new MessageCancelBody(major, minor)) }); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java index ac4879561c..102628c6de 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -29,8 +29,8 @@ import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.ClusterSynchBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.framing.MessageCancelBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.cluster.util.LogMessage; @@ -75,8 +75,8 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener _localRecorders.put(QueueDeclareBody.class, new PrivateQueueDeclareRecorder()); _localRecorders.put(QueueDeleteBody.class, new PrivateQueueDeleteRecorder()); _localRecorders.put(QueueBindBody.class, new PrivateQueueBindRecorder()); - _localRecorders.put(BasicConsumeBody.class, new BasicConsumeRecorder()); - _localRecorders.put(BasicCancelBody.class, new BasicCancelRecorder()); + _localRecorders.put(MessageConsumeBody.class, new BasicConsumeRecorder()); + _localRecorders.put(MessageCancelBody.class, new BasicCancelRecorder()); _localRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder()); _localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder()); } @@ -130,9 +130,9 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener return methods; } - private class BasicConsumeRecorder implements MethodRecorder<BasicConsumeBody> + private class BasicConsumeRecorder implements MethodRecorder<MessageConsumeBody> { - public void record(BasicConsumeBody method) + public void record(MessageConsumeBody method) { if(_sharedQueues.containsKey(method.queue)) { @@ -141,13 +141,13 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener } } - private class BasicCancelRecorder implements MethodRecorder<BasicCancelBody> + private class BasicCancelRecorder implements MethodRecorder<MessageCancelBody> { - public void record(BasicCancelBody method) + public void record(MessageCancelBody method) { - if(_sharedQueues.containsKey(method.consumerTag)) + if(_sharedQueues.containsKey(method.getDestination())) { - _consumers.decrement(method.consumerTag); + _consumers.decrement(method.getDestination()); } } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index c5b4c1c4b5..3b9f95be46 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.framing.MessageCancelBody; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.server.cluster.*; import org.apache.qpid.server.cluster.util.LogMessage; @@ -91,8 +91,8 @@ public class ClusteredQueue extends AMQQueue //signal other members: // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - BasicCancelBody request = new BasicCancelBody((byte)0, (byte)9); - request.consumerTag = getName(); + MessageCancelBody request = new MessageCancelBody((byte)0, (byte)9); + request.destination = getName(); _groupMgr.broadcast(new SimpleSendable(request)); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java index 752cf05a82..a989e45708 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java @@ -23,9 +23,8 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.framing.Content; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.util.LogMessage; @@ -92,9 +91,11 @@ public class RemoteQueueProxy extends AMQQueue void relay(AMQMessage msg) throws AMQException { - BasicPublishBody publish = msg.getPublishBody(); + throw new Error("XXX"); + /* + MessageTransferBody publish = msg.getPublishBody(); ContentHeaderBody header = msg.getContentHeaderBody(); - List<ContentBody> bodies = msg.getContentBodies(); + List<Content> bodies = msg.getContentBodies(); //(i) construct a new publishing block: publish.immediate = false;//can't as yet handle the immediate flag in a cluster @@ -105,5 +106,6 @@ public class RemoteQueueProxy extends AMQQueue //(ii) send this on to the broker for which it is acting as proxy: _groupMgr.send(_target, new SimpleSendable(parts)); + */ } } |