summaryrefslogtreecommitdiff
path: root/java/cluster/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
committerRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
commit913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch)
tree7ea442d6867d0076f1c9ea4f4265664059e7aff5 /java/cluster/src
downloadqpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze Repository Root: https://etp.108.redhat.com/svn/etp Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48 Revision: 608 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java88
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java23
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/Broker.java244
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java23
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java365
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java70
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java132
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java60
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java55
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java190
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java132
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java77
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java366
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java69
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java104
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java28
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java87
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/JoinState.java23
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java104
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/Main.java117
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/Member.java28
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java23
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java31
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java25
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java26
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java25
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java41
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java269
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java33
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java27
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/Sendable.java25
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java91
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java156
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java53
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java69
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java136
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java46
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java278
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java52
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java22
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java74
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java35
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java57
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java60
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java50
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java55
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java81
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java127
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java53
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java82
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java28
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java28
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java28
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java26
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java28
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java45
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java66
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java29
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java70
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java34
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java310
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java80
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java69
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java50
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java58
-rw-r--r--java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java161
-rw-r--r--java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java92
-rw-r--r--java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java100
-rw-r--r--java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java60
-rw-r--r--java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java57
-rw-r--r--java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java106
-rw-r--r--java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java93
-rw-r--r--java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java53
73 files changed, 6158 insertions, 0 deletions
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java
new file mode 100644
index 0000000000..3e7a2af01f
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+public class BlockingHandler implements ResponseHandler
+{
+ private final Class _expected;
+ private boolean _completed;
+ private AMQMethodBody _response;
+
+
+ public BlockingHandler()
+ {
+ this(AMQMethodBody.class);
+ }
+
+ public BlockingHandler(Class<? extends AMQMethodBody> expected)
+ {
+ _expected = expected;
+ }
+
+ public void responded(AMQMethodBody response)
+ {
+ if (_expected.isInstance(response))
+ {
+ _response = response;
+ completed();
+ }
+ }
+
+ public void removed()
+ {
+ completed();
+ }
+
+ private synchronized void completed()
+ {
+ _completed = true;
+ notifyAll();
+ }
+
+ synchronized void waitForCompletion()
+ {
+ while (!_completed)
+ {
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException ignore)
+ {
+
+ }
+ }
+ }
+
+ AMQMethodBody getResponse()
+ {
+ return _response;
+ }
+
+ boolean failed()
+ {
+ return _response == null;
+ }
+
+ boolean isCompleted()
+ {
+ return _completed;
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java b/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java
new file mode 100644
index 0000000000..6d56afe50b
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+public interface BroadcastPolicy
+{
+ public boolean isComplete(int responded, int members);
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Broker.java b/java/cluster/src/org/apache/qpid/server/cluster/Broker.java
new file mode 100644
index 0000000000..d8a3fd1b76
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/Broker.java
@@ -0,0 +1,244 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An implementation of the Member interface (through which data is sent to other
+ * peers in the cluster). This class provides a base from which subclasses can
+ * inherit some common behaviour for broadcasting GroupRequests and sending methods
+ * that may expect a response. It also extends the Member abstraction to support
+ * a richer set of operations that are useful within the package but should not be
+ * exposed outside of it.
+ *
+ */
+abstract class Broker extends SimpleMemberHandle implements Member
+{
+ private static final Logger _logger = Logger.getLogger(Broker.class);
+ private static final int DEFAULT_CHANNEL = 1;
+ private static final int START_CHANNEL = 2;
+ private static final int END_CHANNEL = 10000;
+
+
+ private MemberFailureListener _listener;
+ //a wrap-around counter to allocate _requests a unique channel:
+ private int _nextChannel = START_CHANNEL;
+ //outstanding _requests:
+ private final Map<Integer, ResponseHandler> _requests = new HashMap<Integer, ResponseHandler>();
+
+ Broker(String host, int port)
+ {
+ super(host, port);
+ }
+
+ /**
+ * Allows a listener to be registered that will receive callbacks when communication
+ * to the peer this broker instance represents fails.
+ * @param listener the callback to be notified of failures
+ */
+ public void addFailureListener(MemberFailureListener listener)
+ {
+ _listener = listener;
+ }
+
+ /**
+ * Allows subclasses to signal comunication failures
+ */
+ protected void failed()
+ {
+ if (_listener != null)
+ {
+ _listener.failed(this);
+ }
+ }
+
+ /**
+ * Subclasses should call this on receiving message responses from the remote
+ * peer. They are matched to any outstanding request they might be response
+ * to, with the completion and callback of that request being managed if
+ * required.
+ *
+ * @param channel the channel on which the method was received
+ * @param response the response received
+ * @return true if the response matched an outstanding request
+ */
+ protected synchronized boolean handleResponse(int channel, AMQMethodBody response)
+ {
+ ResponseHandler request = _requests.get(channel);
+ if (request == null)
+ {
+ if(!_requests.isEmpty())
+ {
+ _logger.warn(new LogMessage("[next channel={3, integer}]: Response {0} on channel {1, integer} failed to match outstanding requests: {2}", response, channel, _requests, _nextChannel));
+ }
+ return false;
+ }
+ else
+ {
+ request.responded(response);
+ return true;
+ }
+ }
+
+ /**
+ * Called when this broker is excluded from the group. Any requests made on
+ * it are informed this member has left the group.
+ */
+ synchronized void remove()
+ {
+ for (ResponseHandler r : _requests.values())
+ {
+ r.removed();
+ }
+ }
+
+ /**
+ * Engages this broker in the specified group request
+ *
+ * @param request the request being made to a group of brokers
+ * @throws AMQException if there is any failure
+ */
+ synchronized void invoke(GroupRequest request) throws AMQException
+ {
+ int channel = nextChannel();
+ _requests.put(channel, new GroupRequestAdapter(request, channel));
+ request.send(channel, this);
+ }
+
+ /**
+ * Sends a message to the remote peer and undertakes to notify the specified
+ * handler of the response.
+ *
+ * @param msg the message to send
+ * @param handler the callback to notify of responses (or the removal of this broker
+ * from the group)
+ * @throws AMQException
+ */
+ synchronized void send(Sendable msg, ResponseHandler handler) throws AMQException
+ {
+ int channel;
+ if (handler != null)
+ {
+ channel = nextChannel();
+ _requests.put(channel, new RemovingWrapper(handler, channel));
+ }
+ else
+ {
+ channel = DEFAULT_CHANNEL;
+ }
+
+ msg.send(channel, this);
+ }
+
+ private int nextChannel()
+ {
+ int channel = _nextChannel++;
+ if(_nextChannel >= END_CHANNEL)
+ {
+ _nextChannel = START_CHANNEL;
+ }
+ return channel;
+ }
+
+ /**
+ * extablish connection without handling redirect
+ */
+ abstract boolean connect() throws IOException, InterruptedException;
+
+ /**
+ * Start connection process, including replay
+ */
+ abstract void connectAsynch(Iterable<AMQMethodBody> msgs);
+
+ /**
+ * Replay messages to the remote peer this instance represents. These messages
+ * must be sent before any others whose transmission is requested through send() etc.
+ *
+ * @param msgs
+ */
+ abstract void replay(Iterable<AMQMethodBody> msgs);
+
+ /**
+ * establish connection, handling redirect if required...
+ */
+ abstract Broker connectToCluster() throws IOException, InterruptedException;
+
+ private class GroupRequestAdapter implements ResponseHandler
+ {
+ private final GroupRequest request;
+ private final int channel;
+
+ GroupRequestAdapter(GroupRequest request, int channel)
+ {
+ this.request = request;
+ this.channel = channel;
+ }
+
+ public void responded(AMQMethodBody response)
+ {
+ request.responseReceived(Broker.this, response);
+ _requests.remove(channel);
+ }
+
+ public void removed()
+ {
+ request.removed(Broker.this);
+ }
+
+ public String toString()
+ {
+ return "GroupRequestAdapter{" + channel + ", " + request + "}";
+ }
+ }
+
+ private class RemovingWrapper implements ResponseHandler
+ {
+ private final ResponseHandler handler;
+ private final int channel;
+
+ RemovingWrapper(ResponseHandler handler, int channel)
+ {
+ this.handler = handler;
+ this.channel = channel;
+ }
+
+ public void responded(AMQMethodBody response)
+ {
+ handler.responded(response);
+ _requests.remove(channel);
+ }
+
+ public void removed()
+ {
+ handler.removed();
+ }
+
+ public String toString()
+ {
+ return "RemovingWrapper{" + channel + ", " + handler + "}";
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java b/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java
new file mode 100644
index 0000000000..75a47a168f
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+interface BrokerFactory
+{
+ public Broker create(MemberHandle handle);
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java b/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java
new file mode 100644
index 0000000000..db667879f6
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java
@@ -0,0 +1,365 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.cluster.replay.ReplayManager;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.cluster.util.InvokeMultiple;
+import org.apache.qpid.framing.AMQMethodBody;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Manages the membership list of a group and the set of brokers representing the
+ * remote peers. The group should be initialised through a call to establish()
+ * or connectToLeader().
+ *
+ */
+class BrokerGroup
+{
+ private static final Logger _logger = Logger.getLogger(BrokerGroup.class);
+
+ private final InvokeMultiple<MembershipChangeListener> _changeListeners = new InvokeMultiple<MembershipChangeListener>(MembershipChangeListener.class);
+ private final ReplayManager _replayMgr;
+ private final MemberHandle _local;
+ private final BrokerFactory _factory;
+ private final Object _lock = new Object();
+ private final Set<MemberHandle> _synch = new HashSet<MemberHandle>();
+ private List<MemberHandle> _members;
+ private List<Broker> _peers = new ArrayList<Broker>();
+ private JoinState _state = JoinState.UNINITIALISED;
+
+ /**
+ * Creates an unitialised group.
+ *
+ * @param local a handle that represents the local broker
+ * @param replayMgr the replay manager to use when creating new brokers
+ * @param factory the factory through which broker instances are created
+ */
+ BrokerGroup(MemberHandle local, ReplayManager replayMgr, BrokerFactory factory)
+ {
+ _replayMgr = replayMgr;
+ _local = local;
+ _factory = factory;
+ }
+
+ /**
+ * Called to establish the local broker as the leader of a new group
+ */
+ void establish()
+ {
+ synchronized (_lock)
+ {
+ setState(JoinState.JOINED);
+ _members = new ArrayList<MemberHandle>();
+ _members.add(_local);
+ }
+ fireChange();
+ }
+
+ /**
+ * Called by prospect to connect to group
+ */
+ Broker connectToLeader(MemberHandle handle) throws Exception
+ {
+ Broker leader = _factory.create(handle);
+ leader = leader.connectToCluster();
+ synchronized (_lock)
+ {
+ setState(JoinState.JOINING);
+ _members = new ArrayList<MemberHandle>();
+ _members.add(leader);
+ _peers.add(leader);
+ }
+ fireChange();
+ return leader;
+ }
+
+ /**
+ * Called by leader when handling a join request
+ */
+ Broker connectToProspect(MemberHandle handle) throws IOException, InterruptedException
+ {
+ Broker prospect = _factory.create(handle);
+ prospect.connect();
+ synchronized (_lock)
+ {
+ _members.add(prospect);
+ _peers.add(prospect);
+ }
+ fireChange();
+ return prospect;
+ }
+
+ /**
+ * Called in reponse to membership announcements.
+ *
+ * @param members the list of members now part of the group
+ */
+ void setMembers(List<MemberHandle> members)
+ {
+ if (isJoined())
+ {
+ List<Broker> old = _peers;
+
+ synchronized (_lock)
+ {
+ _peers = getBrokers(members);
+ _members = new ArrayList<MemberHandle>(members);
+ }
+
+ //remove those that are still members
+ old.removeAll(_peers);
+
+ //handle failure of any brokers that haven't survived
+ for (Broker peer : old)
+ {
+ peer.remove();
+ }
+ }
+ else
+ {
+ synchronized (_lock)
+ {
+ setState(JoinState.INITIATION);
+ _members = new ArrayList<MemberHandle>(members);
+ _synch.addAll(_members);
+ _synch.remove(_local);
+ }
+ }
+ fireChange();
+ }
+
+ List<MemberHandle> getMembers()
+ {
+ synchronized (_lock)
+ {
+ return Collections.unmodifiableList(_members);
+ }
+ }
+
+ List<Broker> getPeers()
+ {
+ synchronized (_lock)
+ {
+ return _peers;
+ }
+ }
+
+ /**
+ * Removes the member presented from the group
+ * @param peer the broker that should be removed
+ */
+ void remove(Broker peer)
+ {
+ synchronized (_lock)
+ {
+ _peers.remove(peer);
+ _members.remove(peer);
+ }
+ fireChange();
+ }
+
+ MemberHandle getLocal()
+ {
+ return _local;
+ }
+
+ Broker getLeader()
+ {
+ synchronized (_lock)
+ {
+ return _peers.size() > 0 ? _peers.get(0) : null;
+ }
+ }
+
+ /**
+ * Allows a Broker instance to be retrieved for a given handle
+ *
+ * @param handle the handle for which a broker is sought
+ * @param create flag to indicate whther a broker should be created for the handle if
+ * one is not found within the list of known peers
+ * @return the broker corresponding to handle or null if a match cannot be found and
+ * create is false
+ */
+ Broker findBroker(MemberHandle handle, boolean create)
+ {
+ if (handle instanceof Broker)
+ {
+ return (Broker) handle;
+ }
+ else
+ {
+ for (Broker b : getPeers())
+ {
+ if (b.matches(handle))
+ {
+ return b;
+ }
+ }
+ }
+ if (create)
+ {
+ Broker b = _factory.create(handle);
+ List<AMQMethodBody> msgs = _replayMgr.replay(isLeader(_local));
+ _logger.info(new LogMessage("Replaying {0} from {1} to {2}", msgs, _local, b));
+ b.connectAsynch(msgs);
+
+ return b;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
+ * @param member the member to test for leadership
+ * @return true if the passed in member is the group leader, false otherwise
+ */
+ boolean isLeader(MemberHandle member)
+ {
+ synchronized (_lock)
+ {
+ return member.matches(_members.get(0));
+ }
+ }
+
+ /**
+ * @return true if the local broker is the group leader, false otherwise
+ */
+ boolean isLeader()
+ {
+ return isLeader(_local);
+ }
+
+ /**
+ * Used when the leader fails and the next broker in the list needs to
+ * assume leadership
+ * @return true if the action succeeds
+ */
+ boolean assumeLeadership()
+ {
+ boolean valid;
+ synchronized (_lock)
+ {
+ valid = _members.size() > 1 && _local.matches(_members.get(1));
+ if (valid)
+ {
+ _members.remove(0);
+ _peers.remove(0);
+ }
+ }
+ fireChange();
+ return valid;
+ }
+
+ /**
+ * Called in response to a Cluster.Synch message being received during the join
+ * process. This indicates that the member mentioned has replayed all necessary
+ * messages to the local broker.
+ *
+ * @param member the member from whom the synch messages was received
+ */
+ void synched(MemberHandle member)
+ {
+ _logger.info(new LogMessage("Synchronised with {0}", member));
+ synchronized (_lock)
+ {
+ if (isLeader(member))
+ {
+ setState(JoinState.INDUCTION);
+ }
+ _synch.remove(member);
+ if (_synch.isEmpty())
+ {
+ _peers = getBrokers(_members);
+ setState(JoinState.JOINED);
+ }
+ }
+ }
+
+
+ /**
+ * @return the state of the group
+ */
+ JoinState getState()
+ {
+ synchronized (_lock)
+ {
+ return _state;
+ }
+ }
+
+ void addMemberhipChangeListener(MembershipChangeListener l)
+ {
+ _changeListeners.addListener(l);
+ }
+
+ void removeMemberhipChangeListener(MembershipChangeListener l)
+ {
+ _changeListeners.removeListener(l);
+ }
+
+
+
+ private void setState(JoinState state)
+ {
+ _logger.info(new LogMessage("Changed state from {0} to {1}", _state, state));
+ _state = state;
+ }
+
+ private boolean isJoined()
+ {
+ return inState(JoinState.JOINED);
+ }
+
+ private boolean inState(JoinState state)
+ {
+ return _state.equals(state);
+ }
+
+ private List<Broker> getBrokers(List<MemberHandle> handles)
+ {
+ List<Broker> brokers = new ArrayList<Broker>();
+ for (MemberHandle handle : handles)
+ {
+ if (!_local.matches(handle))
+ {
+ brokers.add(findBroker(handle, true));
+ }
+ }
+ return brokers;
+ }
+
+ private void fireChange()
+ {
+ List<MemberHandle> members;
+ synchronized(this)
+ {
+ members = new ArrayList(_members);
+ }
+ _changeListeners.getProxy().changed(Collections.unmodifiableList(members));
+ }
+} \ No newline at end of file
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java
new file mode 100644
index 0000000000..6909b199ce
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * Hack to assist with reuse of the client handlers for connection setup in
+ * the inter-broker communication within the cluster.
+ *
+ */
+class ClientAdapter implements MethodHandler
+{
+ private final AMQProtocolSession _session;
+ private final AMQStateManager _stateMgr;
+
+ ClientAdapter(IoSession session, AMQStateManager stateMgr)
+ {
+ this(session, stateMgr, "guest", "guest", session.toString(), "/cluster");
+ }
+
+ ClientAdapter(IoSession session, AMQStateManager stateMgr, String user, String password, String name, String path)
+ {
+ _session = new SessionAdapter(session, new ConnectionAdapter(user, password, name, path));
+ _stateMgr = stateMgr;
+ }
+
+ public void handle(int channel, AMQMethodBody method) throws AMQException
+ {
+ AMQMethodEvent evt = new AMQMethodEvent(channel, method, _session);
+ _stateMgr.methodReceived(evt);
+ }
+
+ private class SessionAdapter extends AMQProtocolSession
+ {
+ public SessionAdapter(IoSession session, AMQConnection connection)
+ {
+ super(null, session, connection);
+ }
+ }
+
+ private static class ConnectionAdapter extends AMQConnection
+ {
+ ConnectionAdapter(String username, String password, String clientName, String virtualPath)
+ {
+ super(username, password, clientName, virtualPath);
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
new file mode 100644
index 0000000000..d378647698
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
@@ -0,0 +1,132 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
+import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
+import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
+import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
+import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.IllegalStateTransitionException;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An extension of client.AMQStateManager that allows different handlers to be registered.
+ *
+ */
+public class ClientHandlerRegistry extends AMQStateManager
+{
+ private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>();
+ private final MemberHandle _identity;
+
+ protected ClientHandlerRegistry(MemberHandle local)
+ {
+ super(AMQState.CONNECTION_NOT_STARTED, false);
+
+ _identity = local;
+
+ addHandler(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance(),
+ AMQState.CONNECTION_NOT_STARTED);
+
+ addHandler(ConnectionTuneBody.class, new ConnectionTuneHandler(),
+ AMQState.CONNECTION_NOT_TUNED);
+ addHandler(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance(),
+ AMQState.CONNECTION_NOT_TUNED);
+ addHandler(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance(),
+ AMQState.CONNECTION_NOT_OPENED);
+
+ addHandlers(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance(),
+ AMQState.CONNECTION_NOT_STARTED,
+ AMQState.CONNECTION_NOT_TUNED,
+ AMQState.CONNECTION_NOT_OPENED);
+
+ }
+
+ private ClientRegistry state(AMQState state)
+ {
+ ClientRegistry registry = _handlers.get(state);
+ if (registry == null)
+ {
+ registry = new ClientRegistry();
+ _handlers.put(state, registry);
+ }
+ return registry;
+ }
+
+ protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBody frame) throws IllegalStateTransitionException
+ {
+ ClientRegistry registry = _handlers.get(state);
+ return registry == null ? null : registry.getHandler(frame);
+ }
+
+
+ <A extends Class<AMQMethodBody>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states)
+ {
+ for (AMQState state : states)
+ {
+ addHandler(type, handler, state);
+ }
+ }
+
+ <A extends Class<AMQMethodBody>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state)
+ {
+ ClientRegistry registry = _handlers.get(state);
+ if (registry == null)
+ {
+ registry = new ClientRegistry();
+ _handlers.put(state, registry);
+ }
+ registry.add(type, handler);
+ }
+
+ static class ClientRegistry
+ {
+ private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener> registry
+ = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener>();
+
+ <A extends Class<AMQMethodBody>> void add(A type, StateAwareMethodListener handler)
+ {
+ registry.put(type, handler);
+ }
+
+ StateAwareMethodListener getHandler(AMQMethodBody frame)
+ {
+ return registry.get(frame.getClass());
+ }
+ }
+
+ class ConnectionTuneHandler extends ConnectionTuneMethodHandler
+ {
+ protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
+ {
+ return super.createConnectionOpenFrame(channel, path, ClusterCapability.add(capabilities, _identity), insist);
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java
new file mode 100644
index 0000000000..eb2992a690
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.server.cluster.handler.ClusterMethodHandlerFactory;
+import org.apache.qpid.server.cluster.replay.RecordingMethodHandlerFactory;
+import org.apache.qpid.server.cluster.replay.ReplayStore;
+
+import java.net.InetSocketAddress;
+
+class ClusterBuilder
+{
+ private final LoadTable loadTable = new LoadTable();
+ private final ReplayStore replayStore = new ReplayStore();
+ private final MemberHandle handle;
+ private final GroupManager groupMgr;
+
+ ClusterBuilder(InetSocketAddress address)
+ {
+ handle = new SimpleMemberHandle(address.getHostName(), address.getPort()).resolve();
+ groupMgr = new DefaultGroupManager(handle, getBrokerFactory(), replayStore, loadTable);
+ }
+
+ GroupManager getGroupManager()
+ {
+ return groupMgr;
+ }
+
+ ServerHandlerRegistry getHandlerRegistry()
+ {
+ return new ServerHandlerRegistry(getHandlerFactory());
+ }
+
+ private MethodHandlerFactory getHandlerFactory()
+ {
+ MethodHandlerFactory factory = new ClusterMethodHandlerFactory(groupMgr, loadTable);
+ //need to wrap relevant handlers with recording handler for easy replay:
+ return new RecordingMethodHandlerFactory(factory, replayStore);
+ }
+
+ private BrokerFactory getBrokerFactory()
+ {
+ return new MinaBrokerProxyFactory(handle);
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java b/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java
new file mode 100644
index 0000000000..9bf2e02d3c
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ClusterCapability
+{
+ public static final String PATTERN = ".*\\bcluster_peer=(\\S*:\\d*)\b*.*";
+ public static final String PEER = "cluster_peer";
+
+ public static String add(String original, MemberHandle identity)
+ {
+ return original == null ? peer(identity) : original + " " + peer(identity);
+ }
+
+ private static String peer(MemberHandle identity)
+ {
+ return PEER + "=" + identity.getDetails();
+ }
+
+ public static boolean contains(String in)
+ {
+ return in != null && in.contains(in);
+ }
+
+ public static MemberHandle getPeer(String in)
+ {
+ Matcher matcher = Pattern.compile(PATTERN).matcher(in);
+ if (matcher.matches())
+ {
+ return new SimpleMemberHandle(matcher.group(1));
+ }
+ else
+ {
+ throw new RuntimeException("Could not find peer in '" + in + "'");
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
new file mode 100644
index 0000000000..65b188484c
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
@@ -0,0 +1,190 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.ClusterMembershipBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.cluster.util.LogMessage;
+
+import java.net.InetSocketAddress;
+
+public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements InductionBuffer.MessageHandler
+{
+ private static final Logger _logger = Logger.getLogger(ClusteredProtocolHandler.class);
+ private final InductionBuffer _peerBuffer = new InductionBuffer(this);
+ private final InductionBuffer _clientBuffer = new InductionBuffer(this);
+ private final GroupManager _groupMgr;
+ private final ServerHandlerRegistry _handlers;
+
+ public ClusteredProtocolHandler(InetSocketAddress address)
+ {
+ this(ApplicationRegistry.getInstance(), address);
+ }
+
+ public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address)
+ {
+ this(registry.getQueueRegistry(), registry.getExchangeRegistry(), address);
+ }
+
+ public ClusteredProtocolHandler(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, InetSocketAddress address)
+ {
+ super(queueRegistry, exchangeRegistry);
+ ClusterBuilder builder = new ClusterBuilder(address);
+ _groupMgr = builder.getGroupManager();
+ _handlers = builder.getHandlerRegistry();
+ }
+
+ public ClusteredProtocolHandler(ClusteredProtocolHandler handler)
+ {
+ super(handler);
+ _groupMgr = handler._groupMgr;
+ _handlers = handler._handlers;
+ }
+
+ protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+ {
+ new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers));
+ }
+
+ void connect(String join) throws Exception
+ {
+ if (join == null)
+ {
+ _groupMgr.establish();
+ }
+ else
+ {
+ _groupMgr.join(new SimpleMemberHandle(join));
+ }
+ }
+
+ private boolean inState(JoinState state)
+ {
+ return _groupMgr.getState().equals(state);
+ }
+
+ public void messageReceived(IoSession session, Object msg) throws Exception
+ {
+ JoinState state = _groupMgr.getState();
+ switch (state)
+ {
+ case JOINED:
+ _logger.debug(new LogMessage("Received {0}", msg));
+ super.messageReceived(session, msg);
+ break;
+ case JOINING:
+ case INITIATION:
+ case INDUCTION:
+ buffer(session, msg);
+ break;
+ default:
+ throw new AMQException("Received message while in state: " + state);
+ }
+ JoinState latest = _groupMgr.getState();
+ if (!latest.equals(state))
+ {
+ switch (latest)
+ {
+ case INDUCTION:
+ _logger.info("Reached induction, delivering buffered message from peers");
+ _peerBuffer.deliver();
+ break;
+ case JOINED:
+ _logger.info("Reached joined, delivering buffered message from clients");
+ _clientBuffer.deliver();
+ break;
+ }
+ }
+ }
+
+ private void buffer(IoSession session, Object msg) throws Exception
+ {
+ if (isBufferable(msg))
+ {
+ MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session);
+ if (peer == null)
+ {
+ _logger.debug(new LogMessage("Buffering {0} for client", msg));
+ _clientBuffer.receive(session, msg);
+ }
+ else if (inState(JoinState.JOINING) && isMembershipAnnouncement(msg))
+ {
+ _logger.debug(new LogMessage("Initial membership [{0}] received from {1}", msg, peer));
+ super.messageReceived(session, msg);
+ }
+ else if (inState(JoinState.INITIATION) && _groupMgr.isLeader(peer))
+ {
+ _logger.debug(new LogMessage("Replaying {0} from leader ", msg));
+ super.messageReceived(session, msg);
+ }
+ else if (inState(JoinState.INDUCTION))
+ {
+ _logger.debug(new LogMessage("Replaying {0} from peer {1}", msg, peer));
+ super.messageReceived(session, msg);
+ }
+ else
+ {
+ _logger.debug(new LogMessage("Buffering {0} for peer {1}", msg, peer));
+ _peerBuffer.receive(session, msg);
+ }
+ }
+ else
+ {
+ _logger.debug(new LogMessage("Received {0}", msg));
+ super.messageReceived(session, msg);
+ }
+ }
+
+ public void deliver(IoSession session, Object msg) throws Exception
+ {
+ _logger.debug(new LogMessage("Delivering {0}", msg));
+ super.messageReceived(session, msg);
+ }
+
+ private boolean isMembershipAnnouncement(Object msg)
+ {
+ return msg instanceof AMQFrame && (((AMQFrame) msg).bodyFrame instanceof ClusterMembershipBody);
+ }
+
+ private boolean isBufferable(Object msg)
+ {
+ return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).bodyFrame);
+ }
+
+ private boolean isBuffereable(AMQBody body)
+ {
+ return !(body instanceof ConnectionStartOkBody ||
+ body instanceof ConnectionTuneOkBody ||
+ body instanceof ConnectionSecureOkBody ||
+ body instanceof ConnectionOpenBody);
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
new file mode 100644
index 0000000000..e5efe941b3
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
@@ -0,0 +1,132 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+
+public class ClusteredProtocolSession extends AMQMinaProtocolSession
+{
+ private MemberHandle _peer;
+
+ public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager)
+ throws AMQException
+ {
+ super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager);
+ }
+
+ public boolean isPeerSession()
+ {
+ return _peer != null;
+ }
+
+ public void setSessionPeer(MemberHandle peer)
+ {
+ _peer = peer;
+ }
+
+ public MemberHandle getSessionPeer()
+ {
+ return _peer;
+ }
+
+ public AMQChannel getChannel(int channelId)
+ throws AMQException
+ {
+ AMQChannel channel = super.getChannel(channelId);
+ if (isPeerSession() && channel == null)
+ {
+ channel = new OneUseChannel(channelId);
+ addChannel(channel);
+ }
+ return channel;
+ }
+
+ public static boolean isPeerSession(IoSession session)
+ {
+ return isPeerSession(getAMQProtocolSession(session));
+ }
+
+ public static boolean isPeerSession(AMQProtocolSession session)
+ {
+ return session instanceof ClusteredProtocolSession && ((ClusteredProtocolSession) session).isPeerSession();
+ }
+
+ public static void setSessionPeer(AMQProtocolSession session, MemberHandle peer)
+ {
+ ((ClusteredProtocolSession) session).setSessionPeer(peer);
+ }
+
+ public static MemberHandle getSessionPeer(AMQProtocolSession session)
+ {
+ return ((ClusteredProtocolSession) session).getSessionPeer();
+ }
+
+ public static MemberHandle getSessionPeer(IoSession session)
+ {
+ return getSessionPeer(getAMQProtocolSession(session));
+ }
+
+ /**
+ * Cleans itself up after delivery of a message (publish frame, header and optional body frame(s))
+ */
+ private class OneUseChannel extends AMQChannel
+ {
+ public OneUseChannel(int channelId)
+ throws AMQException
+ {
+ this(channelId, ApplicationRegistry.getInstance());
+ }
+
+ public OneUseChannel(int channelId, IApplicationRegistry registry)
+ throws AMQException
+ {
+ super(channelId,
+ registry.getMessageStore(),
+ registry.getExchangeRegistry());
+ }
+
+ protected void routeCurrentMessage() throws AMQException
+ {
+ super.routeCurrentMessage();
+ removeChannel(getChannelId());
+ }
+ }
+
+ public static boolean isPayloadFromPeer(AMQMessage payload)
+ {
+ return isPeerSession(payload.getPublisher());
+ }
+
+ public static boolean canRelay(AMQMessage payload, MemberHandle target)
+ {
+ //can only relay client messages that have not already been relayed to the given target
+ return !isPayloadFromPeer(payload) && !payload.checkToken(target);
+ }
+
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java b/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java
new file mode 100644
index 0000000000..650240fa70
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import java.io.IOException;
+
+class ConnectionStatusMonitor
+{
+ private boolean _complete;
+ private boolean _redirected;
+ private String _host;
+ private int _port;
+ private RuntimeException _error;
+
+ synchronized void opened()
+ {
+ _complete = true;
+ notifyAll();
+ }
+
+ synchronized void redirect(String host, int port)
+ {
+ _complete = true;
+ _redirected = true;
+ this._host = host;
+ this._port = port;
+ }
+
+ synchronized void failed(RuntimeException e)
+ {
+ _error = e;
+ _complete = true;
+ }
+
+ synchronized boolean waitUntilOpen() throws InterruptedException
+ {
+ while (!_complete)
+ {
+ wait();
+ }
+ if (_error != null)
+ {
+ throw _error;
+ }
+ return !_redirected;
+ }
+
+ synchronized boolean isOpened()
+ {
+ return _complete;
+ }
+
+ String getHost()
+ {
+ return _host;
+ }
+
+ int getPort()
+ {
+ return _port;
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java
new file mode 100644
index 0000000000..980b36cf21
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java
@@ -0,0 +1,366 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ClusterJoinBody;
+import org.apache.qpid.framing.ClusterLeaveBody;
+import org.apache.qpid.framing.ClusterMembershipBody;
+import org.apache.qpid.framing.ClusterPingBody;
+import org.apache.qpid.framing.ClusterSuspectBody;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.cluster.policy.StandardPolicies;
+import org.apache.qpid.server.cluster.replay.ReplayManager;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.cluster.util.InvokeMultiple;
+
+import java.util.List;
+
+public class DefaultGroupManager implements GroupManager, MemberFailureListener, BrokerFactory, StandardPolicies
+{
+ private static final Logger _logger = Logger.getLogger(DefaultGroupManager.class);
+ private final LoadTable _loadTable;
+ private final BrokerFactory _factory;
+ private final ReplayManager _replayMgr;
+ private final BrokerGroup _group;
+
+ DefaultGroupManager(MemberHandle handle, BrokerFactory factory, ReplayManager replayMgr)
+ {
+ this(handle, factory, replayMgr, new LoadTable());
+ }
+
+ DefaultGroupManager(MemberHandle handle, BrokerFactory factory, ReplayManager replayMgr, LoadTable loadTable)
+ {
+ handle = SimpleMemberHandle.resolve(handle);
+ _logger.info(handle);
+ _loadTable = loadTable;
+ _factory = factory;
+ _replayMgr = replayMgr;
+ _group = new BrokerGroup(handle, _replayMgr, this);
+ }
+
+ public JoinState getState()
+ {
+ return _group.getState();
+ }
+
+ public void addMemberhipChangeListener(MembershipChangeListener l)
+ {
+ _group.addMemberhipChangeListener(l);
+ }
+
+ public void removeMemberhipChangeListener(MembershipChangeListener l)
+ {
+ _group.removeMemberhipChangeListener(l);
+ }
+
+ public void broadcast(Sendable message) throws AMQException
+ {
+ for (Broker b : _group.getPeers())
+ {
+ b.send(message, null);
+ }
+ }
+
+ public void broadcast(Sendable message, BroadcastPolicy policy, GroupResponseHandler callback) throws AMQException
+ {
+ GroupRequest request = new GroupRequest(message, policy, callback);
+ for (Broker b : _group.getPeers())
+ {
+ b.invoke(request);
+ }
+ request.finishedSend();
+ }
+
+ public void send(MemberHandle broker, Sendable message) throws AMQException
+ {
+ Broker destination = findBroker(broker);
+ if(destination == null)
+ {
+ _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker));
+ }
+ else
+ {
+ destination.send(message, null);
+ _logger.debug(new LogMessage("Sent {0} to {1}", message, broker));
+ }
+ }
+
+ private void send(Broker broker, Sendable message, ResponseHandler handler) throws AMQException
+ {
+ broker.send(message, handler);
+ }
+
+ private void ping(Broker b) throws AMQException
+ {
+ ClusterPingBody ping = new ClusterPingBody();
+ ping.broker = _group.getLocal().getDetails();
+ ping.responseRequired = true;
+ ping.load = _loadTable.getLocalLoad();
+ BlockingHandler handler = new BlockingHandler();
+ send(getLeader(), new SimpleSendable(ping), handler);
+ handler.waitForCompletion();
+ if (handler.failed())
+ {
+ if (isLeader())
+ {
+ handleFailure(b);
+ }
+ else
+ {
+ suspect(b);
+ }
+ }
+ else
+ {
+ _loadTable.setLoad(b, ((ClusterPingBody) handler.getResponse()).load);
+ }
+ }
+
+ public void handlePing(MemberHandle member, long load)
+ {
+ _loadTable.setLoad(findBroker(member), load);
+ }
+
+ public Member redirect()
+ {
+ return _loadTable.redirect();
+ }
+
+ public void establish()
+ {
+ _group.establish();
+ _logger.info("Established cluster");
+ }
+
+ public void join(MemberHandle member) throws AMQException
+ {
+ member = SimpleMemberHandle.resolve(member);
+
+ Broker leader = connectToLeader(member);
+ _logger.info(new LogMessage("Connected to {0}. joining", leader));
+ ClusterJoinBody join = new ClusterJoinBody();
+ join.broker = _group.getLocal().getDetails();
+ send(leader, new SimpleSendable(join));
+ }
+
+ private Broker connectToLeader(MemberHandle member) throws AMQException
+ {
+ try
+ {
+ return _group.connectToLeader(member);
+ }
+ catch (Exception e)
+ {
+ throw new AMQException("Could not connect to leader: " + e, e);
+ }
+ }
+
+ public void leave() throws AMQException
+ {
+ ClusterLeaveBody leave = new ClusterLeaveBody();
+ leave.broker = _group.getLocal().getDetails();
+ send(getLeader(), new SimpleSendable(leave));
+ }
+
+ private void suspect(MemberHandle broker) throws AMQException
+ {
+ if (_group.isLeader(broker))
+ {
+ //need new leader, if this broker is next in line it can assume leadership
+ if (_group.assumeLeadership())
+ {
+ announceMembership();
+ }
+ else
+ {
+ _logger.warn(new LogMessage("Leader failed. Expecting {0} to succeed.", _group.getMembers().get(1)));
+ }
+ }
+ else
+ {
+ ClusterSuspectBody suspect = new ClusterSuspectBody();
+ suspect.broker = broker.getDetails();
+ send(getLeader(), new SimpleSendable(suspect));
+ }
+ }
+
+
+ public void handleJoin(MemberHandle member) throws AMQException
+ {
+ _logger.info(new LogMessage("Handling join request for {0}", member));
+ if(isLeader())
+ {
+ //connect to the host and port specified:
+ Broker prospect = connectToProspect(member);
+ announceMembership();
+ List<AMQMethodBody> msgs = _replayMgr.replay(true);
+ _logger.info(new LogMessage("Replaying {0} from leader to {1}", msgs, prospect));
+ prospect.replay(msgs);
+ }
+ else
+ {
+ //pass request on to leader:
+ ClusterJoinBody request = new ClusterJoinBody();
+ request.broker = member.getDetails();
+ Broker leader = getLeader();
+ send(leader, new SimpleSendable(request));
+ _logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader));
+ }
+ }
+
+ private Broker connectToProspect(MemberHandle member) throws AMQException
+ {
+ try
+ {
+ return _group.connectToProspect(member);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw new AMQException("Could not connect to prospect: " + e, e);
+ }
+ }
+
+ public void handleLeave(MemberHandle member) throws AMQException
+ {
+ handleFailure(findBroker(member));
+ announceMembership();
+ }
+
+ public void handleSuspect(MemberHandle member) throws AMQException
+ {
+ Broker b = findBroker(member);
+ if(b != null)
+ {
+ //ping it to check it has failed, ping will handle failure if it has
+ ping(b);
+ announceMembership();
+ }
+ }
+
+ public void handleSynch(MemberHandle member)
+ {
+ _group.synched(member);
+ }
+
+ private ClusterMembershipBody createAnnouncement(String membership)
+ {
+ ClusterMembershipBody announce = new ClusterMembershipBody();
+ //TODO: revise this way of converting String to bytes...
+ announce.members = membership.getBytes();
+ return announce;
+ }
+
+ private void announceMembership() throws AMQException
+ {
+ String membership = SimpleMemberHandle.membersToString(_group.getMembers());
+ ClusterMembershipBody announce = createAnnouncement(membership);
+ broadcast(new SimpleSendable(announce));
+ _logger.info(new LogMessage("Membership announcement sent: {0}", membership));
+ }
+
+ private void handleFailure(Broker peer)
+ {
+ peer.remove();
+ _group.remove(peer);
+ }
+
+ public void handleMembershipAnnouncement(String membership) throws AMQException
+ {
+ _group.setMembers(SimpleMemberHandle.stringToMembers(membership));
+ _logger.info(new LogMessage("Membership announcement received: {0}", membership));
+ }
+
+ public boolean isLeader()
+ {
+ return _group.isLeader();
+ }
+
+ public boolean isLeader(MemberHandle handle)
+ {
+ return _group.isLeader(handle);
+ }
+
+ public Broker getLeader()
+ {
+ return _group.getLeader();
+ }
+
+ private Broker findBroker(MemberHandle handle)
+ {
+ return _group.findBroker(handle, false);
+ }
+
+ public Member getMember(MemberHandle handle)
+ {
+ return findBroker(handle);
+ }
+
+ public boolean isMember(MemberHandle member)
+ {
+ for (MemberHandle handle : _group.getMembers())
+ {
+ if (handle.matches(member))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public MemberHandle getLocal()
+ {
+ return _group.getLocal();
+ }
+
+ public void failed(MemberHandle member)
+ {
+ if (isLeader())
+ {
+ handleFailure(findBroker(member));
+ try
+ {
+ announceMembership();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error announcing failure: " + e, e);
+ }
+ }
+ else
+ {
+ try
+ {
+ suspect(member);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error sending suspect: " + e, e);
+ }
+ }
+ }
+
+ public Broker create(MemberHandle handle)
+ {
+ Broker broker = _factory.create(handle);
+ broker.addFailureListener(this);
+ return broker;
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java b/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java
new file mode 100644
index 0000000000..d3b33f6fe3
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+
+public interface GroupManager
+{
+ /**
+ * Establish a new cluster with the local member as the leader.
+ */
+ public void establish();
+
+ /**
+ * Join the cluster to which member belongs
+ */
+ public void join(MemberHandle member) throws AMQException;
+
+ public void broadcast(Sendable message) throws AMQException;
+
+ public void broadcast(Sendable message, BroadcastPolicy policy, GroupResponseHandler callback) throws AMQException;
+
+ public void send(MemberHandle broker, Sendable message) throws AMQException;
+
+ public void leave() throws AMQException;
+
+ public void handleJoin(MemberHandle member) throws AMQException;
+
+ public void handleLeave(MemberHandle member) throws AMQException;
+
+ public void handleSuspect(MemberHandle member) throws AMQException;
+
+ public void handlePing(MemberHandle member, long load);
+
+ public void handleMembershipAnnouncement(String membership) throws AMQException;
+
+ public void handleSynch(MemberHandle member);
+
+ public boolean isLeader();
+
+ public boolean isLeader(MemberHandle handle);
+
+ public boolean isMember(MemberHandle member);
+
+ public MemberHandle redirect();
+
+ public MemberHandle getLocal();
+
+ public JoinState getState();
+
+ public void addMemberhipChangeListener(MembershipChangeListener l);
+
+ public void removeMemberhipChangeListener(MembershipChangeListener l);
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java b/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java
new file mode 100644
index 0000000000..6c45c6e655
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents a method sent to a group of Member instances. Manages the responses,
+ * completion and callback.
+ *
+ */
+class GroupRequest
+{
+ private final Map<Member, AMQMethodBody> _responses = new HashMap<Member, AMQMethodBody>();
+ private final List<Member> _brokers = new ArrayList<Member>();
+ private boolean _sent;
+
+ private final Sendable _request;
+ private final BroadcastPolicy _policy;
+ private final GroupResponseHandler _callback;
+
+ GroupRequest(Sendable request, BroadcastPolicy policy, GroupResponseHandler callback)
+ {
+ _request = request;
+ _policy = policy;
+ _callback = callback;
+ }
+
+ void send(int channel, Member session) throws AMQException
+ {
+ _brokers.add(session);
+ _request.send(channel, session);
+ }
+
+ boolean finishedSend()
+ {
+ _sent = true;
+ return checkCompletion();
+ }
+
+ public boolean responseReceived(Member broker, AMQMethodBody response)
+ {
+ _responses.put(broker, response);
+ return checkCompletion();
+ }
+
+ public boolean removed(Member broker)
+ {
+ _brokers.remove(broker);
+ return checkCompletion();
+ }
+
+ private synchronized boolean checkCompletion()
+ {
+ return isComplete() && callback();
+ }
+
+ boolean isComplete()
+ {
+ return _sent && _policy != null && _policy.isComplete(_responses.size(), _brokers.size());
+ }
+
+ boolean callback()
+ {
+ _callback.response(getResults(), _brokers);
+ return true;
+ }
+
+ List<AMQMethodBody> getResults()
+ {
+ List<AMQMethodBody> results = new ArrayList<AMQMethodBody>(_brokers.size());
+ for (Member b : _brokers)
+ {
+ results.add(_responses.get(b));
+ }
+ return results;
+ }
+
+ public String toString()
+ {
+ return "GroupRequest{request=" + _request +", brokers=" + _brokers + ", responses=" + _responses + "}";
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java
new file mode 100644
index 0000000000..c12d06a337
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+import java.util.List;
+
+public interface GroupResponseHandler
+{
+ //Note: this implies that the response to a group request will always be a method body...
+ public void response(List<AMQMethodBody> responses, List<Member> members);
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java b/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java
new file mode 100644
index 0000000000..76e15b88ec
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.mina.common.IoSession;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Buffers any received messages until join completes.
+ *
+ */
+class InductionBuffer
+{
+ private final Queue<Message> _buffer = new LinkedList<Message>();
+ private final MessageHandler _handler;
+ private boolean _buffering = true;
+
+ InductionBuffer(MessageHandler handler)
+ {
+ _handler = handler;
+ }
+
+ private void process() throws Exception
+ {
+ for (Message o = _buffer.poll(); o != null; o = _buffer.poll())
+ {
+ o.deliver(_handler);
+ }
+ _buffering = false;
+ }
+
+ synchronized void deliver() throws Exception
+ {
+ process();
+ }
+
+ synchronized void receive(IoSession session, Object msg) throws Exception
+ {
+ if (_buffering)
+ {
+ _buffer.offer(new Message(session, msg));
+ }
+ else
+ {
+ _handler.deliver(session, msg);
+ }
+ }
+
+ private static class Message
+ {
+ private final IoSession _session;
+ private final Object _msg;
+
+ Message(IoSession session, Object msg)
+ {
+ _session = session;
+ _msg = msg;
+ }
+
+ void deliver(MessageHandler handler) throws Exception
+ {
+ handler.deliver(_session, _msg);
+ }
+ }
+
+ static interface MessageHandler
+ {
+ public void deliver(IoSession session, Object msg) throws Exception;
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java b/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java
new file mode 100644
index 0000000000..af618631e4
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+public enum JoinState
+{
+ UNINITIALISED, JOINING, INITIATION, INDUCTION, JOINED
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java b/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java
new file mode 100644
index 0000000000..d95229505f
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+/**
+ * Maintains loading information about the local member and its cluster peers.
+ *
+ */
+public class LoadTable
+{
+ private final Map<MemberHandle, Loading> _peers = new HashMap<MemberHandle, Loading>();
+ private final PriorityQueue<Loading> _loads = new PriorityQueue<Loading>();
+ private final Loading _local = new Loading(null);
+
+ public LoadTable()
+ {
+ _loads.add(_local);
+ }
+
+ public void setLoad(Member member, long load)
+ {
+ synchronized (_peers)
+ {
+ Loading loading = _peers.get(member);
+ if (loading == null)
+ {
+ loading = new Loading(member);
+ synchronized (_loads)
+ {
+ _loads.add(loading);
+ }
+ _peers.put(member, loading);
+ }
+ loading.load = load;
+ }
+ }
+
+ public void incrementLocalLoad()
+ {
+ synchronized (_local)
+ {
+ _local.load++;
+ }
+ }
+
+ public void decrementLocalLoad()
+ {
+ synchronized (_local)
+ {
+ _local.load--;
+ }
+ }
+
+ public long getLocalLoad()
+ {
+ synchronized (_local)
+ {
+ return _local.load;
+ }
+ }
+
+ public Member redirect()
+ {
+ synchronized (_loads)
+ {
+ return _loads.peek().member;
+ }
+ }
+
+ private static class Loading implements Comparable
+ {
+ private final Member member;
+ private long load;
+
+ Loading(Member member)
+ {
+ this.member = member;
+ }
+
+ public int compareTo(Object o)
+ {
+ return (int) (load - ((Loading) o).load);
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Main.java b/java/cluster/src/org/apache/qpid/server/cluster/Main.java
new file mode 100644
index 0000000000..94ec2042a2
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/Main.java
@@ -0,0 +1,117 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.transport.ConnectorConfiguration;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * TODO: This is a cut-and-paste from the original broker Main class. Would be preferrable
+ * to make that class more reuseable to avoid all this duplication.
+ *
+ */
+public class Main extends org.apache.qpid.server.Main
+{
+ private static final Logger _logger = Logger.getLogger(Main.class);
+
+ protected Main(String[] args)
+ {
+ super(args);
+ }
+
+ protected void setOptions(Options otions)
+ {
+ super.setOptions(options);
+
+ //extensions:
+ Option join = OptionBuilder.withArgName("join").hasArg().withDescription("Join the specified cluster member. Overrides any value in the config file").
+ withLongOpt("join").create("j");
+ options.addOption(join);
+ }
+
+ protected void bind(int port, ConnectorConfiguration connectorConfig)
+ {
+ try
+ {
+ IoAcceptor acceptor = new SocketAcceptor();
+ SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
+ SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
+
+ sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize);
+ sc.setSendBufferSize(connectorConfig.socketWriteBuferSize);
+ sc.setTcpNoDelay(true);
+
+ // if we do not use the executor pool threading model we get the default leader follower
+ // implementation provided by MINA
+ if (connectorConfig.enableExecutorPool)
+ {
+ sconfig.setThreadModel(new ReadWriteThreadModel());
+ }
+
+ String host = InetAddress.getLocalHost().getHostName();
+ ClusteredProtocolHandler handler = new ClusteredProtocolHandler(new InetSocketAddress(host, port));
+ if (connectorConfig.enableNonSSL)
+ {
+ acceptor.bind(new InetSocketAddress(port), handler, sconfig);
+ _logger.info("Qpid.AMQP listening on non-SSL port " + port);
+ handler.connect(commandLine.getOptionValue("j"));
+ }
+
+ if (connectorConfig.enableSSL)
+ {
+ ClusteredProtocolHandler sslHandler = new ClusteredProtocolHandler(handler);
+ sslHandler.setUseSSL(true);
+ acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
+ _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
+ }
+ }
+ catch (IOException e)
+ {
+ _logger.error("Unable to bind service to registry: " + e, e);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to connect to cluster: " + e, e);
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ new Main(args);
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Member.java b/java/cluster/src/org/apache/qpid/server/cluster/Member.java
new file mode 100644
index 0000000000..d7825aac6e
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/Member.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+
+public interface Member extends MemberHandle
+{
+ public void send(AMQDataBlock data) throws AMQException;
+
+ public void addFailureListener(MemberFailureListener listener);
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java b/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java
new file mode 100644
index 0000000000..4b56eaf962
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+interface MemberFailureListener
+{
+ public void failed(MemberHandle member);
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java b/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java
new file mode 100644
index 0000000000..75f4d19103
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+public interface MemberHandle
+{
+ public String getHost();
+
+ public int getPort();
+
+ public boolean matches(MemberHandle m);
+
+ public boolean matches(String host, int port);
+
+ public String getDetails();
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java b/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java
new file mode 100644
index 0000000000..a62a413bc6
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import java.util.List;
+
+public interface MembershipChangeListener
+{
+ public void changed(List<MemberHandle> members);
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java
new file mode 100644
index 0000000000..cd3e00012e
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+
+interface MethodHandler
+{
+ public void handle(int channel, AMQMethodBody method) throws AMQException;
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java b/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java
new file mode 100644
index 0000000000..f4a721dbc5
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.server.state.AMQState;
+
+public interface MethodHandlerFactory
+{
+ public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry);
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java b/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
new file mode 100644
index 0000000000..4870f9daaa
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MethodHandlerRegistry
+{
+ private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> registry =
+ new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
+
+ public <A extends AMQMethodBody, B extends Class<A>> MethodHandlerRegistry addHandler(B type, StateAwareMethodListener<A> handler)
+ {
+ registry.put(type, handler);
+ return this;
+ }
+
+ public <B extends AMQMethodBody> StateAwareMethodListener<B> getHandler(B frame)
+ {
+ return (StateAwareMethodListener<B>) registry.get(frame.getClass());
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java
new file mode 100644
index 0000000000..f193862e78
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java
@@ -0,0 +1,269 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ConnectionRedirectBody;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersionList;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * A 'client stub' for a remote cluster peer, using MINA for IO Layer
+ *
+ */
+public class MinaBrokerProxy extends Broker implements MethodHandler
+{
+ private static final Logger _logger = Logger.getLogger(MinaBrokerProxy.class);
+ private final ConnectionStatusMonitor _connectionMonitor = new ConnectionStatusMonitor();
+ private final ClientHandlerRegistry _legacyHandler;
+ private final MinaBinding _binding = new MinaBinding();
+ private final MemberHandle _local;
+ private IoSession _session;
+ private MethodHandler _handler;
+ private Iterable<AMQMethodBody> _replay;
+
+ MinaBrokerProxy(String host, int port, MemberHandle local)
+ {
+ super(host, port);
+ _local = local;
+ _legacyHandler = new ClientHandlerRegistry(local);
+ }
+
+ private void init(IoSession session)
+ {
+ _session = session;
+ _handler = new ClientAdapter(session, _legacyHandler);
+ }
+
+ private ConnectFuture connectImpl()
+ {
+ _logger.info("Connecting to cluster peer: " + getDetails());
+ SocketConnector ioConnector = new SocketConnector();
+ SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ scfg.setTcpNoDelay(true);
+ scfg.setSendBufferSize(32768);
+ scfg.setReceiveBufferSize(32768);
+ InetSocketAddress address = new InetSocketAddress(getHost(), getPort());
+ return ioConnector.connect(address, _binding);
+ }
+
+ //extablish connection without handling redirect
+ boolean connect() throws IOException, InterruptedException
+ {
+ ConnectFuture future = connectImpl();
+ // wait for connection to complete
+ future.join();
+ // we call getSession which throws an IOException if there has been an error connecting
+ try
+ {
+ future.getSession();
+ }
+ catch (RuntimeIOException e)
+ {
+ _connectionMonitor.failed(e);
+ _logger.error(new LogMessage("Could not connect to {0}: {1}", this, e), e);
+ throw e;
+ }
+ return _connectionMonitor.waitUntilOpen();
+ }
+
+ void connectAsynch(Iterable<AMQMethodBody> msgs)
+ {
+ _replay = msgs;
+ connectImpl();
+ }
+
+ void replay(Iterable<AMQMethodBody> msgs)
+ {
+ _replay = msgs;
+ if(_connectionMonitor.isOpened())
+ {
+ replay();
+ }
+ }
+
+ //establish connection, handling redirect if required...
+ Broker connectToCluster() throws IOException, InterruptedException
+ {
+ connect();
+ //wait until the connection is open or get a redirection
+ if (_connectionMonitor.waitUntilOpen())
+ {
+ return this;
+ }
+ else
+ {
+ Broker broker = new MinaBrokerProxy(_connectionMonitor.getHost(), _connectionMonitor.getPort(), _local);
+ broker.connect();
+ return broker;
+ }
+ }
+
+ public void send(AMQDataBlock data) throws AMQException
+ {
+ if (_session == null)
+ {
+ try
+ {
+ _connectionMonitor.waitUntilOpen();
+ }
+ catch (Exception e)
+ {
+ throw new AMQException("Failed to send " + data + ": " + e, e);
+ }
+ }
+ _session.write(data);
+ }
+
+ private void replay()
+ {
+ if(_replay != null)
+ {
+ for(AMQMethodBody b : _replay)
+ {
+ _session.write(new AMQFrame(0, b));
+ }
+ }
+ }
+
+ public void handle(int channel, AMQMethodBody method) throws AMQException
+ {
+ _logger.info(new LogMessage("Handling method: {0} for channel {1}", method, channel));
+ if (!handleResponse(channel, method))
+ {
+ _logger.warn(new LogMessage("Unhandled method: {0} for channel {1}", method, channel));
+ }
+ }
+
+ private void handleMethod(int channel, AMQMethodBody method) throws AMQException
+ {
+ if (method instanceof ConnectionRedirectBody)
+ {
+ //signal redirection to waiting thread
+ ConnectionRedirectBody redirect = (ConnectionRedirectBody) method;
+ String[] parts = redirect.host.split(":");
+ _connectionMonitor.redirect(parts[0], Integer.parseInt(parts[1]));
+ }
+ else
+ {
+ _handler.handle(channel, method);
+ if (AMQState.CONNECTION_OPEN.equals(_legacyHandler.getCurrentState()) && _handler != this)
+ {
+ _handler = this;
+ _logger.info(new LogMessage("Connection opened, handler switched"));
+ //replay any messages:
+ replay();
+ //signal waiting thread:
+ _connectionMonitor.opened();
+ }
+ }
+ }
+
+ private void handleFrame(AMQFrame frame) throws AMQException
+ {
+ AMQBody body = frame.bodyFrame;
+ if (body instanceof AMQMethodBody)
+ {
+ handleMethod(frame.channel, (AMQMethodBody) body);
+ }
+ else
+ {
+ throw new AMQException("Client only expects method body, got: " + body);
+ }
+ }
+
+ public String toString()
+ {
+ return "MinaBrokerProxy[" + (_session == null ? super.toString() : _session.getRemoteAddress()) + "]";
+ }
+
+ private class MinaBinding extends IoHandlerAdapter implements ProtocolVersionList
+ {
+ public void sessionCreated(IoSession session) throws Exception
+ {
+ init(session);
+ _logger.info(new LogMessage("{0}: created", MinaBrokerProxy.this));
+ ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
+ session.getFilterChain().addLast("protocolFilter", pcf);
+
+ /* Find last protocol version in protocol version list. Make sure last protocol version
+ listed in the build file (build-module.xml) is the latest version which will be used
+ here. */
+ int i = pv.length - 1;
+ session.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+ }
+
+ public void sessionOpened(IoSession session) throws Exception
+ {
+ _logger.info(new LogMessage("{0}: opened", MinaBrokerProxy.this));
+ }
+
+ public void sessionClosed(IoSession session) throws Exception
+ {
+ _logger.info(new LogMessage("{0}: closed", MinaBrokerProxy.this));
+ }
+
+ public void exceptionCaught(IoSession session, Throwable throwable) throws Exception
+ {
+ _logger.error(new LogMessage("{0}: received {1}", MinaBrokerProxy.this, throwable), throwable);
+ if (! (throwable instanceof IOException))
+ {
+ _session.close();
+ }
+ failed();
+ }
+
+ public void messageReceived(IoSession session, Object object) throws Exception
+ {
+ if (object instanceof AMQFrame)
+ {
+ handleFrame((AMQFrame) object);
+ }
+ else
+ {
+ throw new AMQException("Received message of unrecognised type: " + object);
+ }
+ }
+
+ public void messageSent(IoSession session, Object object) throws Exception
+ {
+ _logger.debug(new LogMessage("{0}: sent {1}", MinaBrokerProxy.this, object));
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java b/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java
new file mode 100644
index 0000000000..9cf9beeba7
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+public class MinaBrokerProxyFactory implements BrokerFactory
+{
+ private final MemberHandle _local;
+
+ MinaBrokerProxyFactory(MemberHandle local)
+ {
+ _local = local;
+ }
+
+ public Broker create(MemberHandle handle)
+ {
+ return new MinaBrokerProxy(handle.getHost(), handle.getPort(), _local);
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java
new file mode 100644
index 0000000000..3d751fabba
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+public interface ResponseHandler
+{
+ public void responded(AMQMethodBody response);
+
+ public void removed();
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java b/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java
new file mode 100644
index 0000000000..ad50a5a737
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+
+public interface Sendable
+{
+ public void send(int channel, Member member) throws AMQException;
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
new file mode 100644
index 0000000000..2b4408b66c
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.IllegalStateTransitionException;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.cluster.util.LogMessage;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An extension of server.AMQStateManager that allows different handlers to be registered.
+ *
+ */
+class ServerHandlerRegistry extends AMQStateManager
+{
+ private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
+ private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
+
+ ServerHandlerRegistry()
+ {
+ super(AMQState.CONNECTION_NOT_STARTED, false);
+ }
+
+ ServerHandlerRegistry(ServerHandlerRegistry s)
+ {
+ this();
+ _handlers.putAll(s._handlers);
+ }
+
+ ServerHandlerRegistry(MethodHandlerFactory factory)
+ {
+ this();
+ init(factory);
+ }
+
+ void setHandlers(AMQState state, MethodHandlerRegistry handlers)
+ {
+ _handlers.put(state, handlers);
+ }
+
+ void init(MethodHandlerFactory factory)
+ {
+ for (AMQState s : AMQState.values())
+ {
+ setHandlers(s, factory.register(s, new MethodHandlerRegistry()));
+ }
+ }
+
+ protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState state, B frame) throws IllegalStateTransitionException
+ {
+ MethodHandlerRegistry registry = _handlers.get(state);
+ StateAwareMethodListener<B> handler = (registry == null) ? null : registry.getHandler(frame);
+ if (handler == null)
+ {
+ _logger.warn(new LogMessage("No handler for {0}, {1}", state, frame));
+ }
+ return handler;
+ }
+
+ <A extends AMQMethodBody, B extends Class<A>> void addHandler(AMQState state, B type, StateAwareMethodListener<A> handler)
+ {
+ MethodHandlerRegistry registry = _handlers.get(state);
+ if (registry == null)
+ {
+ registry = new MethodHandlerRegistry();
+ _handlers.put(state, registry);
+ }
+ registry.addHandler(type, handler);
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java b/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java
new file mode 100644
index 0000000000..f0b1db25e6
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java
@@ -0,0 +1,156 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SimpleMemberHandle implements MemberHandle
+{
+ private final String _host;
+ private final int _port;
+
+ public SimpleMemberHandle(String host, int port)
+ {
+ _host = host;
+ _port = port;
+ }
+
+ public SimpleMemberHandle(String details)
+ {
+ String[] parts = details.split(":");
+ _host = parts[0];
+ _port = Integer.parseInt(parts[1]);
+ }
+
+ public SimpleMemberHandle(InetSocketAddress address) throws UnknownHostException
+ {
+ this(address.getAddress(), address.getPort());
+ }
+
+ public SimpleMemberHandle(InetAddress address, int port) throws UnknownHostException
+ {
+ this(canonical(address).getHostAddress(), port);
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public int hashCode()
+ {
+ return getPort();
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof MemberHandle && matches((MemberHandle) o);
+ }
+
+ public boolean matches(MemberHandle m)
+ {
+ return matches(m.getHost(), m.getPort());
+ }
+
+ public boolean matches(String host, int port)
+ {
+ return _host.equals(host) && _port == port;
+ }
+
+ public String getDetails()
+ {
+ return _host + ":" + _port;
+ }
+
+ public String toString()
+ {
+ return getDetails();
+ }
+
+ static List<MemberHandle> stringToMembers(String membership)
+ {
+ String[] names = membership.split("\\s");
+ List<MemberHandle> members = new ArrayList<MemberHandle>();
+ for (String name : names)
+ {
+ members.add(new SimpleMemberHandle(name));
+ }
+ return members;
+ }
+
+ static String membersToString(List<MemberHandle> members)
+ {
+ StringBuffer buffer = new StringBuffer();
+ boolean first = true;
+ for (MemberHandle m : members)
+ {
+ if (first)
+ {
+ first = false;
+ }
+ else
+ {
+ buffer.append(" ");
+ }
+ buffer.append(m.getDetails());
+ }
+
+ return buffer.toString();
+ }
+
+ private static InetAddress canonical(InetAddress address) throws UnknownHostException
+ {
+ if (address.isLoopbackAddress())
+ {
+ return InetAddress.getLocalHost();
+ }
+ else
+ {
+ return address;
+ }
+ }
+
+ public MemberHandle resolve()
+ {
+ return resolve(this);
+ }
+
+ public static MemberHandle resolve(MemberHandle handle)
+ {
+ try
+ {
+ return new SimpleMemberHandle(new InetSocketAddress(handle.getHost(), handle.getPort()));
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ return handle;
+ }
+ }
+
+
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java b/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java
new file mode 100644
index 0000000000..4b75e76d97
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQFrame;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class SimpleSendable implements Sendable
+{
+ private final List<AMQBody> _bodies;
+
+ public SimpleSendable(AMQBody body)
+ {
+ this(Arrays.asList(body));
+ }
+
+ public SimpleSendable(List<AMQBody> bodies)
+ {
+ _bodies = bodies;
+ }
+
+ public void send(int channel, Member member) throws AMQException
+ {
+ for (AMQBody body : _bodies)
+ {
+ member.send(new AMQFrame(channel, body));
+ }
+ }
+
+ public String toString()
+ {
+ return _bodies.toString();
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
new file mode 100644
index 0000000000..5ee07af596
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+
+import java.util.List;
+import java.util.ArrayList;
+
+public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends ClusterMethodHandler<A>
+{
+ private final List<ClusterMethodHandler<A>> _handlers;
+
+ private ChainedClusterMethodHandler()
+ {
+ this(new ArrayList<ClusterMethodHandler<A>>());
+ }
+
+ public ChainedClusterMethodHandler(List<ClusterMethodHandler<A>> handlers)
+ {
+ _handlers = handlers;
+ }
+
+ public ChainedClusterMethodHandler(ClusterMethodHandler<A>... handlers)
+ {
+ this();
+ for(ClusterMethodHandler<A>handler: handlers)
+ {
+ _handlers.add(handler);
+ }
+ }
+
+ protected final void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ {
+ for(ClusterMethodHandler<A> handler : _handlers)
+ {
+ handler.peer(stateMgr, queues, exchanges, session, evt);
+ }
+ }
+
+ protected final void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ {
+ for(ClusterMethodHandler<A> handler : _handlers)
+ {
+ handler.client(stateMgr, queues, exchanges, session, evt);
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
new file mode 100644
index 0000000000..cd25bd178a
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
@@ -0,0 +1,136 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Maintains the default queue names for a channel, and alters subsequent frames where necessary
+ * to use this (i.e. when no queue is explictly specified).
+ *
+ */
+class ChannelQueueManager
+{
+ private static final Logger _logger = Logger.getLogger(ChannelQueueManager.class);
+ private final Map<Integer, String> _channelQueues = new HashMap<Integer, String>();
+
+ ClusterMethodHandler<QueueDeclareBody> createQueueDeclareHandler()
+ {
+ return new QueueDeclareHandler();
+ }
+
+ ClusterMethodHandler<QueueDeleteBody> createQueueDeleteHandler()
+ {
+ return new QueueDeleteHandler();
+ }
+
+ ClusterMethodHandler<QueueBindBody> createQueueBindHandler()
+ {
+ return new QueueBindHandler();
+ }
+
+ ClusterMethodHandler<BasicConsumeBody> createBasicConsumeHandler()
+ {
+ return new BasicConsumeHandler();
+ }
+
+ private void set(int channel, String queue)
+ {
+ _channelQueues.put(channel, queue);
+ _logger.info(new LogMessage("Set default queue for {0} to {1}", channel, queue));
+ }
+
+ private String get(int channel)
+ {
+ String queue = _channelQueues.get(channel);
+ _logger.info(new LogMessage("Default queue for {0} is {1}", channel, queue));
+ return queue;
+ }
+
+ private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody>
+ {
+ protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ {
+ }
+
+ protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ {
+ set(evt.getChannelId(), evt.getMethod().queue);
+ }
+ }
+ private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody>
+ {
+ protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ {
+ }
+
+ protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ {
+ if(evt.getMethod().queue == null)
+ {
+ evt.getMethod().queue = get(evt.getChannelId());
+ }
+ }
+ }
+ private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody>
+ {
+ protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ {
+ }
+
+ protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ {
+ if(evt.getMethod().queue == null)
+ {
+ evt.getMethod().queue = get(evt.getChannelId());
+ }
+ }
+ }
+
+ private class BasicConsumeHandler extends ClusterMethodHandler<BasicConsumeBody>
+ {
+ protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ {
+ }
+
+ protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ {
+ if(evt.getMethod().queue == null)
+ {
+ evt.getMethod().queue = get(evt.getChannelId());
+ }
+ }
+ }
+
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
new file mode 100644
index 0000000000..9e4444819e
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.cluster.ClusteredProtocolSession;
+import org.apache.qpid.AMQException;
+
+public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
+{
+ public final void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ {
+ if (ClusteredProtocolSession.isPeerSession(session))
+ {
+ peer(stateMgr, queues, exchanges, session, evt);
+ }
+ else
+ {
+ client(stateMgr, queues, exchanges, session, evt);
+ }
+ }
+
+ protected abstract void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
+ protected abstract void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
new file mode 100644
index 0000000000..a1684b399f
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
@@ -0,0 +1,278 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ClusterJoinBody;
+import org.apache.qpid.framing.ClusterLeaveBody;
+import org.apache.qpid.framing.ClusterMembershipBody;
+import org.apache.qpid.framing.ClusterPingBody;
+import org.apache.qpid.framing.ClusterSuspectBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeleteBody;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.ClusterSynchBody;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.TxCommitBody;
+import org.apache.qpid.framing.TxRollbackBody;
+import org.apache.qpid.server.cluster.ClusterCapability;
+import org.apache.qpid.server.cluster.ClusteredProtocolSession;
+import org.apache.qpid.server.cluster.GroupManager;
+import org.apache.qpid.server.cluster.LoadTable;
+import org.apache.qpid.server.cluster.MemberHandle;
+import org.apache.qpid.server.cluster.MethodHandlerFactory;
+import org.apache.qpid.server.cluster.MethodHandlerRegistry;
+import org.apache.qpid.server.cluster.SimpleMemberHandle;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.handler.ChannelCloseHandler;
+import org.apache.qpid.server.handler.ChannelFlowHandler;
+import org.apache.qpid.server.handler.ChannelOpenHandler;
+import org.apache.qpid.server.handler.ConnectionCloseMethodHandler;
+import org.apache.qpid.server.handler.ConnectionOpenMethodHandler;
+import org.apache.qpid.server.handler.ConnectionSecureOkMethodHandler;
+import org.apache.qpid.server.handler.ConnectionStartOkMethodHandler;
+import org.apache.qpid.server.handler.ConnectionTuneOkMethodHandler;
+import org.apache.qpid.server.handler.ExchangeDeclareHandler;
+import org.apache.qpid.server.handler.ExchangeDeleteHandler;
+import org.apache.qpid.server.handler.BasicCancelMethodHandler;
+import org.apache.qpid.server.handler.BasicPublishMethodHandler;
+import org.apache.qpid.server.handler.QueueBindHandler;
+import org.apache.qpid.server.handler.QueueDeleteHandler;
+import org.apache.qpid.server.handler.BasicQosHandler;
+import org.apache.qpid.server.handler.TxSelectHandler;
+import org.apache.qpid.server.handler.TxCommitHandler;
+import org.apache.qpid.server.handler.TxRollbackHandler;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class ClusterMethodHandlerFactory implements MethodHandlerFactory
+{
+ private final GroupManager _groupMgr;
+ private final LoadTable _loadTable;
+
+ public ClusterMethodHandlerFactory(GroupManager groupMgr, LoadTable loadTable)
+ {
+ _groupMgr = groupMgr;
+ _loadTable = loadTable;
+ }
+
+ public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry)
+ {
+ switch (state)
+ {
+ case CONNECTION_NOT_STARTED:
+ return registry.addHandler(ConnectionStartOkBody.class, ConnectionStartOkMethodHandler.getInstance());
+ case CONNECTION_NOT_AUTH:
+ return registry.addHandler(ConnectionSecureOkBody.class, ConnectionSecureOkMethodHandler.getInstance());
+ case CONNECTION_NOT_TUNED:
+ return registry.addHandler(ConnectionTuneOkBody.class, ConnectionTuneOkMethodHandler.getInstance());
+ case CONNECTION_NOT_OPENED:
+ //connection.open override:
+ return registry.addHandler(ConnectionOpenBody.class, new ConnectionOpenHandler());
+ case CONNECTION_OPEN:
+ return registerConnectionOpened(registry);
+ }
+ return registry;
+ }
+
+ private MethodHandlerRegistry registerConnectionOpened(MethodHandlerRegistry registry)
+ {
+ //new cluster method handlers:
+ registry.addHandler(ClusterJoinBody.class, new JoinHandler());
+ registry.addHandler(ClusterLeaveBody.class, new LeaveHandler());
+ registry.addHandler(ClusterSuspectBody.class, new SuspectHandler());
+ registry.addHandler(ClusterMembershipBody.class, new MembershipHandler());
+ registry.addHandler(ClusterPingBody.class, new PingHandler());
+ registry.addHandler(ClusterSynchBody.class, new SynchHandler());
+
+ //connection.close override:
+ registry.addHandler(ConnectionCloseBody.class, new ConnectionCloseHandler());
+
+ //replicated handlers:
+ registry.addHandler(ExchangeDeclareBody.class, replicated(ExchangeDeclareHandler.getInstance()));
+ registry.addHandler(ExchangeDeleteBody.class, replicated(ExchangeDeleteHandler.getInstance()));
+
+ ChannelQueueManager channelQueueMgr = new ChannelQueueManager();
+
+
+ LocalQueueDeclareHandler handler = new LocalQueueDeclareHandler(_groupMgr);
+ registry.addHandler(QueueDeclareBody.class,
+ chain(new QueueNameGenerator(handler),
+ channelQueueMgr.createQueueDeclareHandler(),
+ new ReplicatingHandler<QueueDeclareBody>(_groupMgr, handler)));
+
+ registry.addHandler(QueueBindBody.class, chain(channelQueueMgr.createQueueBindHandler(), replicated(QueueBindHandler.getInstance())));
+ registry.addHandler(QueueDeleteBody.class, chain(channelQueueMgr.createQueueDeleteHandler(), replicated(alternate(new QueueDeleteHandler(false), new QueueDeleteHandler(true)))));
+ registry.addHandler(BasicConsumeBody.class, chain(channelQueueMgr.createBasicConsumeHandler(), new ReplicatingConsumeHandler(_groupMgr)));
+
+ //other modified handlers:
+ registry.addHandler(BasicCancelBody.class, alternate(new RemoteCancelHandler(), BasicCancelMethodHandler.getInstance()));
+
+ //other unaffected handlers:
+ registry.addHandler(BasicPublishBody.class, BasicPublishMethodHandler.getInstance());
+ registry.addHandler(BasicQosBody.class, BasicQosHandler.getInstance());
+ registry.addHandler(ChannelOpenBody.class, ChannelOpenHandler.getInstance());
+ registry.addHandler(ChannelCloseBody.class, ChannelCloseHandler.getInstance());
+ registry.addHandler(ChannelFlowBody.class, ChannelFlowHandler.getInstance());
+ registry.addHandler(TxSelectBody.class, TxSelectHandler.getInstance());
+ registry.addHandler(TxCommitBody.class, TxCommitHandler.getInstance());
+ registry.addHandler(TxRollbackBody.class, TxRollbackHandler.getInstance());
+
+
+ return registry;
+ }
+
+ private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody>
+ {
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
+ AMQMethodEvent<ClusterSynchBody> evt) throws AMQException
+ {
+ _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(session));
+ }
+ }
+
+ private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody>
+ {
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
+ AMQMethodEvent<ClusterJoinBody> evt) throws AMQException
+ {
+ _groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker));
+ }
+ }
+
+ private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody>
+ {
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException
+ {
+ _groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker));
+ }
+ }
+
+ private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody>
+ {
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException
+ {
+ _groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker));
+ }
+ }
+
+ private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody>
+ {
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
+ AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException
+ {
+ ClusterMembershipBody body = evt.getMethod();
+ _groupMgr.handleMembershipAnnouncement(new String(body.members));
+ }
+ }
+
+ private class PingHandler implements StateAwareMethodListener<ClusterPingBody>
+ {
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
+ AMQMethodEvent<ClusterPingBody> evt) throws AMQException
+ {
+ MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker);
+ _groupMgr.handlePing(peer, evt.getMethod().load);
+ if (evt.getMethod().responseRequired)
+ {
+ evt.getMethod().load = _loadTable.getLocalLoad();
+ session.writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod()));
+ }
+ }
+ }
+
+ private class ConnectionOpenHandler extends ExtendedHandler<ConnectionOpenBody>
+ {
+ ConnectionOpenHandler()
+ {
+ super(ConnectionOpenMethodHandler.getInstance());
+ }
+
+ void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionOpenBody> evt)
+ {
+ String capabilities = evt.getMethod().capabilities;
+ if (ClusterCapability.contains(capabilities))
+ {
+ ClusteredProtocolSession.setSessionPeer(session, ClusterCapability.getPeer(capabilities));
+ }
+ else
+ {
+ _loadTable.incrementLocalLoad();
+ }
+ }
+ }
+
+ private class ConnectionCloseHandler extends ExtendedHandler<ConnectionCloseBody>
+ {
+ ConnectionCloseHandler()
+ {
+ super(ConnectionCloseMethodHandler.getInstance());
+ }
+
+ void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionCloseBody> evt)
+ {
+ if (!ClusteredProtocolSession.isPeerSession(session))
+ {
+ _loadTable.decrementLocalLoad();
+ }
+ }
+ }
+
+ private <B extends AMQMethodBody> ReplicatingHandler<B> replicated(StateAwareMethodListener<B> handler)
+ {
+ return new ReplicatingHandler<B>(_groupMgr, handler);
+ }
+
+ private <B extends AMQMethodBody> StateAwareMethodListener<B> alternate(StateAwareMethodListener<B> peer, StateAwareMethodListener<B> client)
+ {
+ return new PeerHandler<B>(peer, client);
+ }
+
+ private <B extends AMQMethodBody> StateAwareMethodListener<B> chain(ClusterMethodHandler<B>... h)
+ {
+ return new ChainedClusterMethodHandler<B>(h);
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
new file mode 100644
index 0000000000..08dcfbe121
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
+{
+ private final StateAwareMethodListener<A> _base;
+
+ ExtendedHandler(StateAwareMethodListener<A> base)
+ {
+ _base = base;
+ }
+
+ public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ {
+ preHandle(stateMgr, session, evt);
+ _base.methodReceived(stateMgr, queues, exchanges, session, evt);
+ postHandle(stateMgr, session, evt);
+ }
+
+ void preHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ {
+ }
+
+ void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ {
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java
new file mode 100644
index 0000000000..27eff59685
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java
@@ -0,0 +1,22 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+public abstract class HandlerUtils
+{
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
new file mode 100644
index 0000000000..997c055d77
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.server.cluster.ClusteredProtocolSession;
+import org.apache.qpid.server.cluster.GroupManager;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.cluster.MemberHandle;
+import org.apache.qpid.server.handler.QueueDeclareHandler;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.ClusteredQueue;
+import org.apache.qpid.server.queue.PrivateQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.RemoteQueueProxy;
+
+public class LocalQueueDeclareHandler extends QueueDeclareHandler
+{
+ private static final Logger _logger = Logger.getLogger(LocalQueueDeclareHandler.class);
+ private final GroupManager _groupMgr;
+
+ LocalQueueDeclareHandler(GroupManager groupMgr)
+ {
+ _groupMgr = groupMgr;
+ }
+
+ protected String createName()
+ {
+ return super.createName() + "@" + _groupMgr.getLocal().getDetails();
+ }
+
+ protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) throws AMQException
+ {
+ //is it private or shared:
+ if (body.exclusive)
+ {
+ if (ClusteredProtocolSession.isPeerSession(session))
+ {
+ //need to get peer from the session...
+ MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session);
+ _logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer));
+ return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, peer.getDetails(), body.autoDelete, registry);
+ }
+ else
+ {
+ _logger.debug(new LogMessage("Creating local private queue {0}", body.queue));
+ return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, registry);
+ }
+ }
+ else
+ {
+ _logger.debug(new LogMessage("Creating local shared queue {0}", body.queue));
+ return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, registry);
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java
new file mode 100644
index 0000000000..7fae2c6598
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
+{
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException
+ {
+ }
+}
+
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java
new file mode 100644
index 0000000000..a2a570f045
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.cluster.ClusteredProtocolSession;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+/**
+ * Base for implementing handlers that carry out different actions based on whether the method they
+ * are handling was sent by a peer (i.e. another broker in the cluster) or a client (i.e. an end-user
+ * application).
+ *
+ */
+public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A>
+{
+ private final StateAwareMethodListener<A> _peer;
+ private final StateAwareMethodListener<A> _client;
+
+ PeerHandler(StateAwareMethodListener<A> peer, StateAwareMethodListener<A> client)
+ {
+ _peer = peer;
+ _client = client;
+ }
+
+ protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ {
+ _peer.methodReceived(stateMgr, queues, exchanges, session, evt);
+ }
+
+ protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ {
+ _client.methodReceived(stateMgr, queues, exchanges, session, evt);
+ }
+
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
new file mode 100644
index 0000000000..2f15373eba
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+
+/**
+ * Generates queue names for queues declared with no name.
+ *
+ */
+class QueueNameGenerator extends ClusterMethodHandler<QueueDeclareBody>
+{
+ private final LocalQueueDeclareHandler _handler;
+
+ QueueNameGenerator(LocalQueueDeclareHandler handler)
+ {
+ _handler = handler;
+ }
+
+ protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ {
+ }
+
+ protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges,
+ AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt)
+ throws AMQException
+ {
+ setName(evt.getMethod());//need to set the name before propagating this method
+ }
+
+ protected void setName(QueueDeclareBody body)
+ {
+ if (body.queue == null)
+ {
+ body.queue = _handler.createName();
+ }
+ }
+}
+
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
new file mode 100644
index 0000000000..9a598d7f07
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.server.cluster.ClusteredProtocolSession;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.ClusteredQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class RemoteCancelHandler implements StateAwareMethodListener<BasicCancelBody>
+{
+ private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class);
+
+ public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicCancelBody> evt) throws AMQException
+ {
+ //By convention, consumers setup between brokers use the queue name as the consumer tag:
+ AMQQueue queue = queues.getQueue(evt.getMethod().consumerTag);
+ if (queue instanceof ClusteredQueue)
+ {
+ ((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session));
+ }
+ else
+ {
+ _logger.warn("Got remote cancel request for non-clustered queue: " + queue);
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
new file mode 100644
index 0000000000..24ce4087fb
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.server.cluster.ClusteredProtocolSession;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.ClusteredQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+/**
+ * Handles consume requests from other cluster members.
+ *
+ */
+public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsumeBody>
+{
+ private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class);
+
+ public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ {
+ AMQQueue queue = queues.getQueue(evt.getMethod().queue);
+ if (queue instanceof ClusteredQueue)
+ {
+ ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session));
+ session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), evt.getMethod().queue));
+ }
+ else
+ {
+ _logger.warn("Got remote consume request for non-clustered queue: " + queue);
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
new file mode 100644
index 0000000000..4a895fcd0a
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.server.cluster.BroadcastPolicy;
+import org.apache.qpid.server.cluster.GroupManager;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.handler.BasicConsumeMethodHandler;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBody>
+{
+ ReplicatingConsumeHandler(GroupManager groupMgr)
+ {
+ this(groupMgr, null);
+ }
+
+ ReplicatingConsumeHandler(GroupManager groupMgr, BroadcastPolicy policy)
+ {
+ super(groupMgr, base(), policy);
+ }
+
+ protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ {
+ //only replicate if the queue in question is a shared queue
+ if (isShared(queues.getQueue(evt.getMethod().queue)))
+ {
+ super.replicate(stateMgr, queues, exchanges, session, evt);
+ }
+ else
+ {
+ _logger.info(new LogMessage("Handling consume for private queue ({0}) locally", evt.getMethod()));
+ local(stateMgr, queues, exchanges, session, evt);
+ _logger.info(new LogMessage("Handled consume for private queue ({0}) locally", evt.getMethod()));
+
+ }
+ }
+
+ protected boolean isShared(AMQQueue queue)
+ {
+ return queue != null && queue.isShared();
+ }
+
+ static StateAwareMethodListener<BasicConsumeBody> base()
+ {
+ return new PeerHandler<BasicConsumeBody>(peer(), client());
+ }
+
+ static StateAwareMethodListener<BasicConsumeBody> peer()
+ {
+ return new RemoteConsumeHandler();
+ }
+
+ static StateAwareMethodListener<BasicConsumeBody> client()
+ {
+ return BasicConsumeMethodHandler.getInstance();
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
new file mode 100644
index 0000000000..a5fab27d16
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
@@ -0,0 +1,127 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.cluster.BroadcastPolicy;
+import org.apache.qpid.server.cluster.ClusteredProtocolSession;
+import org.apache.qpid.server.cluster.GroupManager;
+import org.apache.qpid.server.cluster.GroupResponseHandler;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.cluster.Member;
+import org.apache.qpid.server.cluster.SimpleSendable;
+import org.apache.qpid.server.cluster.policy.StandardPolicies;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+import java.util.List;
+
+/**
+ * Basic template for handling methods that should be broadcast to the group and
+ * processed locally after 'completion' of this broadcast.
+ *
+ */
+class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A> implements StandardPolicies
+{
+ protected static final Logger _logger = Logger.getLogger(ReplicatingHandler.class);
+
+ private final StateAwareMethodListener<A> _base;
+ private final GroupManager _groupMgr;
+ private final BroadcastPolicy _policy;
+
+ ReplicatingHandler(GroupManager groupMgr, StateAwareMethodListener<A> base)
+ {
+ this(groupMgr, base, null);
+ }
+
+ ReplicatingHandler(GroupManager groupMgr, StateAwareMethodListener<A> base, BroadcastPolicy policy)
+ {
+ _groupMgr = groupMgr;
+ _base = base;
+ _policy = policy;
+ }
+
+ protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ {
+ local(stateMgr, queues, exchanges, session, evt);
+ _logger.debug(new LogMessage("Handled {0} locally", evt.getMethod()));
+ }
+
+ protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ {
+ replicate(stateMgr, queues, exchanges, session, evt);
+ }
+
+ protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ {
+ if (_policy == null)
+ {
+ //asynch delivery
+ _groupMgr.broadcast(new SimpleSendable(evt.getMethod()));
+ local(stateMgr, queues, exchanges, session, evt);
+ }
+ else
+ {
+ Callback callback = new Callback(stateMgr, queues, exchanges, session, evt);
+ _groupMgr.broadcast(new SimpleSendable(evt.getMethod()), _policy, callback);
+ }
+ _logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod()));
+ }
+
+ protected void local(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ {
+ _base.methodReceived(stateMgr, queues, exchanges, session, evt);
+ }
+
+ private class Callback implements GroupResponseHandler
+ {
+ private final AMQStateManager _stateMgr;
+ private final QueueRegistry _queues;
+ private final ExchangeRegistry _exchanges;
+ private final AMQProtocolSession _session;
+ private final AMQMethodEvent<A> _evt;
+
+ Callback(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt)
+ {
+ _stateMgr = stateMgr;
+ _queues = queues;
+ _exchanges = exchanges;
+ _session = session;
+ _evt = evt;
+ }
+
+ public void response(List<AMQMethodBody> responses, List<Member> members)
+ {
+ try
+ {
+ local(_stateMgr, _queues, _exchanges, _session, _evt);
+ _logger.debug(new LogMessage("Handled {0} locally, in response to completion of replication", _evt.getMethod()));
+ }
+ catch (AMQException e)
+ {
+ _logger.error(new LogMessage("Error handling {0}:{1}", _evt.getMethod(), e), e);
+ }
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java
new file mode 100644
index 0000000000..18c3f5ce58
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class WrappedListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
+{
+ private final StateAwareMethodListener<T> _primary;
+ private final StateAwareMethodListener _post;
+ private final StateAwareMethodListener _pre;
+
+ WrappedListener(StateAwareMethodListener<T> primary, StateAwareMethodListener pre, StateAwareMethodListener post)
+ {
+ _pre = check(pre);
+ _post = check(post);
+ _primary = check(primary);
+ }
+
+ public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<T> evt) throws AMQException
+ {
+ _pre.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _primary.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _post.methodReceived(stateMgr, queues, exchanges, session, evt);
+ }
+
+ private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in)
+ {
+ return in == null ? new NullListener<T>() : in;
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java b/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java
new file mode 100644
index 0000000000..912651d3ce
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.handler;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.cluster.MethodHandlerFactory;
+import org.apache.qpid.server.cluster.MethodHandlerRegistry;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public abstract class WrappingMethodHandlerFactory implements MethodHandlerFactory
+{
+ private final MethodHandlerFactory _delegate;
+ private final StateAwareMethodListener _pre;
+ private final StateAwareMethodListener _post;
+
+ protected WrappingMethodHandlerFactory(MethodHandlerFactory delegate,
+ StateAwareMethodListener pre,
+ StateAwareMethodListener post)
+ {
+ _delegate = delegate;
+ _pre = pre;
+ _post = post;
+ }
+
+ public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry)
+ {
+ if (isWrappableState(state))
+ {
+ return wrap(_delegate.register(state, registry), state);
+ }
+ else
+ {
+ return _delegate.register(state, registry);
+ }
+ }
+
+ protected abstract boolean isWrappableState(AMQState state);
+
+ protected abstract Iterable<FrameDescriptor> getWrappableFrameTypes(AMQState state);
+
+ private MethodHandlerRegistry wrap(MethodHandlerRegistry registry, AMQState state)
+ {
+ for (FrameDescriptor fd : getWrappableFrameTypes(state))
+ {
+ wrap(registry, fd.type, fd.instance);
+ }
+ return registry;
+ }
+
+ private <A extends AMQMethodBody, B extends Class<A>> void wrap(MethodHandlerRegistry r, B type, A frame)
+ {
+ r.addHandler(type, new WrappedListener<A>(r.getHandler(frame), _pre, _post));
+ }
+
+ protected static class FrameDescriptor<A extends AMQMethodBody, B extends Class<A>>
+ {
+ protected final A instance;
+ protected final B type;
+
+ public FrameDescriptor(B type, A instance)
+ {
+ this.instance = instance;
+ this.type = type;
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java b/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java
new file mode 100644
index 0000000000..044d264380
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.policy;
+
+import org.apache.qpid.server.cluster.BroadcastPolicy;
+
+public class AsynchBroadcastPolicy implements BroadcastPolicy
+{
+ public boolean isComplete(int responded, int members)
+ {
+ return true;
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java b/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java
new file mode 100644
index 0000000000..cd004c36d0
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.policy;
+
+import org.apache.qpid.server.cluster.BroadcastPolicy;
+
+public class MajorityResponseBroadcastPolicy implements BroadcastPolicy
+{
+ public boolean isComplete(int responded, int members)
+ {
+ return responded > members / 2;
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java b/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java
new file mode 100644
index 0000000000..0aca6fbe97
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.policy;
+
+import org.apache.qpid.server.cluster.BroadcastPolicy;
+
+public class OneResponseBroadcastPolicy implements BroadcastPolicy
+{
+ public boolean isComplete(int responded, int members)
+ {
+ return responded > 0;
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java b/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java
new file mode 100644
index 0000000000..fbee483967
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.policy;
+
+import org.apache.qpid.server.cluster.BroadcastPolicy;
+
+public interface StandardPolicies
+{
+ public static final BroadcastPolicy ASYNCH_POLICY = new AsynchBroadcastPolicy();
+ public static final BroadcastPolicy SYNCH_POLICY = new SynchBroadcastPolicy();
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java b/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java
new file mode 100644
index 0000000000..1c6023d35d
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.policy;
+
+import org.apache.qpid.server.cluster.BroadcastPolicy;
+
+public class SynchBroadcastPolicy implements BroadcastPolicy
+{
+ public boolean isComplete(int responded, int members)
+ {
+ return responded == members;
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java b/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java
new file mode 100644
index 0000000000..cbbc8679a8
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.replay;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+abstract class ChainedMethodRecorder <T extends AMQMethodBody> implements MethodRecorder<T>
+{
+ private final MethodRecorder<T> _recorder;
+
+ ChainedMethodRecorder()
+ {
+ this(null);
+ }
+
+ ChainedMethodRecorder(MethodRecorder<T> recorder)
+ {
+ _recorder = recorder;
+ }
+
+ public final void record(T method)
+ {
+ if(!doRecord(method) && _recorder != null)
+ {
+ _recorder.record(method);
+ }
+ }
+
+ protected abstract boolean doRecord(T method);
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
new file mode 100644
index 0000000000..7ba51108f5
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.replay;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.BasicConsumeBody;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+
+class ConsumerCounts
+{
+ private final Map<String, Integer> _counts = new HashMap<String, Integer>();
+
+ synchronized void increment(String queue)
+ {
+ _counts.put(queue, get(queue) + 1);
+ }
+
+ synchronized void decrement(String queue)
+ {
+ _counts.put(queue, get(queue) - 1);
+ }
+
+ private int get(String queue)
+ {
+ Integer count = _counts.get(queue);
+ return count == null ? 0 : count;
+ }
+
+ synchronized void replay(List<AMQMethodBody> messages)
+ {
+ for(String queue : _counts.keySet())
+ {
+ BasicConsumeBody m = new BasicConsumeBody();
+ m.queue = queue;
+ m.consumerTag = queue;
+ replay(m, messages);
+ }
+ }
+
+ private void replay(BasicConsumeBody msg, List<AMQMethodBody> messages)
+ {
+ int count = _counts.get(msg.queue);
+ for(int i = 0; i < count; i++)
+ {
+ messages.add(msg);
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java b/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java
new file mode 100644
index 0000000000..8d5ce4f18e
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.replay;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * Abstraction through which a method can be recorded for replay
+ *
+ */
+interface MethodRecorder<T extends AMQMethodBody>
+{
+ public void record(T method);
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
new file mode 100644
index 0000000000..638ec64e09
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.replay;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeleteBody;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.server.cluster.MethodHandlerFactory;
+import org.apache.qpid.server.cluster.MethodHandlerRegistry;
+import org.apache.qpid.server.cluster.handler.WrappingMethodHandlerFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+import java.util.Arrays;
+
+public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory
+{
+ private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[]
+ {
+ new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody()),
+ new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody()),
+ new FrameDescriptor(QueueBindBody.class, new QueueBindBody()),
+ new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody()),
+ new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody()),
+ new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody()),
+ new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody())
+ });
+
+
+ public RecordingMethodHandlerFactory(MethodHandlerFactory factory, ReplayStore store)
+ {
+ super(factory, null, store);
+ }
+
+ protected boolean isWrappableState(AMQState state)
+ {
+ return AMQState.CONNECTION_OPEN.equals(state);
+ }
+
+ protected Iterable<FrameDescriptor> getWrappableFrameTypes(AMQState state)
+ {
+ return _frames;
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java b/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java
new file mode 100644
index 0000000000..1f555b0f91
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.replay;
+
+import org.apache.qpid.server.cluster.Sendable;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+
+import java.util.List;
+
+/**
+ * Abstraction of a replay strategy for use in getting joining members up to
+ * date with respect to cluster state.
+ *
+ */
+public interface ReplayManager
+{
+ public List<AMQMethodBody> replay(boolean isLeader);
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java
new file mode 100644
index 0000000000..66bd8e0b0c
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java
@@ -0,0 +1,310 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.replay;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeleteBody;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.ClusterSynchBody;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.server.cluster.ClusteredProtocolSession;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.cluster.util.Bindings;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Stores method invocations for replay to new members.
+ *
+ */
+public class ReplayStore implements ReplayManager, StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ReplayStore.class);
+
+ private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _globalRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>();
+ private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _localRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>();
+ private final Map<String, QueueDeclareBody> _sharedQueues = new ConcurrentHashMap<String, QueueDeclareBody>();
+ private final Map<String, QueueDeclareBody> _privateQueues = new ConcurrentHashMap<String, QueueDeclareBody>();
+ private final Bindings<String, String, QueueBindBody> _sharedBindings = new Bindings<String, String, QueueBindBody>();
+ private final Bindings<String, String, QueueBindBody> _privateBindings = new Bindings<String, String, QueueBindBody>();
+ private final Map<String, ExchangeDeclareBody> _exchanges = new ConcurrentHashMap<String, ExchangeDeclareBody>();
+ private final ConsumerCounts _consumers = new ConsumerCounts();
+
+ public ReplayStore()
+ {
+ _globalRecorders.put(QueueDeclareBody.class, new SharedQueueDeclareRecorder());
+ _globalRecorders.put(QueueDeleteBody.class, new SharedQueueDeleteRecorder());
+ _globalRecorders.put(QueueBindBody.class, new SharedQueueBindRecorder());
+ _globalRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder());
+ _globalRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder());
+
+ _localRecorders.put(QueueDeclareBody.class, new PrivateQueueDeclareRecorder());
+ _localRecorders.put(QueueDeleteBody.class, new PrivateQueueDeleteRecorder());
+ _localRecorders.put(QueueBindBody.class, new PrivateQueueBindRecorder());
+ _localRecorders.put(BasicConsumeBody.class, new BasicConsumeRecorder());
+ _localRecorders.put(BasicCancelBody.class, new BasicCancelRecorder());
+ _localRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder());
+ _localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder());
+ }
+
+ public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.debug(new LogMessage("Replay store received {0}", evt.getMethod()));
+ AMQMethodBody request = evt.getMethod();
+
+ //allow any (relevant) recorder registered for this type of request to record it:
+ MethodRecorder recorder = getRecorders(session).get(request.getClass());
+ if (recorder != null)
+ {
+ recorder.record(request);
+ }
+ }
+
+ private Map<Class<? extends AMQMethodBody>, MethodRecorder> getRecorders(AMQProtocolSession session)
+ {
+ if (ClusteredProtocolSession.isPeerSession(session))
+ {
+ return _globalRecorders;
+ }
+ else
+ {
+ return _localRecorders;
+ }
+ }
+
+ public List<AMQMethodBody> replay(boolean isLeader)
+ {
+ List<AMQMethodBody> methods = new ArrayList<AMQMethodBody>();
+ methods.addAll(_exchanges.values());
+ methods.addAll(_privateQueues.values());
+ synchronized(_privateBindings)
+ {
+ methods.addAll(_privateBindings.values());
+ }
+ if (isLeader)
+ {
+ methods.addAll(_sharedQueues.values());
+ synchronized(_sharedBindings)
+ {
+ methods.addAll(_sharedBindings.values());
+ }
+ }
+ _consumers.replay(methods);
+ methods.add(new ClusterSynchBody());
+ return methods;
+ }
+
+ private class BasicConsumeRecorder implements MethodRecorder<BasicConsumeBody>
+ {
+ public void record(BasicConsumeBody method)
+ {
+ if(_sharedQueues.containsKey(method.queue))
+ {
+ _consumers.increment(method.queue);
+ }
+ }
+ }
+
+ private class BasicCancelRecorder implements MethodRecorder<BasicCancelBody>
+ {
+ public void record(BasicCancelBody method)
+ {
+ if(_sharedQueues.containsKey(method.consumerTag))
+ {
+ _consumers.decrement(method.consumerTag);
+ }
+ }
+ }
+
+ private class SharedQueueDeclareRecorder extends QueueDeclareRecorder
+ {
+ SharedQueueDeclareRecorder()
+ {
+ super(false, _sharedQueues);
+ }
+ }
+
+ private class PrivateQueueDeclareRecorder extends QueueDeclareRecorder
+ {
+ PrivateQueueDeclareRecorder()
+ {
+ super(true, _privateQueues, new SharedQueueDeclareRecorder());
+ }
+ }
+
+ private class SharedQueueDeleteRecorder extends QueueDeleteRecorder
+ {
+ SharedQueueDeleteRecorder()
+ {
+ super(_sharedQueues, _sharedBindings);
+ }
+ }
+
+ private class PrivateQueueDeleteRecorder extends QueueDeleteRecorder
+ {
+ PrivateQueueDeleteRecorder()
+ {
+ super(_privateQueues, _privateBindings, new SharedQueueDeleteRecorder());
+ }
+ }
+
+ private class SharedQueueBindRecorder extends QueueBindRecorder
+ {
+ SharedQueueBindRecorder()
+ {
+ super(_sharedQueues, _sharedBindings);
+ }
+ }
+
+ private class PrivateQueueBindRecorder extends QueueBindRecorder
+ {
+ PrivateQueueBindRecorder()
+ {
+ super(_privateQueues, _privateBindings, new SharedQueueBindRecorder());
+ }
+ }
+
+
+ private static class QueueDeclareRecorder extends ChainedMethodRecorder<QueueDeclareBody>
+ {
+ private final boolean _exclusive;
+ private final Map<String, QueueDeclareBody> _queues;
+
+ QueueDeclareRecorder(boolean exclusive, Map<String, QueueDeclareBody> queues)
+ {
+ _queues = queues;
+ _exclusive = exclusive;
+ }
+
+ QueueDeclareRecorder(boolean exclusive, Map<String, QueueDeclareBody> queues, QueueDeclareRecorder recorder)
+ {
+ super(recorder);
+ _queues = queues;
+ _exclusive = exclusive;
+ }
+
+
+ protected boolean doRecord(QueueDeclareBody method)
+ {
+ if (_exclusive == method.exclusive)
+ {
+ _queues.put(method.queue, method);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+
+ private class QueueDeleteRecorder extends ChainedMethodRecorder<QueueDeleteBody>
+ {
+ private final Map<String, QueueDeclareBody> _queues;
+ private final Bindings<String, String, QueueBindBody> _bindings;
+
+ QueueDeleteRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings)
+ {
+ this(queues, bindings, null);
+ }
+
+ QueueDeleteRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings, QueueDeleteRecorder recorder)
+ {
+ super(recorder);
+ _queues = queues;
+ _bindings = bindings;
+ }
+
+ protected boolean doRecord(QueueDeleteBody method)
+ {
+ if (_queues.remove(method.queue) != null)
+ {
+ _bindings.unbind1(method.queue);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+
+ private class QueueBindRecorder extends ChainedMethodRecorder<QueueBindBody>
+ {
+ private final Map<String, QueueDeclareBody> _queues;
+ private final Bindings<String, String, QueueBindBody> _bindings;
+
+ QueueBindRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings)
+ {
+ _queues = queues;
+ _bindings = bindings;
+ }
+
+ QueueBindRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings, QueueBindRecorder recorder)
+ {
+ super(recorder);
+ _queues = queues;
+ _bindings = bindings;
+ }
+
+ protected boolean doRecord(QueueBindBody method)
+ {
+ if (_queues.containsKey(method.queue))
+ {
+ _bindings.bind(method.queue, method.exchange, method);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+
+ private class ExchangeDeclareRecorder implements MethodRecorder<ExchangeDeclareBody>
+ {
+ public void record(ExchangeDeclareBody method)
+ {
+ _exchanges.put(method.exchange, method);
+ }
+ }
+
+ private class ExchangeDeleteRecorder implements MethodRecorder<ExchangeDeleteBody>
+ {
+ public void record(ExchangeDeleteBody method)
+ {
+ _exchanges.remove(method.exchange);
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java b/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java
new file mode 100644
index 0000000000..cca3953f34
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+/**
+ * Maps two separate keys to a list of values.
+ *
+ */
+public class Bindings<K1, K2, V>
+{
+ private final MultiValuedMap<K1, Binding<K2>> _a = new MultiValuedMap<K1, Binding<K2>>();
+ private final MultiValuedMap<K2, Binding<K1>> _b = new MultiValuedMap<K2, Binding<K1>>();
+ private final Collection<V> _values = new HashSet<V>();
+
+ public void bind(K1 key1, K2 key2, V value)
+ {
+ _a.add(key1, new Binding<K2>(key2, value));
+ _b.add(key2, new Binding<K1>(key1, value));
+ _values.add(value);
+ }
+
+ public void unbind1(K1 key1)
+ {
+ Collection<Binding<K2>> values = _a.remove(key1);
+ for (Binding<K2> v : values)
+ {
+ _b.remove(v.key);
+ _values.remove(v.value);
+ }
+ }
+
+ public void unbind2(K2 key2)
+ {
+ Collection<Binding<K1>> values = _b.remove(key2);
+ for (Binding<K1> v : values)
+ {
+ _a.remove(v.key);
+ _values.remove(v.value);
+ }
+ }
+
+ public Collection<V> values()
+ {
+ return Collections.unmodifiableCollection(_values);
+ }
+
+ /**
+ * Value needs to hold key to the other map
+ */
+ private class Binding<T>
+ {
+ private final T key;
+ private final V value;
+
+ Binding(T key, V value)
+ {
+ this.key = key;
+ this.value = value;
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java b/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java
new file mode 100644
index 0000000000..5bdc824060
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.util;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Set;
+import java.util.HashSet;
+
+/**
+ * Allows a method to be invoked on a list of listeners with one call
+ *
+ */
+public class InvokeMultiple <T> implements InvocationHandler
+{
+ private final Set<T> _targets = new HashSet<T>();
+ private final T _proxy;
+
+ public InvokeMultiple(Class<? extends T> type)
+ {
+ _proxy = (T) Proxy.newProxyInstance(type.getClassLoader(), new Class[]{type}, this);
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ Set<T> targets;
+ synchronized(this)
+ {
+ targets = new HashSet<T>(_targets);
+ }
+
+ for(T target : targets)
+ {
+ method.invoke(target, args);
+ }
+ return null;
+ }
+
+ public synchronized void addListener(T t)
+ {
+ _targets.add(t);
+ }
+
+ public synchronized void removeListener(T t)
+ {
+ _targets.remove(t);
+ }
+
+ public T getProxy()
+ {
+ return _proxy;
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java b/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java
new file mode 100644
index 0000000000..9824041358
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.util;
+
+import java.text.MessageFormat;
+
+/**
+ * Convenience class to allow log messages to be specified in terms
+ * of MessageFormat patterns with a variable set of parameters. The
+ * production of the string is only done if toSTring is called so it
+ * works well with debug level messages, allowing complex messages
+ * to be specified that are only evaluated if actually printed.
+ *
+ */
+public class LogMessage
+{
+ private final String _message;
+ private final Object[] _args;
+
+ public LogMessage(String message)
+ {
+ this(message, new Object[0]);
+ }
+
+ public LogMessage(String message, Object... args)
+ {
+ _message = message;
+ _args = args;
+ }
+
+ public String toString()
+ {
+ return MessageFormat.format(_message, _args);
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java b/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java
new file mode 100644
index 0000000000..1c4e3da6f3
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Maps a key to a collection of values
+ *
+ */
+public class MultiValuedMap<K, V>
+{
+ private Map<K, Collection<V>> _map = new HashMap<K, Collection<V>>();
+
+ public boolean add(K key, V value)
+ {
+ Collection<V> values = get(key);
+ if (values == null)
+ {
+ values = createList();
+ _map.put(key, values);
+ }
+ return values.add(value);
+ }
+
+ public Collection<V> get(K key)
+ {
+ return _map.get(key);
+ }
+
+ public Collection<V> remove(K key)
+ {
+ return _map.remove(key);
+ }
+
+ protected Collection<V> createList()
+ {
+ return new ArrayList<V>();
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java
new file mode 100644
index 0000000000..14d893b040
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -0,0 +1,161 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.server.cluster.*;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/**
+ * Represents a shared queue in a cluster. The key difference is that as well as any
+ * local consumers, there may be consumers for this queue on other members of the
+ * cluster.
+ *
+ */
+public class ClusteredQueue extends AMQQueue
+{
+ private static final Logger _logger = Logger.getLogger(ClusteredQueue.class);
+ private final ConcurrentHashMap<SimpleMemberHandle, RemoteSubscriptionImpl> _peers = new ConcurrentHashMap<SimpleMemberHandle, RemoteSubscriptionImpl>();
+ private final GroupManager _groupMgr;
+ private final NestedSubscriptionManager _subscriptions;
+
+ public ClusteredQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry)
+ throws AMQException
+ {
+ super(name, durable, owner, autoDelete, queueRegistry, new ClusteredSubscriptionManager());
+ _groupMgr = groupMgr;
+ _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers();
+ }
+
+ public ClusteredQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
+ throws AMQException
+ {
+ super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new ClusteredSubscriptionManager(),
+ new SubscriptionImpl.Factory());
+ _groupMgr = groupMgr;
+ _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers();
+ }
+
+ public void deliver(AMQMessage message) throws AMQException
+ {
+ _logger.info(new LogMessage("{0} delivered to clustered queue {1}", message, this));
+ super.deliver(message);
+ }
+
+ protected void autodelete() throws AMQException
+ {
+ if(!_subscriptions.hasActiveSubscribers())
+ {
+ //delete locally:
+ delete();
+
+ //send deletion request to all other members:
+ QueueDeleteBody request = new QueueDeleteBody();
+ request.queue = getName();
+ _groupMgr.broadcast(new SimpleSendable(request));
+ }
+ }
+
+ public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException
+ {
+ //handle locally:
+ super.unregisterProtocolSession(ps, channel, consumerTag);
+
+ //signal other members:
+ BasicCancelBody request = new BasicCancelBody();
+ request.consumerTag = getName();
+ _groupMgr.broadcast(new SimpleSendable(request));
+ }
+
+ public void addRemoteSubcriber(MemberHandle peer)
+ {
+ _logger.info(new LogMessage("Added remote subscriber for {0} to clustered queue {1}", peer, this));
+ //find (or create) a matching subscriber for the peer then increment the count
+ getSubscriber(key(peer), true).increment();
+ }
+
+ public void removeRemoteSubscriber(MemberHandle peer)
+ {
+ //find a matching subscriber for the peer then decrement the count
+ //if count is now zero, remove the subscriber
+ SimpleMemberHandle key = key(peer);
+ RemoteSubscriptionImpl s = getSubscriber(key, true);
+ if (s == null)
+ {
+ throw new RuntimeException("No subscriber for " + peer);
+ }
+ if (s.decrement())
+ {
+ _peers.remove(key);
+ _subscriptions.removeSubscription(s);
+ }
+ }
+
+ public void removeAllRemoteSubscriber(MemberHandle peer)
+ {
+ SimpleMemberHandle key = key(peer);
+ RemoteSubscriptionImpl s = getSubscriber(key, true);
+ _peers.remove(key);
+ _subscriptions.removeSubscription(s);
+ }
+
+ private RemoteSubscriptionImpl getSubscriber(SimpleMemberHandle key, boolean create)
+ {
+ RemoteSubscriptionImpl s = _peers.get(key);
+ if (s == null && create)
+ {
+ return addSubscriber(key, new RemoteSubscriptionImpl(_groupMgr, key));
+ }
+ else
+ {
+ return s;
+ }
+ }
+
+ private RemoteSubscriptionImpl addSubscriber(SimpleMemberHandle key, RemoteSubscriptionImpl s)
+ {
+ RemoteSubscriptionImpl other = _peers.putIfAbsent(key, s);
+ if (other == null)
+ {
+ _subscriptions.addSubscription(s);
+ new SubscriberCleanup(key, this, _groupMgr);
+ return s;
+ }
+ else
+ {
+ return other;
+ }
+ }
+
+ private SimpleMemberHandle key(MemberHandle peer)
+ {
+ return peer instanceof SimpleMemberHandle ? (SimpleMemberHandle) peer : (SimpleMemberHandle) SimpleMemberHandle.resolve(peer);
+ }
+
+ static boolean isFromBroker(AMQMessage msg)
+ {
+ return ClusteredProtocolSession.isPayloadFromPeer(msg);
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
new file mode 100644
index 0000000000..0005b20fb1
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.cluster.util.LogMessage;
+
+class ClusteredSubscriptionManager extends SubscriptionSet
+{
+ private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
+ private final NestedSubscriptionManager _all;
+
+ ClusteredSubscriptionManager()
+ {
+ this(new NestedSubscriptionManager());
+ }
+
+ private ClusteredSubscriptionManager(NestedSubscriptionManager all)
+ {
+ _all = all;
+ _all.addSubscription(new Parent());
+ }
+
+ NestedSubscriptionManager getAllSubscribers()
+ {
+ return _all;
+ }
+
+ public boolean hasActiveSubscribers()
+ {
+ return _all.hasActiveSubscribers();
+ }
+
+ public Subscription nextSubscriber(AMQMessage msg)
+ {
+ if(ClusteredQueue.isFromBroker(msg))
+ {
+ //if message is from another broker, it should only be delivered
+ //to another client to meet ordering constraints
+ Subscription s = super.nextSubscriber(msg);
+ _logger.info(new LogMessage("Returning next *client* subscriber {0}", s));
+ if(s == null)
+ {
+ //TODO: deliver to another broker, but set the redelivered flag on the msg
+ //(this should be policy based)
+
+ //for now just don't deliver it
+ return null;
+ }
+ else
+ {
+ return s;
+ }
+ }
+ Subscription s = _all.nextSubscriber(msg);
+ _logger.info(new LogMessage("Returning next subscriber {0}", s));
+ return s;
+ }
+
+ private class Parent implements WeightedSubscriptionManager
+ {
+ public int getWeight()
+ {
+ return ClusteredSubscriptionManager.this.getWeight();
+ }
+
+ public boolean hasActiveSubscribers()
+ {
+ return ClusteredSubscriptionManager.super.hasActiveSubscribers();
+ }
+
+ public Subscription nextSubscriber(AMQMessage msg)
+ {
+ return ClusteredSubscriptionManager.super.nextSubscriber(msg);
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java
new file mode 100644
index 0000000000..0bb6537930
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Distributes messages among a list of subsscription managers, using their
+ * weighting.
+ *
+ */
+class NestedSubscriptionManager implements SubscriptionManager
+{
+ private final List<WeightedSubscriptionManager> _subscribers = new CopyOnWriteArrayList<WeightedSubscriptionManager>();
+ private int _iterations;
+ private int _index;
+
+ void addSubscription(WeightedSubscriptionManager s)
+ {
+ _subscribers.add(s);
+ }
+
+ void removeSubscription(WeightedSubscriptionManager s)
+ {
+ _subscribers.remove(s);
+ }
+
+ public boolean hasActiveSubscribers()
+ {
+ for(WeightedSubscriptionManager s : _subscribers)
+ {
+ if(s.hasActiveSubscribers())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public Subscription nextSubscriber(AMQMessage msg)
+ {
+ WeightedSubscriptionManager start = current();
+ for(WeightedSubscriptionManager s = start; s != null; s = next(start))
+ {
+ if(hasMore(s))
+ {
+ return nextSubscriber(s);
+ }
+ }
+ return null;
+ }
+
+ private Subscription nextSubscriber(WeightedSubscriptionManager s)
+ {
+ _iterations++;
+ return s.nextSubscriber(null);
+ }
+
+ private WeightedSubscriptionManager current()
+ {
+ return _subscribers.isEmpty() ? null : _subscribers.get(_index);
+ }
+
+ private boolean hasMore(WeightedSubscriptionManager s)
+ {
+ return _iterations < s.getWeight();
+ }
+
+ private WeightedSubscriptionManager next(WeightedSubscriptionManager start)
+ {
+ WeightedSubscriptionManager s = next();
+ return s == start && !hasMore(s) ? null : s;
+ }
+
+ private WeightedSubscriptionManager next()
+ {
+ _iterations = 0;
+ if(++_index >= _subscribers.size())
+ {
+ _index = 0;
+ }
+ return _subscribers.get(_index);
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java
new file mode 100644
index 0000000000..1e7e13a577
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.cluster.SimpleSendable;
+import org.apache.qpid.server.cluster.GroupManager;
+import org.apache.qpid.framing.QueueDeleteBody;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Used to represent a private queue held locally.
+ *
+ */
+public class PrivateQueue extends AMQQueue
+{
+ private final GroupManager _groupMgr;
+
+ public PrivateQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry)
+ throws AMQException
+ {
+ super(name, durable, owner, autoDelete, queueRegistry);
+ _groupMgr = groupMgr;
+
+ }
+
+ public PrivateQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
+ throws AMQException
+ {
+ super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery);
+ _groupMgr = groupMgr;
+ }
+
+ protected void autodelete() throws AMQException
+ {
+ //delete locally:
+ super.autodelete();
+
+ //send delete request to peers:
+ QueueDeleteBody request = new QueueDeleteBody();
+ request.queue = getName();
+ _groupMgr.broadcast(new SimpleSendable(request));
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java b/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java
new file mode 100644
index 0000000000..dac8b616e5
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.cluster.MembershipChangeListener;
+import org.apache.qpid.server.cluster.MemberHandle;
+import org.apache.qpid.server.cluster.GroupManager;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+
+class ProxiedQueueCleanup implements MembershipChangeListener
+{
+ private static final Logger _logger = Logger.getLogger(ProxiedQueueCleanup.class);
+
+ private final MemberHandle _subject;
+ private final RemoteQueueProxy _queue;
+
+ ProxiedQueueCleanup(MemberHandle subject, RemoteQueueProxy queue)
+ {
+ _subject = subject;
+ _queue = queue;
+ }
+
+ public void changed(List<MemberHandle> members)
+ {
+ if(!members.contains(_subject))
+ {
+ try
+ {
+ _queue.delete();
+ _logger.info(new LogMessage("Deleted {0} in response to exclusion of {1}", _queue, _subject));
+ }
+ catch (AMQException e)
+ {
+ _logger.info(new LogMessage("Failed to delete {0} in response to exclusion of {1}: {2}", _queue, _subject, e), e);
+ }
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java
new file mode 100644
index 0000000000..a9a467b306
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.cluster.ClusteredProtocolSession;
+import org.apache.qpid.server.cluster.GroupManager;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.cluster.MemberHandle;
+import org.apache.qpid.server.cluster.SimpleSendable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+/**
+ * TODO: separate out an abstract base class from AMQQueue from which this inherits. It does
+ * not require all the functionality currently in AMQQueue.
+ *
+ */
+public class RemoteQueueProxy extends AMQQueue
+{
+ private static final Logger _logger = Logger.getLogger(RemoteQueueProxy.class);
+ private final MemberHandle _target;
+ private final GroupManager _groupMgr;
+
+ public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry)
+ throws AMQException
+ {
+ super(name, durable, owner, autoDelete, queueRegistry);
+ _target = target;
+ _groupMgr = groupMgr;
+ _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this));
+ }
+
+ public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
+ throws AMQException
+ {
+ super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery);
+ _target = target;
+ _groupMgr = groupMgr;
+ _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this));
+ }
+
+ public void deliver(AMQMessage msg) throws NoConsumersException
+ {
+ if (ClusteredProtocolSession.canRelay(msg, _target))
+ {
+ try
+ {
+ _logger.debug(new LogMessage("Relaying {0} to {1}", msg, _target));
+ relay(msg);
+ }
+ catch (NoConsumersException e)
+ {
+ throw e;
+ }
+ catch (AMQException e)
+ {
+ //TODO: sort out exception handling...
+ e.printStackTrace();
+ }
+ }
+ else
+ {
+ _logger.debug(new LogMessage("Cannot relay {0} to {1}", msg, _target));
+ }
+ }
+
+ void relay(AMQMessage msg) throws AMQException
+ {
+ BasicPublishBody publish = msg.getPublishBody();
+ ContentHeaderBody header = msg.getContentHeaderBody();
+ List<ContentBody> bodies = msg.getContentBodies();
+
+ //(i) construct a new publishing block:
+ publish.immediate = false;//can't as yet handle the immediate flag in a cluster
+ List<AMQBody> parts = new ArrayList<AMQBody>(2 + bodies.size());
+ parts.add(publish);
+ parts.add(header);
+ parts.addAll(bodies);
+
+ //(ii) send this on to the broker for which it is acting as proxy:
+ _groupMgr.send(_target, new SimpleSendable(parts));
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
new file mode 100644
index 0000000000..9de7a5c849
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -0,0 +1,93 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.cluster.MemberHandle;
+import org.apache.qpid.server.cluster.GroupManager;
+import org.apache.qpid.server.cluster.SimpleSendable;
+import org.apache.qpid.AMQException;
+
+class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
+{
+ private final GroupManager _groupMgr;
+ private final MemberHandle _peer;
+ private boolean _suspended;
+ private int _count;
+
+ RemoteSubscriptionImpl(GroupManager groupMgr, MemberHandle peer)
+ {
+ _groupMgr = groupMgr;
+ _peer = peer;
+ }
+
+ synchronized void increment()
+ {
+ _count++;
+ }
+
+ synchronized boolean decrement()
+ {
+ return --_count <= 0;
+ }
+
+ public void send(AMQMessage msg, AMQQueue queue)
+ {
+ try
+ {
+ _groupMgr.send(_peer, new SimpleSendable(msg.getPayload()));
+ }
+ catch (AMQException e)
+ {
+ //TODO: handle exceptions properly...
+ e.printStackTrace();
+ }
+ }
+
+ public synchronized void setSuspended(boolean suspended)
+ {
+ _suspended = suspended;
+ }
+
+ public synchronized boolean isSuspended()
+ {
+ return _suspended;
+ }
+
+ public synchronized int getWeight()
+ {
+ return _count;
+ }
+
+ public boolean hasActiveSubscribers()
+ {
+ return getWeight() == 0;
+ }
+
+ public Subscription nextSubscriber(AMQMessage msg)
+ {
+ return this;
+ }
+
+ public void queueDeleted(AMQQueue queue)
+ {
+ if(queue instanceof ClusteredQueue)
+ {
+ ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
+ }
+ }
+}
diff --git a/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java b/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java
new file mode 100644
index 0000000000..cdb6f8f4d2
--- /dev/null
+++ b/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.cluster.MembershipChangeListener;
+import org.apache.qpid.server.cluster.MemberHandle;
+import org.apache.qpid.server.cluster.GroupManager;
+import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+
+class SubscriberCleanup implements MembershipChangeListener
+{
+ private static final Logger _logger = Logger.getLogger(SubscriberCleanup.class);
+
+ private final MemberHandle _subject;
+ private final ClusteredQueue _queue;
+ private final GroupManager _manager;
+
+ SubscriberCleanup(MemberHandle subject, ClusteredQueue queue, GroupManager manager)
+ {
+ _subject = subject;
+ _queue = queue;
+ _manager = manager;
+ _manager.addMemberhipChangeListener(this);
+ }
+
+ public void changed(List<MemberHandle> members)
+ {
+ if(!members.contains(_subject))
+ {
+ _queue.removeAllRemoteSubscriber(_subject);
+ _manager.removeMemberhipChangeListener(this);
+ _logger.info(new LogMessage("Removed {0} from {1}", _subject, _queue));
+ }
+ }
+}