summaryrefslogtreecommitdiff
path: root/java/cluster
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-18 14:28:25 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-18 14:28:25 +0000
commitcfe7469f1c49728d391587f2d4be7ce68085e01d (patch)
treefdd6a2617120ccd9191a47bd55a0d543ad469799 /java/cluster
parentcdf7469e2688f9f52487b7968664ced2db560980 (diff)
downloadqpid-python-cfe7469f1c49728d391587f2d4be7ce68085e01d.tar.gz
Cleared all the cluster compile errors. This now opens the way to testing...
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497445 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java26
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java18
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java12
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java22
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java14
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java18
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java10
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java20
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java12
16 files changed, 102 insertions, 88 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
index e95cf3406a..2bde8f70e9 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
@@ -49,9 +49,9 @@ class ClientAdapter implements MethodHandler
_stateMgr = stateMgr;
}
- public void handle(int channel, AMQMethodBody method) throws AMQException
+ public void handle(int channel, AMQMethodBody method, long requestId) throws AMQException
{
- AMQMethodEvent evt = new AMQMethodEvent(channel, method);
+ AMQMethodEvent evt = new AMQMethodEvent(channel, method, requestId);
_stateMgr.methodReceived(evt);
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
index 0c72dee984..7d9a50e354 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
@@ -128,9 +128,9 @@ public class ClientHandlerRegistry extends AMQStateManager
class ConnectionTuneHandler extends ConnectionTuneMethodHandler
{
- protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
+ protected AMQMethodBody createConnectionOpenMethodBody(String path, String capabilities, boolean insist)
{
- return super.createConnectionOpenFrame(channel, path, ClusterCapability.add(capabilities, _identity), insist);
+ return super.createConnectionOpenMethodBody(path, ClusterCapability.add(capabilities, _identity), insist);
}
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
index 1763bcd03f..01da4ab357 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
@@ -32,6 +32,8 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQProtocolWriter;
public class ClusteredProtocolSession extends AMQMinaProtocolSession
{
@@ -64,7 +66,7 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
AMQChannel channel = super.getChannel(channelId);
if (isPeerSession() && channel == null)
{
- channel = new OneUseChannel(channelId);
+ channel = new OneUseChannel(channelId, this, getStateManager());
addChannel(channel);
}
return channel;
@@ -100,25 +102,29 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
*/
private class OneUseChannel extends AMQChannel
{
- public OneUseChannel(int channelId)
+ public OneUseChannel(int channelId, AMQProtocolWriter protocolWriter,
+ AMQMethodListener methodListener)
throws AMQException
{
- this(channelId, ApplicationRegistry.getInstance());
+ this(channelId, ApplicationRegistry.getInstance(), protocolWriter, methodListener);
}
- public OneUseChannel(int channelId, IApplicationRegistry registry)
+ public OneUseChannel(int channelId, IApplicationRegistry registry,
+ AMQProtocolWriter protocolWriter, AMQMethodListener methodListener)
throws AMQException
{
super(channelId,
registry.getMessageStore(),
- registry.getExchangeRegistry());
+ registry.getExchangeRegistry(),
+ protocolWriter,
+ methodListener);
}
- protected void routeCurrentMessage() throws AMQException
- {
- super.routeCurrentMessage();
- removeChannel(getChannelId());
- }
+// protected void routeCurrentMessage() throws AMQException
+// {
+// super.routeCurrentMessage();
+// removeChannel(getChannelId());
+// }
}
public static boolean isPayloadFromPeer(AMQMessage payload)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
index a83f034021..a3309f84d0 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
@@ -25,5 +25,5 @@ import org.apache.qpid.framing.AMQMethodBody;
interface MethodHandler
{
- public void handle(int channel, AMQMethodBody method) throws AMQException;
+ public void handle(int channel, AMQMethodBody method, long requestId) throws AMQException;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
index 69fee079cf..011ce6bafb 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
@@ -37,6 +37,8 @@ import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.framing.ConnectionRedirectBody;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersionList;
@@ -166,7 +168,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
}
}
- public void handle(int channel, AMQMethodBody method) throws AMQException
+ public void handle(int channel, AMQMethodBody method, long requestId) throws AMQException
{
_logger.info(new LogMessage("Handling method: {0} for channel {1}", method, channel));
if (!handleResponse(channel, method))
@@ -175,7 +177,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
}
}
- private void handleMethod(int channel, AMQMethodBody method) throws AMQException
+ private void handleMethod(int channel, AMQMethodBody method, long requestId) throws AMQException
{
if (method instanceof ConnectionRedirectBody)
{
@@ -186,7 +188,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
}
else
{
- _handler.handle(channel, method);
+ _handler.handle(channel, method, requestId);
if (AMQState.CONNECTION_OPEN.equals(_legacyHandler.getCurrentState()) && _handler != this)
{
_handler = this;
@@ -202,9 +204,15 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
private void handleFrame(AMQFrame frame) throws AMQException
{
AMQBody body = frame.bodyFrame;
- if (body instanceof AMQMethodBody)
+ if (body instanceof AMQRequestBody)
{
- handleMethod(frame.channel, (AMQMethodBody) body);
+ handleMethod(frame.channel, ((AMQRequestBody)body).getMethodPayload(),
+ ((AMQRequestBody)body).getRequestId());
+ }
+ else if (body instanceof AMQResponseBody)
+ {
+ handleMethod(frame.channel, ((AMQResponseBody)body).getMethodPayload(),
+ ((AMQRequestBody)body).getRequestId());
}
else
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
index cab020b448..27d5629f27 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
@@ -44,20 +44,20 @@ class ServerHandlerRegistry extends AMQStateManager
private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
-+ AMQProtocolSession protocolSession)
+ AMQProtocolSession protocolSession)
{
super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession);
}
ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
-+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
this(queueRegistry, exchangeRegistry, protocolSession);
_handlers.putAll(s._handlers);
}
ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry,
-+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
this(queueRegistry, exchangeRegistry, protocolSession);
init(factory);
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
index 8e6d133600..9612efbedf 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
@@ -32,7 +32,7 @@ import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.MessageConsumeBody;
import org.apache.log4j.Logger;
import java.util.Map;
@@ -63,9 +63,9 @@ class ChannelQueueManager
return new QueueBindHandler();
}
- ClusterMethodHandler<BasicConsumeBody> createBasicConsumeHandler()
+ ClusterMethodHandler<MessageConsumeBody> createBasicConsumeHandler()
{
- return new BasicConsumeHandler();
+ return new MessageConsumeHandler();
}
private void set(int channel, String queue)
@@ -121,13 +121,13 @@ class ChannelQueueManager
}
}
- private class BasicConsumeHandler extends ClusterMethodHandler<BasicConsumeBody>
+ private class MessageConsumeHandler extends ClusterMethodHandler<MessageConsumeBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
index 97174d782c..6aaa66dbfa 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
@@ -23,9 +23,9 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelFlowBody;
import org.apache.qpid.framing.ChannelOpenBody;
@@ -45,7 +45,7 @@ import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.ClusterSynchBody;
-import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.MessageQosBody;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxRollbackBody;
@@ -69,11 +69,11 @@ import org.apache.qpid.server.handler.ConnectionStartOkMethodHandler;
import org.apache.qpid.server.handler.ConnectionTuneOkMethodHandler;
import org.apache.qpid.server.handler.ExchangeDeclareHandler;
import org.apache.qpid.server.handler.ExchangeDeleteHandler;
-import org.apache.qpid.server.handler.BasicCancelMethodHandler;
-import org.apache.qpid.server.handler.BasicPublishMethodHandler;
+import org.apache.qpid.server.handler.MessageCancelHandler;
+import org.apache.qpid.server.handler.MessageTransferHandler;
import org.apache.qpid.server.handler.QueueBindHandler;
import org.apache.qpid.server.handler.QueueDeleteHandler;
-import org.apache.qpid.server.handler.BasicQosHandler;
+import org.apache.qpid.server.handler.MessageQosHandler;
import org.apache.qpid.server.handler.TxSelectHandler;
import org.apache.qpid.server.handler.TxCommitHandler;
import org.apache.qpid.server.handler.TxRollbackHandler;
@@ -141,14 +141,14 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
registry.addHandler(QueueBindBody.class, chain(channelQueueMgr.createQueueBindHandler(), replicated(QueueBindHandler.getInstance())));
registry.addHandler(QueueDeleteBody.class, chain(channelQueueMgr.createQueueDeleteHandler(), replicated(alternate(new QueueDeleteHandler(false), new QueueDeleteHandler(true)))));
- registry.addHandler(BasicConsumeBody.class, chain(channelQueueMgr.createBasicConsumeHandler(), new ReplicatingConsumeHandler(_groupMgr)));
+ registry.addHandler(MessageConsumeBody.class, chain(channelQueueMgr.createBasicConsumeHandler(), new ReplicatingConsumeHandler(_groupMgr)));
//other modified handlers:
- registry.addHandler(BasicCancelBody.class, alternate(new RemoteCancelHandler(), BasicCancelMethodHandler.getInstance()));
+ registry.addHandler(MessageCancelBody.class, alternate(new RemoteCancelHandler(), MessageCancelHandler.getInstance()));
//other unaffected handlers:
- registry.addHandler(BasicPublishBody.class, BasicPublishMethodHandler.getInstance());
- registry.addHandler(BasicQosBody.class, BasicQosHandler.getInstance());
+ registry.addHandler(MessageTransferBody.class, MessageTransferHandler.getInstance());
+ registry.addHandler(MessageQosBody.class, MessageQosHandler.getInstance());
registry.addHandler(ChannelOpenBody.class, ChannelOpenHandler.getInstance());
registry.addHandler(ChannelCloseBody.class, ChannelCloseHandler.getInstance());
registry.addHandler(ChannelFlowBody.class, ChannelFlowHandler.getInstance());
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
index e9f039ed09..f17a329c44 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.MessageCancelBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -33,14 +33,14 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-public class RemoteCancelHandler implements StateAwareMethodListener<BasicCancelBody>
+public class RemoteCancelHandler implements StateAwareMethodListener<MessageCancelBody>
{
private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class);
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicCancelBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageCancelBody> evt) throws AMQException
{
//By convention, consumers setup between brokers use the queue name as the consumer tag:
- AMQQueue queue = queues.getQueue(evt.getMethod().consumerTag);
+ AMQQueue queue = queues.getQueue(evt.getMethod().getDestination());
if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session));
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
index 693c70b780..a3b2b42bc9 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -38,11 +38,11 @@ import org.apache.qpid.server.state.StateAwareMethodListener;
* Handles consume requests from other cluster members.
*
*/
-public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsumeBody>
+public class RemoteConsumeHandler implements StateAwareMethodListener<MessageConsumeBody>
{
private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class);
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
{
AMQQueue queue = queues.getQueue(evt.getMethod().queue);
if (queue instanceof ClusteredQueue)
@@ -51,10 +51,8 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsu
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(),
- (byte)0, (byte)9, // AMQP version (major, minor)
- evt.getMethod().queue // consumerTag
- ));
+ session.writeResponse(evt.getChannelId(), evt.getRequestId(),
+ MessageOkBody.createMethodBody((byte)0, (byte)9));
}
else
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
index ea42db3ded..5d25bab23e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
@@ -21,20 +21,20 @@
package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.MessageConsumeBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.BroadcastPolicy;
import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.handler.BasicConsumeMethodHandler;
+import org.apache.qpid.server.handler.MessageConsumeHandler;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBody>
+public class ReplicatingConsumeHandler extends ReplicatingHandler<MessageConsumeBody>
{
ReplicatingConsumeHandler(GroupManager groupMgr)
{
@@ -46,7 +46,7 @@ public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBo
super(groupMgr, base(), policy);
}
- protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
{
//only replicate if the queue in question is a shared queue
if (isShared(queues.getQueue(evt.getMethod().queue)))
@@ -67,18 +67,18 @@ public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBo
return queue != null && queue.isShared();
}
- static StateAwareMethodListener<BasicConsumeBody> base()
+ static StateAwareMethodListener<MessageConsumeBody> base()
{
- return new PeerHandler<BasicConsumeBody>(peer(), client());
+ return new PeerHandler<MessageConsumeBody>(peer(), client());
}
- static StateAwareMethodListener<BasicConsumeBody> peer()
+ static StateAwareMethodListener<MessageConsumeBody> peer()
{
return new RemoteConsumeHandler();
}
- static StateAwareMethodListener<BasicConsumeBody> client()
+ static StateAwareMethodListener<MessageConsumeBody> client()
{
- return BasicConsumeMethodHandler.getInstance();
+ return MessageConsumeHandler.getInstance();
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
index a601021bf1..b4249d70aa 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
@@ -21,7 +21,7 @@
package org.apache.qpid.server.cluster.replay;
import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.MessageConsumeBody;
import java.util.Map;
import java.util.HashMap;
@@ -36,7 +36,7 @@ class ConsumerCounts
_counts.put(queue, get(queue) + 1);
}
- synchronized void decrement(String queue)
+ synchronized void decrement(String queue)
{
_counts.put(queue, get(queue) - 1);
}
@@ -53,14 +53,14 @@ class ConsumerCounts
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- BasicConsumeBody m = new BasicConsumeBody((byte)0, (byte)9);
+ MessageConsumeBody m = new MessageConsumeBody((byte)0, (byte)9);
m.queue = queue;
- m.consumerTag = queue;
+ m.destination = queue;
replay(m, messages);
}
}
- private void replay(BasicConsumeBody msg, List<AMQMethodBody> messages)
+ private void replay(MessageConsumeBody msg, List<AMQMethodBody> messages)
{
int count = _counts.get(msg.queue);
for(int i = 0; i < count; i++)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
index c927c0cebf..2675fbb1f4 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.cluster.replay;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageConsumeBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.QueueBindBody;
@@ -55,8 +55,8 @@ public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory
new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor)),
new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor)),
new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor)),
- new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor)),
- new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor))
+ new FrameDescriptor(MessageConsumeBody.class, new MessageConsumeBody(major, minor)),
+ new FrameDescriptor(MessageCancelBody.class, new MessageCancelBody(major, minor))
});
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
index ac4879561c..102628c6de 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
@@ -29,8 +29,8 @@ import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.ClusterSynchBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageCancelBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.cluster.util.LogMessage;
@@ -75,8 +75,8 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
_localRecorders.put(QueueDeclareBody.class, new PrivateQueueDeclareRecorder());
_localRecorders.put(QueueDeleteBody.class, new PrivateQueueDeleteRecorder());
_localRecorders.put(QueueBindBody.class, new PrivateQueueBindRecorder());
- _localRecorders.put(BasicConsumeBody.class, new BasicConsumeRecorder());
- _localRecorders.put(BasicCancelBody.class, new BasicCancelRecorder());
+ _localRecorders.put(MessageConsumeBody.class, new BasicConsumeRecorder());
+ _localRecorders.put(MessageCancelBody.class, new BasicCancelRecorder());
_localRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder());
_localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder());
}
@@ -130,9 +130,9 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
return methods;
}
- private class BasicConsumeRecorder implements MethodRecorder<BasicConsumeBody>
+ private class BasicConsumeRecorder implements MethodRecorder<MessageConsumeBody>
{
- public void record(BasicConsumeBody method)
+ public void record(MessageConsumeBody method)
{
if(_sharedQueues.containsKey(method.queue))
{
@@ -141,13 +141,13 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
}
}
- private class BasicCancelRecorder implements MethodRecorder<BasicCancelBody>
+ private class BasicCancelRecorder implements MethodRecorder<MessageCancelBody>
{
- public void record(BasicCancelBody method)
+ public void record(MessageCancelBody method)
{
- if(_sharedQueues.containsKey(method.consumerTag))
+ if(_sharedQueues.containsKey(method.getDestination()))
{
- _consumers.decrement(method.consumerTag);
+ _consumers.decrement(method.getDestination());
}
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index c5b4c1c4b5..3b9f95be46 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.MessageCancelBody;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.server.cluster.*;
import org.apache.qpid.server.cluster.util.LogMessage;
@@ -91,8 +91,8 @@ public class ClusteredQueue extends AMQQueue
//signal other members:
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- BasicCancelBody request = new BasicCancelBody((byte)0, (byte)9);
- request.consumerTag = getName();
+ MessageCancelBody request = new MessageCancelBody((byte)0, (byte)9);
+ request.destination = getName();
_groupMgr.broadcast(new SimpleSendable(request));
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
index 752cf05a82..a989e45708 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
@@ -23,9 +23,8 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.Content;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.util.LogMessage;
@@ -92,9 +91,11 @@ public class RemoteQueueProxy extends AMQQueue
void relay(AMQMessage msg) throws AMQException
{
- BasicPublishBody publish = msg.getPublishBody();
+ throw new Error("XXX");
+ /*
+ MessageTransferBody publish = msg.getPublishBody();
ContentHeaderBody header = msg.getContentHeaderBody();
- List<ContentBody> bodies = msg.getContentBodies();
+ List<Content> bodies = msg.getContentBodies();
//(i) construct a new publishing block:
publish.immediate = false;//can't as yet handle the immediate flag in a cluster
@@ -105,5 +106,6 @@ public class RemoteQueueProxy extends AMQQueue
//(ii) send this on to the broker for which it is acting as proxy:
_groupMgr.send(_target, new SimpleSendable(parts));
+ */
}
}