summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-11-23 22:58:36 +0000
committerRobert Greig <rgreig@apache.org>2006-11-23 22:58:36 +0000
commit31ba3aa4ab1187cb74635271e03177c486c9fab8 (patch)
tree39e7a8f581002cb6c702748bf03f1dea418bedda
parent2342078cd8586a567e997789b74f7c8bafc2bca2 (diff)
downloadqpid-python-31ba3aa4ab1187cb74635271e03177c486c9fab8.tar.gz
Start of merge from trunk - some manual restructuring
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@478702 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/Broker.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/JoinState.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/Main.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/Member.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/Sendable.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java (renamed from java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java (renamed from java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java (renamed from java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java (renamed from java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java (renamed from java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java (renamed from java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java (renamed from java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (renamed from java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java)0
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java (renamed from java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java)0
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerGroupTest.java267
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java231
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java42
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/InductionBufferTest.java103
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBroker.java50
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java26
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleClusterTest.java41
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java59
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBroker.java67
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBrokerFactory.java26
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestReplayManager.java44
-rw-r--r--java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestSession.java266
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java267
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java231
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java42
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java103
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java50
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java26
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java41
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java59
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java67
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java26
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java44
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java266
97 files changed, 2444 insertions, 0 deletions
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java
index 3e7a2af01f..3e7a2af01f 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java
index 6d56afe50b..6d56afe50b 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Broker.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java
index d8a3fd1b76..d8a3fd1b76 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/Broker.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java
index 75a47a168f..75a47a168f 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java
index db667879f6..db667879f6 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
index 6909b199ce..6909b199ce 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
index d378647698..d378647698 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
index eb2992a690..eb2992a690 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java
index 9bf2e02d3c..9bf2e02d3c 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
index 65b188484c..65b188484c 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
index e5efe941b3..e5efe941b3 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java
index 650240fa70..650240fa70 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
index 980b36cf21..980b36cf21 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java
index d3b33f6fe3..d3b33f6fe3 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java
index 6c45c6e655..6c45c6e655 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java
index c12d06a337..c12d06a337 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java
index 76e15b88ec..76e15b88ec 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java
index af618631e4..af618631e4 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java
index d95229505f..d95229505f 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Main.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
index 94ec2042a2..94ec2042a2 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/Main.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Member.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java
index d7825aac6e..d7825aac6e 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/Member.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java
index 4b56eaf962..4b56eaf962 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java
index 75f4d19103..75f4d19103 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java
index a62a413bc6..a62a413bc6 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
index cd3e00012e..cd3e00012e 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java
index f4a721dbc5..f4a721dbc5 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
index 4870f9daaa..4870f9daaa 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
index f193862e78..f193862e78 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java
index 9cf9beeba7..9cf9beeba7 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java
index 3d751fabba..3d751fabba 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java
index ad50a5a737..ad50a5a737 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
index 2b4408b66c..2b4408b66c 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java
index f0b1db25e6..f0b1db25e6 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
index 4b75e76d97..4b75e76d97 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
index 5ee07af596..5ee07af596 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
index cd25bd178a..cd25bd178a 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
index 9e4444819e..9e4444819e 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
index a1684b399f..a1684b399f 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
index 08dcfbe121..08dcfbe121 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java
index 27eff59685..27eff59685 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
index 997c055d77..997c055d77 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
index 7fae2c6598..7fae2c6598 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
index a2a570f045..a2a570f045 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
index 2f15373eba..2f15373eba 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
index 9a598d7f07..9a598d7f07 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
index 24ce4087fb..24ce4087fb 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
index 4a895fcd0a..4a895fcd0a 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
index a5fab27d16..a5fab27d16 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
index 18c3f5ce58..18c3f5ce58 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java
index 912651d3ce..912651d3ce 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java
index 044d264380..044d264380 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java
index cd004c36d0..cd004c36d0 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java
index 0aca6fbe97..0aca6fbe97 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java
index fbee483967..fbee483967 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java
index 1c6023d35d..1c6023d35d 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java
index cbbc8679a8..cbbc8679a8 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
index 7ba51108f5..7ba51108f5 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java
index 8d5ce4f18e..8d5ce4f18e 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
index 638ec64e09..638ec64e09 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java
index 1f555b0f91..1f555b0f91 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
index 66bd8e0b0c..66bd8e0b0c 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java
index cca3953f34..cca3953f34 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java
index 5bdc824060..5bdc824060 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java
index 9824041358..9824041358 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java
index 1c4e3da6f3..1c4e3da6f3 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java
diff --git a/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index 14d893b040..14d893b040 100644
--- a/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
diff --git a/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
index 0005b20fb1..0005b20fb1 100644
--- a/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
diff --git a/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
index 0bb6537930..0bb6537930 100644
--- a/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
diff --git a/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
index 1e7e13a577..1e7e13a577 100644
--- a/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
diff --git a/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java
index dac8b616e5..dac8b616e5 100644
--- a/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java
diff --git a/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
index a9a467b306..a9a467b306 100644
--- a/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
diff --git a/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index 9de7a5c849..9de7a5c849 100644
--- a/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
diff --git a/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java
index cdb6f8f4d2..cdb6f8f4d2 100644
--- a/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerGroupTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerGroupTest.java
new file mode 100644
index 0000000000..015e96f9c6
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerGroupTest.java
@@ -0,0 +1,267 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import junit.framework.TestCase;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class BrokerGroupTest extends TestCase
+{
+ private final MemberHandle a = new SimpleMemberHandle("A", 1);
+ private final MemberHandle b = new SimpleMemberHandle("B", 1);
+ private final MemberHandle c = new SimpleMemberHandle("C", 1);
+ private final MemberHandle d = new SimpleMemberHandle("D", 1);
+
+ //join (new members perspective)
+ // (i) connectToLeader()
+ // ==> check state
+ // (ii) setMembers()
+ // ==> check state
+ // ==> check members
+ // (iii) synched(leader)
+ // ==> check state
+ // ==> check peers
+ // (iv) synched(other)
+ // ==> check state
+ // ==> check peers
+ // repeat for all others
+ public void testJoin_newMember() throws Exception
+ {
+ MemberHandle[] pre = new MemberHandle[]{a, b, c};
+ MemberHandle[] post = new MemberHandle[]{a, b, c};
+
+ BrokerGroup group = new BrokerGroup(d, new TestReplayManager(), new TestBrokerFactory());
+ assertEquals(JoinState.UNINITIALISED, group.getState());
+ //(i)
+ group.connectToLeader(a);
+ assertEquals(JoinState.JOINING, group.getState());
+ assertEquals("Wrong number of peers", 1, group.getPeers().size());
+ //(ii)
+ group.setMembers(Arrays.asList(post));
+ assertEquals(JoinState.INITIATION, group.getState());
+ assertEquals(Arrays.asList(post), group.getMembers());
+ //(iii) & (iv)
+ for (MemberHandle member : pre)
+ {
+ group.synched(member);
+ if (member == c)
+ {
+ assertEquals(JoinState.JOINED, group.getState());
+ assertEquals("Wrong number of peers", pre.length, group.getPeers().size());
+ }
+ else
+ {
+ assertEquals(JoinState.INDUCTION, group.getState());
+ assertEquals("Wrong number of peers", 1, group.getPeers().size());
+ }
+ }
+ }
+
+ //join (leaders perspective)
+ // (i) extablish()
+ // ==> check state
+ // ==> check members
+ // ==> check peers
+ // (ii) connectToProspect()
+ // ==> check members
+ // ==> check peers
+ // repeat (ii)
+ public void testJoin_Leader() throws IOException, InterruptedException
+ {
+ MemberHandle[] prospects = new MemberHandle[]{b, c, d};
+
+ BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory());
+ assertEquals(JoinState.UNINITIALISED, group.getState());
+ //(i)
+ group.establish();
+ assertEquals(JoinState.JOINED, group.getState());
+ assertEquals("Wrong number of peers", 0, group.getPeers().size());
+ assertEquals("Wrong number of members", 1, group.getMembers().size());
+ assertEquals(a, group.getMembers().get(0));
+ //(ii)
+ for (int i = 0; i < prospects.length; i++)
+ {
+ group.connectToProspect(prospects[i]);
+ assertEquals("Wrong number of peers", i + 1, group.getPeers().size());
+ for (int j = 0; j <= i; j++)
+ {
+ assertTrue(prospects[i].matches(group.getPeers().get(i)));
+ }
+ assertEquals("Wrong number of members", i + 2, group.getMembers().size());
+ assertEquals(a, group.getMembers().get(0));
+ for (int j = 0; j <= i; j++)
+ {
+ assertEquals(prospects[i], group.getMembers().get(i + 1));
+ }
+ }
+ }
+
+ //join (general perspective)
+ // (i) set up group
+ // (ii) setMembers()
+ // ==> check members
+ // ==> check peers
+ public void testJoin_general() throws Exception
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b, c};
+ MemberHandle[] view2 = new MemberHandle[]{a, b, c, d};
+ MemberHandle[] peers = new MemberHandle[]{a, b, d};
+
+ BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory());
+ //(i)
+ group.connectToLeader(a);
+ group.setMembers(Arrays.asList(view1));
+ for (MemberHandle h : view1)
+ {
+ group.synched(h);
+ }
+ //(ii)
+ group.setMembers(Arrays.asList(view2));
+ assertEquals(Arrays.asList(view2), group.getMembers());
+ assertEquals(peers.length, group.getPeers().size());
+ for (int i = 0; i < peers.length; i++)
+ {
+ assertTrue(peers[i].matches(group.getPeers().get(i)));
+ }
+ }
+
+ //leadership transfer (valid)
+ // (i) set up group
+ // (ii) assumeLeadership()
+ // ==> check return value
+ // ==> check members
+ // ==> check peers
+ // ==> check isLeader()
+ // ==> check isLeader(old_leader)
+ // ==> check isMember(old_leader)
+ public void testTransferLeadership_valid() throws Exception
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b};
+ MemberHandle[] view2 = new MemberHandle[]{a, b, c, d};
+ MemberHandle[] view3 = new MemberHandle[]{b, c, d};
+
+ BrokerGroup group = new BrokerGroup(b, new TestReplayManager(), new TestBrokerFactory());
+ //(i)
+ group.connectToLeader(a);
+ group.setMembers(Arrays.asList(view1));
+ for (MemberHandle h : view1)
+ {
+ group.synched(h);
+ }
+ group.setMembers(Arrays.asList(view2));
+ //(ii)
+ boolean result = group.assumeLeadership();
+ assertTrue(result);
+ assertTrue(group.isLeader());
+ assertFalse(group.isLeader(a));
+ assertEquals(Arrays.asList(view3), group.getMembers());
+ assertEquals(2, group.getPeers().size());
+ assertTrue(c.matches(group.getPeers().get(0)));
+ assertTrue(d.matches(group.getPeers().get(1)));
+ }
+
+ //leadership transfer (invalid)
+ // (i) set up group
+ // (ii) assumeLeadership()
+ // ==> check return value
+ // ==> check members
+ // ==> check peers
+ // ==> check isLeader()
+ // ==> check isLeader(old_leader)
+ // ==> check isMember(old_leader)
+ public void testTransferLeadership_invalid() throws Exception
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b, c};
+ MemberHandle[] view2 = new MemberHandle[]{a, b, c, d};
+
+ BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory());
+ //(i)
+ group.connectToLeader(a);
+ group.setMembers(Arrays.asList(view1));
+ for (MemberHandle h : view1)
+ {
+ group.synched(h);
+ }
+ group.setMembers(Arrays.asList(view2));
+ //(ii)
+ boolean result = group.assumeLeadership();
+ assertFalse(result);
+ assertFalse(group.isLeader());
+ assertTrue(group.isLeader(a));
+ assertEquals(Arrays.asList(view2), group.getMembers());
+ assertEquals(3, group.getPeers().size());
+ assertTrue(a.matches(group.getPeers().get(0)));
+ assertTrue(b.matches(group.getPeers().get(1)));
+ assertTrue(d.matches(group.getPeers().get(2)));
+
+ }
+
+ //leave (leaders perspective)
+ // (i) set up group
+ // (ii) remove a member
+ // ==> check members
+ // ==> check peers
+ // ==> check isMember(removed_member)
+ // repeat (ii)
+ public void testLeave_leader()
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b, c, d};
+ MemberHandle[] view2 = new MemberHandle[]{a, b, d};
+ MemberHandle[] view3 = new MemberHandle[]{a, d};
+ MemberHandle[] view4 = new MemberHandle[]{a};
+ //(i)
+ BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory());
+ group.establish();
+ group.setMembers(Arrays.asList(view1));
+ //(ii)
+ group.remove(group.findBroker(c, false));
+ assertEquals(Arrays.asList(view2), group.getMembers());
+
+ group.remove(group.findBroker(b, false));
+ assertEquals(Arrays.asList(view3), group.getMembers());
+
+ group.remove(group.findBroker(d, false));
+ assertEquals(Arrays.asList(view4), group.getMembers());
+ }
+
+
+ //leave (general perspective)
+ // (i) set up group
+ // (ii) setMember
+ // ==> check members
+ // ==> check peers
+ // ==> check isMember(removed_member)
+ // repeat (ii)
+ public void testLeave_general()
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b, c, d};
+ MemberHandle[] view2 = new MemberHandle[]{a, c, d};
+ //(i)
+ BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory());
+ group.establish(); //not strictly the correct way to build up the group, but ok for here
+ group.setMembers(Arrays.asList(view1));
+ //(ii)
+ group.setMembers(Arrays.asList(view2));
+ assertEquals(Arrays.asList(view2), group.getMembers());
+ assertEquals(2, group.getPeers().size());
+ assertTrue(a.matches(group.getPeers().get(0)));
+ assertTrue(d.matches(group.getPeers().get(1)));
+ }
+}
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java
new file mode 100644
index 0000000000..d7ede7d3e0
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java
@@ -0,0 +1,231 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import junit.framework.TestCase;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.cluster.policy.StandardPolicies;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class BrokerTest extends TestCase
+{
+ //group request (no failure)
+ public void testGroupRequest_noFailure() throws AMQException
+ {
+ RecordingBroker[] brokers = new RecordingBroker[]{
+ new RecordingBroker("A", 1),
+ new RecordingBroker("B", 2),
+ new RecordingBroker("C", 3)
+ };
+ GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(brokers)));
+ GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
+ for (Broker b : brokers)
+ {
+ b.invoke(grpRequest);
+ }
+ grpRequest.finishedSend();
+
+ for (RecordingBroker b : brokers)
+ {
+ b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new TestMethod("response"));
+ }
+
+ assertTrue("Handler did not receive response", handler.isCompleted());
+ }
+
+ //group request (failure)
+ public void testGroupRequest_failure() throws AMQException
+ {
+ RecordingBroker a = new RecordingBroker("A", 1);
+ RecordingBroker b = new RecordingBroker("B", 2);
+ RecordingBroker c = new RecordingBroker("C", 3);
+ RecordingBroker[] all = new RecordingBroker[]{a, b, c};
+ RecordingBroker[] succeeded = new RecordingBroker[]{a, c};
+
+ GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(succeeded)));
+ GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
+
+ for (Broker broker : all)
+ {
+ broker.invoke(grpRequest);
+ }
+ grpRequest.finishedSend();
+
+ for (RecordingBroker broker : succeeded)
+ {
+ broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).channel, new TestMethod("response"));
+ }
+ b.remove();
+
+ assertTrue("Handler did not receive response", handler.isCompleted());
+ }
+
+
+ //simple send (no response)
+ public void testSend_noResponse() throws AMQException
+ {
+ AMQBody[] msgs = new AMQBody[]{
+ new TestMethod("A"),
+ new TestMethod("B"),
+ new TestMethod("C")
+ };
+ RecordingBroker broker = new RecordingBroker("myhost", 1);
+ for (AMQBody msg : msgs)
+ {
+ broker.send(new SimpleSendable(msg), null);
+ }
+ List<AMQDataBlock> sent = broker.getMessages();
+ assertEquals(msgs.length, sent.size());
+ for (int i = 0; i < msgs.length; i++)
+ {
+ assertTrue(sent.get(i) instanceof AMQFrame);
+ assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame);
+ }
+ }
+
+ //simple send (no failure)
+ public void testSend_noFailure() throws AMQException
+ {
+ RecordingBroker broker = new RecordingBroker("myhost", 1);
+ BlockingHandler handler = new BlockingHandler();
+ broker.send(new SimpleSendable(new TestMethod("A")), handler);
+ List<AMQDataBlock> sent = broker.getMessages();
+ assertEquals(1, sent.size());
+ assertTrue(sent.get(0) instanceof AMQFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+
+ broker.handleResponse(((AMQFrame) sent.get(0)).channel, new TestMethod("B"));
+
+ assertEquals(new TestMethod("B"), handler.getResponse());
+ }
+
+ //simple send (failure)
+ public void testSend_failure() throws AMQException
+ {
+ RecordingBroker broker = new RecordingBroker("myhost", 1);
+ BlockingHandler handler = new BlockingHandler();
+ broker.send(new SimpleSendable(new TestMethod("A")), handler);
+ List<AMQDataBlock> sent = broker.getMessages();
+ assertEquals(1, sent.size());
+ assertTrue(sent.get(0) instanceof AMQFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+ broker.remove();
+ assertEquals(null, handler.getResponse());
+ assertTrue(handler.isCompleted());
+ assertTrue(handler.failed());
+ }
+
+ private static class TestMethod extends AMQMethodBody
+ {
+ private final Object id;
+
+ TestMethod(Object id)
+ {
+ this.id = id;
+ }
+
+ protected int getBodySize()
+ {
+ return 0;
+ }
+
+ protected int getClazz()
+ {
+ return 1002;
+ }
+
+ protected int getMethod()
+ {
+ return 1003;
+ }
+
+ protected void writeMethodPayload(ByteBuffer buffer)
+ {
+ }
+
+ protected byte getType()
+ {
+ return 0;
+ }
+
+ protected int getSize()
+ {
+ return 0;
+ }
+
+ protected void writePayload(ByteBuffer buffer)
+ {
+ }
+
+ protected void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ }
+
+ protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof TestMethod && id.equals(((TestMethod) o).id);
+ }
+
+ public int hashCode()
+ {
+ return id.hashCode();
+ }
+
+ }
+
+ private static class GroupResponseValidator implements GroupResponseHandler
+ {
+ private final AMQMethodBody _response;
+ private final List<Member> _members;
+ private boolean _completed = false;
+
+ GroupResponseValidator(AMQMethodBody response, List<Member> members)
+ {
+ _response = response;
+ _members = members;
+ }
+
+ public void response(List<AMQMethodBody> responses, List<Member> members)
+ {
+ for (AMQMethodBody r : responses)
+ {
+ assertEquals(_response, r);
+ }
+ assertEquals(_members, members);
+ _completed = true;
+ }
+
+ boolean isCompleted()
+ {
+ return _completed;
+ }
+ }
+}
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java
new file mode 100644
index 0000000000..132ebd8ca0
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+public class ClusterCapabilityTest
+{
+ @Test
+ public void startWithNull()
+ {
+ MemberHandle peer = new SimpleMemberHandle("myhost:9999");
+ String c = ClusterCapability.add(null, peer);
+ assertTrue(ClusterCapability.contains(c));
+ assertTrue(peer.matches(ClusterCapability.getPeer(c)));
+ }
+
+ @Test
+ public void startWithText()
+ {
+ MemberHandle peer = new SimpleMemberHandle("myhost:9999");
+ String c = ClusterCapability.add("existing text", peer);
+ assertTrue(ClusterCapability.contains(c));
+ assertTrue(peer.matches(ClusterCapability.getPeer(c)));
+ }
+}
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/InductionBufferTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/InductionBufferTest.java
new file mode 100644
index 0000000000..fedf47d49a
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/InductionBufferTest.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.mina.common.IoSession;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+public class InductionBufferTest extends TestCase
+{
+ public void test() throws Exception
+ {
+ IoSession session1 = new TestSession();
+ IoSession session2 = new TestSession();
+ IoSession session3 = new TestSession();
+
+ TestMessageHandler handler = new TestMessageHandler();
+ InductionBuffer buffer = new InductionBuffer(handler);
+
+ buffer.receive(session1, "one");
+ buffer.receive(session2, "two");
+ buffer.receive(session3, "three");
+
+ buffer.receive(session1, "four");
+ buffer.receive(session1, "five");
+ buffer.receive(session1, "six");
+
+ buffer.receive(session3, "seven");
+ buffer.receive(session3, "eight");
+
+ handler.checkEmpty();
+ buffer.deliver();
+
+ handler.check(session1, "one");
+ handler.check(session2, "two");
+ handler.check(session3, "three");
+
+ handler.check(session1, "four");
+ handler.check(session1, "five");
+ handler.check(session1, "six");
+
+ handler.check(session3, "seven");
+ handler.check(session3, "eight");
+ handler.checkEmpty();
+
+ buffer.receive(session1, "nine");
+ buffer.receive(session2, "ten");
+ buffer.receive(session3, "eleven");
+
+ handler.check(session1, "nine");
+ handler.check(session2, "ten");
+ handler.check(session3, "eleven");
+
+ handler.checkEmpty();
+ }
+
+ private static class TestMessageHandler implements InductionBuffer.MessageHandler
+ {
+ private final List<IoSession> _sessions = new ArrayList<IoSession>();
+ private final List<Object> _msgs = new ArrayList<Object>();
+
+ public synchronized void deliver(IoSession session, Object msg) throws Exception
+ {
+ _sessions.add(session);
+ _msgs.add(msg);
+ }
+
+ void check(IoSession actualSession, Object actualMsg)
+ {
+ assertFalse(_sessions.isEmpty());
+ assertFalse(_msgs.isEmpty());
+ IoSession expectedSession = _sessions.remove(0);
+ Object expectedMsg = _msgs.remove(0);
+ assertEquals(expectedSession, actualSession);
+ assertEquals(expectedMsg, actualMsg);
+ }
+
+ void checkEmpty()
+ {
+ assertTrue(_sessions.isEmpty());
+ assertTrue(_msgs.isEmpty());
+ }
+ }
+}
+
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBroker.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBroker.java
new file mode 100644
index 0000000000..388d584288
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBroker.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class RecordingBroker extends TestBroker
+{
+ private final List<AMQDataBlock> _messages = new ArrayList<AMQDataBlock>();
+
+ RecordingBroker(String host, int port)
+ {
+ super(host, port);
+ }
+
+ public void send(AMQDataBlock data) throws AMQException
+ {
+ _messages.add(data);
+ }
+
+ List<AMQDataBlock> getMessages()
+ {
+ return _messages;
+ }
+
+ void clear()
+ {
+ _messages.clear();
+ }
+
+}
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java
new file mode 100644
index 0000000000..e5e95323af
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+class RecordingBrokerFactory implements BrokerFactory
+{
+ public Broker create(MemberHandle handle)
+ {
+ return new RecordingBroker(handle.getHost(), handle.getPort());
+ }
+}
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
new file mode 100644
index 0000000000..a4d13ea46d
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.junit.Test;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.JMSException;
+
+public class SimpleClusterTest
+{
+ @Test
+ public void declareExchange() throws AMQException, JMSException, URLSyntaxException
+ {
+ AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test");
+ AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ System.out.println("Session created");
+ session.declareExchange("my_exchange", "direct");
+ System.out.println("Exchange declared");
+ con.close();
+ System.out.println("Connection closed");
+ }
+}
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java
new file mode 100644
index 0000000000..f7c728759b
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+public class SimpleMemberHandleTest
+{
+ @Test
+ public void matches()
+ {
+ assertMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost", 8888));
+ assertNoMatch(new SimpleMemberHandle("localhost", 8889), new SimpleMemberHandle("localhost", 8888));
+ assertNoMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost2", 8888));
+ }
+
+
+ @Test
+ public void resolve()
+ {
+ assertEquivalent(new SimpleMemberHandle("WGLAIBD8XGR0J:9000"), new SimpleMemberHandle("localhost:9000"));
+ }
+
+ private void assertEquivalent(MemberHandle a, MemberHandle b)
+ {
+ String msg = a + " is not equivalent to " + b;
+ a = SimpleMemberHandle.resolve(a);
+ b = SimpleMemberHandle.resolve(b);
+ msg += "(" + a + " does not match " + b + ")";
+ assertTrue(msg, a.matches(b));
+ }
+
+ private void assertMatch(MemberHandle a, MemberHandle b)
+ {
+ assertTrue(a + " does not match " + b, a.matches(b));
+ }
+
+ private void assertNoMatch(MemberHandle a, MemberHandle b)
+ {
+ assertFalse(a + " matches " + b, a.matches(b));
+ }
+}
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBroker.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBroker.java
new file mode 100644
index 0000000000..c4a1985ae3
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBroker.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+
+import java.io.IOException;
+
+class TestBroker extends Broker
+{
+ TestBroker(String host, int port)
+ {
+ super(host, port);
+ }
+
+ boolean connect() throws IOException, InterruptedException
+ {
+ return true;
+ }
+
+ void connectAsynch(Iterable<AMQMethodBody> msgs)
+ {
+ replay(msgs);
+ }
+
+ void replay(Iterable<AMQMethodBody> msgs)
+ {
+ try
+ {
+ for (AMQMethodBody b : msgs)
+ {
+ send(new AMQFrame(0, b));
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ Broker connectToCluster() throws IOException, InterruptedException
+ {
+ return this;
+ }
+
+ public void send(AMQDataBlock data) throws AMQException
+ {
+ }
+}
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBrokerFactory.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBrokerFactory.java
new file mode 100644
index 0000000000..cd4e340925
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBrokerFactory.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+class TestBrokerFactory implements BrokerFactory
+{
+ public Broker create(MemberHandle handle)
+ {
+ return new TestBroker(handle.getHost(), handle.getPort());
+ }
+}
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestReplayManager.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestReplayManager.java
new file mode 100644
index 0000000000..e2d6f75f19
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestReplayManager.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.cluster.replay.ReplayManager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class TestReplayManager implements ReplayManager
+{
+ private final List<AMQMethodBody> _msgs;
+
+ TestReplayManager()
+ {
+ this(new ArrayList<AMQMethodBody>());
+ }
+
+ TestReplayManager(List<AMQMethodBody> msgs)
+ {
+ _msgs = msgs;
+ }
+
+ public List<AMQMethodBody> replay(boolean isLeader)
+ {
+ return _msgs;
+ }
+}
diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestSession.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestSession.java
new file mode 100644
index 0000000000..675e20c9dc
--- /dev/null
+++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestSession.java
@@ -0,0 +1,266 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.mina.common.*;
+
+import java.net.SocketAddress;
+import java.util.Set;
+
+class TestSession implements IoSession
+{
+ public IoService getService()
+ {
+ return null; //TODO
+ }
+
+ public IoServiceConfig getServiceConfig()
+ {
+ return null; //TODO
+ }
+
+ public IoHandler getHandler()
+ {
+ return null; //TODO
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return null; //TODO
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return null; //TODO
+ }
+
+ public WriteFuture write(Object message)
+ {
+ return null; //TODO
+ }
+
+ public CloseFuture close()
+ {
+ return null; //TODO
+ }
+
+ public Object getAttachment()
+ {
+ return null; //TODO
+ }
+
+ public Object setAttachment(Object attachment)
+ {
+ return null; //TODO
+ }
+
+ public Object getAttribute(String key)
+ {
+ return null; //TODO
+ }
+
+ public Object setAttribute(String key, Object value)
+ {
+ return null; //TODO
+ }
+
+ public Object setAttribute(String key)
+ {
+ return null; //TODO
+ }
+
+ public Object removeAttribute(String key)
+ {
+ return null; //TODO
+ }
+
+ public boolean containsAttribute(String key)
+ {
+ return false; //TODO
+ }
+
+ public Set getAttributeKeys()
+ {
+ return null; //TODO
+ }
+
+ public TransportType getTransportType()
+ {
+ return null; //TODO
+ }
+
+ public boolean isConnected()
+ {
+ return false; //TODO
+ }
+
+ public boolean isClosing()
+ {
+ return false; //TODO
+ }
+
+ public CloseFuture getCloseFuture()
+ {
+ return null; //TODO
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return null; //TODO
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return null; //TODO
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return null; //TODO
+ }
+
+ public int getIdleTime(IdleStatus status)
+ {
+ return 0; //TODO
+ }
+
+ public long getIdleTimeInMillis(IdleStatus status)
+ {
+ return 0; //TODO
+ }
+
+ public void setIdleTime(IdleStatus status, int idleTime)
+ {
+ //TODO
+ }
+
+ public int getWriteTimeout()
+ {
+ return 0; //TODO
+ }
+
+ public long getWriteTimeoutInMillis()
+ {
+ return 0; //TODO
+ }
+
+ public void setWriteTimeout(int writeTimeout)
+ {
+ //TODO
+ }
+
+ public TrafficMask getTrafficMask()
+ {
+ return null; //TODO
+ }
+
+ public void setTrafficMask(TrafficMask trafficMask)
+ {
+ //TODO
+ }
+
+ public void suspendRead()
+ {
+ //TODO
+ }
+
+ public void suspendWrite()
+ {
+ //TODO
+ }
+
+ public void resumeRead()
+ {
+ //TODO
+ }
+
+ public void resumeWrite()
+ {
+ //TODO
+ }
+
+ public long getReadBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getReadMessages()
+ {
+ return 0;
+ }
+
+ public long getWrittenMessages()
+ {
+ return 0;
+ }
+
+ public long getWrittenWriteRequests()
+ {
+ return 0; //TODO
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ return 0; //TODO
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getCreationTime()
+ {
+ return 0; //TODO
+ }
+
+ public long getLastIoTime()
+ {
+ return 0; //TODO
+ }
+
+ public long getLastReadTime()
+ {
+ return 0; //TODO
+ }
+
+ public long getLastWriteTime()
+ {
+ return 0; //TODO
+ }
+
+ public boolean isIdle(IdleStatus status)
+ {
+ return false; //TODO
+ }
+
+ public int getIdleCount(IdleStatus status)
+ {
+ return 0; //TODO
+ }
+
+ public long getLastIdleTime(IdleStatus status)
+ {
+ return 0; //TODO
+ }
+}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java
new file mode 100644
index 0000000000..015e96f9c6
--- /dev/null
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java
@@ -0,0 +1,267 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import junit.framework.TestCase;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class BrokerGroupTest extends TestCase
+{
+ private final MemberHandle a = new SimpleMemberHandle("A", 1);
+ private final MemberHandle b = new SimpleMemberHandle("B", 1);
+ private final MemberHandle c = new SimpleMemberHandle("C", 1);
+ private final MemberHandle d = new SimpleMemberHandle("D", 1);
+
+ //join (new members perspective)
+ // (i) connectToLeader()
+ // ==> check state
+ // (ii) setMembers()
+ // ==> check state
+ // ==> check members
+ // (iii) synched(leader)
+ // ==> check state
+ // ==> check peers
+ // (iv) synched(other)
+ // ==> check state
+ // ==> check peers
+ // repeat for all others
+ public void testJoin_newMember() throws Exception
+ {
+ MemberHandle[] pre = new MemberHandle[]{a, b, c};
+ MemberHandle[] post = new MemberHandle[]{a, b, c};
+
+ BrokerGroup group = new BrokerGroup(d, new TestReplayManager(), new TestBrokerFactory());
+ assertEquals(JoinState.UNINITIALISED, group.getState());
+ //(i)
+ group.connectToLeader(a);
+ assertEquals(JoinState.JOINING, group.getState());
+ assertEquals("Wrong number of peers", 1, group.getPeers().size());
+ //(ii)
+ group.setMembers(Arrays.asList(post));
+ assertEquals(JoinState.INITIATION, group.getState());
+ assertEquals(Arrays.asList(post), group.getMembers());
+ //(iii) & (iv)
+ for (MemberHandle member : pre)
+ {
+ group.synched(member);
+ if (member == c)
+ {
+ assertEquals(JoinState.JOINED, group.getState());
+ assertEquals("Wrong number of peers", pre.length, group.getPeers().size());
+ }
+ else
+ {
+ assertEquals(JoinState.INDUCTION, group.getState());
+ assertEquals("Wrong number of peers", 1, group.getPeers().size());
+ }
+ }
+ }
+
+ //join (leaders perspective)
+ // (i) extablish()
+ // ==> check state
+ // ==> check members
+ // ==> check peers
+ // (ii) connectToProspect()
+ // ==> check members
+ // ==> check peers
+ // repeat (ii)
+ public void testJoin_Leader() throws IOException, InterruptedException
+ {
+ MemberHandle[] prospects = new MemberHandle[]{b, c, d};
+
+ BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory());
+ assertEquals(JoinState.UNINITIALISED, group.getState());
+ //(i)
+ group.establish();
+ assertEquals(JoinState.JOINED, group.getState());
+ assertEquals("Wrong number of peers", 0, group.getPeers().size());
+ assertEquals("Wrong number of members", 1, group.getMembers().size());
+ assertEquals(a, group.getMembers().get(0));
+ //(ii)
+ for (int i = 0; i < prospects.length; i++)
+ {
+ group.connectToProspect(prospects[i]);
+ assertEquals("Wrong number of peers", i + 1, group.getPeers().size());
+ for (int j = 0; j <= i; j++)
+ {
+ assertTrue(prospects[i].matches(group.getPeers().get(i)));
+ }
+ assertEquals("Wrong number of members", i + 2, group.getMembers().size());
+ assertEquals(a, group.getMembers().get(0));
+ for (int j = 0; j <= i; j++)
+ {
+ assertEquals(prospects[i], group.getMembers().get(i + 1));
+ }
+ }
+ }
+
+ //join (general perspective)
+ // (i) set up group
+ // (ii) setMembers()
+ // ==> check members
+ // ==> check peers
+ public void testJoin_general() throws Exception
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b, c};
+ MemberHandle[] view2 = new MemberHandle[]{a, b, c, d};
+ MemberHandle[] peers = new MemberHandle[]{a, b, d};
+
+ BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory());
+ //(i)
+ group.connectToLeader(a);
+ group.setMembers(Arrays.asList(view1));
+ for (MemberHandle h : view1)
+ {
+ group.synched(h);
+ }
+ //(ii)
+ group.setMembers(Arrays.asList(view2));
+ assertEquals(Arrays.asList(view2), group.getMembers());
+ assertEquals(peers.length, group.getPeers().size());
+ for (int i = 0; i < peers.length; i++)
+ {
+ assertTrue(peers[i].matches(group.getPeers().get(i)));
+ }
+ }
+
+ //leadership transfer (valid)
+ // (i) set up group
+ // (ii) assumeLeadership()
+ // ==> check return value
+ // ==> check members
+ // ==> check peers
+ // ==> check isLeader()
+ // ==> check isLeader(old_leader)
+ // ==> check isMember(old_leader)
+ public void testTransferLeadership_valid() throws Exception
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b};
+ MemberHandle[] view2 = new MemberHandle[]{a, b, c, d};
+ MemberHandle[] view3 = new MemberHandle[]{b, c, d};
+
+ BrokerGroup group = new BrokerGroup(b, new TestReplayManager(), new TestBrokerFactory());
+ //(i)
+ group.connectToLeader(a);
+ group.setMembers(Arrays.asList(view1));
+ for (MemberHandle h : view1)
+ {
+ group.synched(h);
+ }
+ group.setMembers(Arrays.asList(view2));
+ //(ii)
+ boolean result = group.assumeLeadership();
+ assertTrue(result);
+ assertTrue(group.isLeader());
+ assertFalse(group.isLeader(a));
+ assertEquals(Arrays.asList(view3), group.getMembers());
+ assertEquals(2, group.getPeers().size());
+ assertTrue(c.matches(group.getPeers().get(0)));
+ assertTrue(d.matches(group.getPeers().get(1)));
+ }
+
+ //leadership transfer (invalid)
+ // (i) set up group
+ // (ii) assumeLeadership()
+ // ==> check return value
+ // ==> check members
+ // ==> check peers
+ // ==> check isLeader()
+ // ==> check isLeader(old_leader)
+ // ==> check isMember(old_leader)
+ public void testTransferLeadership_invalid() throws Exception
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b, c};
+ MemberHandle[] view2 = new MemberHandle[]{a, b, c, d};
+
+ BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory());
+ //(i)
+ group.connectToLeader(a);
+ group.setMembers(Arrays.asList(view1));
+ for (MemberHandle h : view1)
+ {
+ group.synched(h);
+ }
+ group.setMembers(Arrays.asList(view2));
+ //(ii)
+ boolean result = group.assumeLeadership();
+ assertFalse(result);
+ assertFalse(group.isLeader());
+ assertTrue(group.isLeader(a));
+ assertEquals(Arrays.asList(view2), group.getMembers());
+ assertEquals(3, group.getPeers().size());
+ assertTrue(a.matches(group.getPeers().get(0)));
+ assertTrue(b.matches(group.getPeers().get(1)));
+ assertTrue(d.matches(group.getPeers().get(2)));
+
+ }
+
+ //leave (leaders perspective)
+ // (i) set up group
+ // (ii) remove a member
+ // ==> check members
+ // ==> check peers
+ // ==> check isMember(removed_member)
+ // repeat (ii)
+ public void testLeave_leader()
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b, c, d};
+ MemberHandle[] view2 = new MemberHandle[]{a, b, d};
+ MemberHandle[] view3 = new MemberHandle[]{a, d};
+ MemberHandle[] view4 = new MemberHandle[]{a};
+ //(i)
+ BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory());
+ group.establish();
+ group.setMembers(Arrays.asList(view1));
+ //(ii)
+ group.remove(group.findBroker(c, false));
+ assertEquals(Arrays.asList(view2), group.getMembers());
+
+ group.remove(group.findBroker(b, false));
+ assertEquals(Arrays.asList(view3), group.getMembers());
+
+ group.remove(group.findBroker(d, false));
+ assertEquals(Arrays.asList(view4), group.getMembers());
+ }
+
+
+ //leave (general perspective)
+ // (i) set up group
+ // (ii) setMember
+ // ==> check members
+ // ==> check peers
+ // ==> check isMember(removed_member)
+ // repeat (ii)
+ public void testLeave_general()
+ {
+ MemberHandle[] view1 = new MemberHandle[]{a, b, c, d};
+ MemberHandle[] view2 = new MemberHandle[]{a, c, d};
+ //(i)
+ BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory());
+ group.establish(); //not strictly the correct way to build up the group, but ok for here
+ group.setMembers(Arrays.asList(view1));
+ //(ii)
+ group.setMembers(Arrays.asList(view2));
+ assertEquals(Arrays.asList(view2), group.getMembers());
+ assertEquals(2, group.getPeers().size());
+ assertTrue(a.matches(group.getPeers().get(0)));
+ assertTrue(d.matches(group.getPeers().get(1)));
+ }
+}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
new file mode 100644
index 0000000000..d7ede7d3e0
--- /dev/null
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
@@ -0,0 +1,231 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import junit.framework.TestCase;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.cluster.policy.StandardPolicies;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class BrokerTest extends TestCase
+{
+ //group request (no failure)
+ public void testGroupRequest_noFailure() throws AMQException
+ {
+ RecordingBroker[] brokers = new RecordingBroker[]{
+ new RecordingBroker("A", 1),
+ new RecordingBroker("B", 2),
+ new RecordingBroker("C", 3)
+ };
+ GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(brokers)));
+ GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
+ for (Broker b : brokers)
+ {
+ b.invoke(grpRequest);
+ }
+ grpRequest.finishedSend();
+
+ for (RecordingBroker b : brokers)
+ {
+ b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new TestMethod("response"));
+ }
+
+ assertTrue("Handler did not receive response", handler.isCompleted());
+ }
+
+ //group request (failure)
+ public void testGroupRequest_failure() throws AMQException
+ {
+ RecordingBroker a = new RecordingBroker("A", 1);
+ RecordingBroker b = new RecordingBroker("B", 2);
+ RecordingBroker c = new RecordingBroker("C", 3);
+ RecordingBroker[] all = new RecordingBroker[]{a, b, c};
+ RecordingBroker[] succeeded = new RecordingBroker[]{a, c};
+
+ GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(succeeded)));
+ GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
+
+ for (Broker broker : all)
+ {
+ broker.invoke(grpRequest);
+ }
+ grpRequest.finishedSend();
+
+ for (RecordingBroker broker : succeeded)
+ {
+ broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).channel, new TestMethod("response"));
+ }
+ b.remove();
+
+ assertTrue("Handler did not receive response", handler.isCompleted());
+ }
+
+
+ //simple send (no response)
+ public void testSend_noResponse() throws AMQException
+ {
+ AMQBody[] msgs = new AMQBody[]{
+ new TestMethod("A"),
+ new TestMethod("B"),
+ new TestMethod("C")
+ };
+ RecordingBroker broker = new RecordingBroker("myhost", 1);
+ for (AMQBody msg : msgs)
+ {
+ broker.send(new SimpleSendable(msg), null);
+ }
+ List<AMQDataBlock> sent = broker.getMessages();
+ assertEquals(msgs.length, sent.size());
+ for (int i = 0; i < msgs.length; i++)
+ {
+ assertTrue(sent.get(i) instanceof AMQFrame);
+ assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame);
+ }
+ }
+
+ //simple send (no failure)
+ public void testSend_noFailure() throws AMQException
+ {
+ RecordingBroker broker = new RecordingBroker("myhost", 1);
+ BlockingHandler handler = new BlockingHandler();
+ broker.send(new SimpleSendable(new TestMethod("A")), handler);
+ List<AMQDataBlock> sent = broker.getMessages();
+ assertEquals(1, sent.size());
+ assertTrue(sent.get(0) instanceof AMQFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+
+ broker.handleResponse(((AMQFrame) sent.get(0)).channel, new TestMethod("B"));
+
+ assertEquals(new TestMethod("B"), handler.getResponse());
+ }
+
+ //simple send (failure)
+ public void testSend_failure() throws AMQException
+ {
+ RecordingBroker broker = new RecordingBroker("myhost", 1);
+ BlockingHandler handler = new BlockingHandler();
+ broker.send(new SimpleSendable(new TestMethod("A")), handler);
+ List<AMQDataBlock> sent = broker.getMessages();
+ assertEquals(1, sent.size());
+ assertTrue(sent.get(0) instanceof AMQFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+ broker.remove();
+ assertEquals(null, handler.getResponse());
+ assertTrue(handler.isCompleted());
+ assertTrue(handler.failed());
+ }
+
+ private static class TestMethod extends AMQMethodBody
+ {
+ private final Object id;
+
+ TestMethod(Object id)
+ {
+ this.id = id;
+ }
+
+ protected int getBodySize()
+ {
+ return 0;
+ }
+
+ protected int getClazz()
+ {
+ return 1002;
+ }
+
+ protected int getMethod()
+ {
+ return 1003;
+ }
+
+ protected void writeMethodPayload(ByteBuffer buffer)
+ {
+ }
+
+ protected byte getType()
+ {
+ return 0;
+ }
+
+ protected int getSize()
+ {
+ return 0;
+ }
+
+ protected void writePayload(ByteBuffer buffer)
+ {
+ }
+
+ protected void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ }
+
+ protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof TestMethod && id.equals(((TestMethod) o).id);
+ }
+
+ public int hashCode()
+ {
+ return id.hashCode();
+ }
+
+ }
+
+ private static class GroupResponseValidator implements GroupResponseHandler
+ {
+ private final AMQMethodBody _response;
+ private final List<Member> _members;
+ private boolean _completed = false;
+
+ GroupResponseValidator(AMQMethodBody response, List<Member> members)
+ {
+ _response = response;
+ _members = members;
+ }
+
+ public void response(List<AMQMethodBody> responses, List<Member> members)
+ {
+ for (AMQMethodBody r : responses)
+ {
+ assertEquals(_response, r);
+ }
+ assertEquals(_members, members);
+ _completed = true;
+ }
+
+ boolean isCompleted()
+ {
+ return _completed;
+ }
+ }
+}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java
new file mode 100644
index 0000000000..132ebd8ca0
--- /dev/null
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+public class ClusterCapabilityTest
+{
+ @Test
+ public void startWithNull()
+ {
+ MemberHandle peer = new SimpleMemberHandle("myhost:9999");
+ String c = ClusterCapability.add(null, peer);
+ assertTrue(ClusterCapability.contains(c));
+ assertTrue(peer.matches(ClusterCapability.getPeer(c)));
+ }
+
+ @Test
+ public void startWithText()
+ {
+ MemberHandle peer = new SimpleMemberHandle("myhost:9999");
+ String c = ClusterCapability.add("existing text", peer);
+ assertTrue(ClusterCapability.contains(c));
+ assertTrue(peer.matches(ClusterCapability.getPeer(c)));
+ }
+}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java
new file mode 100644
index 0000000000..fedf47d49a
--- /dev/null
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.mina.common.IoSession;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+public class InductionBufferTest extends TestCase
+{
+ public void test() throws Exception
+ {
+ IoSession session1 = new TestSession();
+ IoSession session2 = new TestSession();
+ IoSession session3 = new TestSession();
+
+ TestMessageHandler handler = new TestMessageHandler();
+ InductionBuffer buffer = new InductionBuffer(handler);
+
+ buffer.receive(session1, "one");
+ buffer.receive(session2, "two");
+ buffer.receive(session3, "three");
+
+ buffer.receive(session1, "four");
+ buffer.receive(session1, "five");
+ buffer.receive(session1, "six");
+
+ buffer.receive(session3, "seven");
+ buffer.receive(session3, "eight");
+
+ handler.checkEmpty();
+ buffer.deliver();
+
+ handler.check(session1, "one");
+ handler.check(session2, "two");
+ handler.check(session3, "three");
+
+ handler.check(session1, "four");
+ handler.check(session1, "five");
+ handler.check(session1, "six");
+
+ handler.check(session3, "seven");
+ handler.check(session3, "eight");
+ handler.checkEmpty();
+
+ buffer.receive(session1, "nine");
+ buffer.receive(session2, "ten");
+ buffer.receive(session3, "eleven");
+
+ handler.check(session1, "nine");
+ handler.check(session2, "ten");
+ handler.check(session3, "eleven");
+
+ handler.checkEmpty();
+ }
+
+ private static class TestMessageHandler implements InductionBuffer.MessageHandler
+ {
+ private final List<IoSession> _sessions = new ArrayList<IoSession>();
+ private final List<Object> _msgs = new ArrayList<Object>();
+
+ public synchronized void deliver(IoSession session, Object msg) throws Exception
+ {
+ _sessions.add(session);
+ _msgs.add(msg);
+ }
+
+ void check(IoSession actualSession, Object actualMsg)
+ {
+ assertFalse(_sessions.isEmpty());
+ assertFalse(_msgs.isEmpty());
+ IoSession expectedSession = _sessions.remove(0);
+ Object expectedMsg = _msgs.remove(0);
+ assertEquals(expectedSession, actualSession);
+ assertEquals(expectedMsg, actualMsg);
+ }
+
+ void checkEmpty()
+ {
+ assertTrue(_sessions.isEmpty());
+ assertTrue(_msgs.isEmpty());
+ }
+ }
+}
+
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java
new file mode 100644
index 0000000000..388d584288
--- /dev/null
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class RecordingBroker extends TestBroker
+{
+ private final List<AMQDataBlock> _messages = new ArrayList<AMQDataBlock>();
+
+ RecordingBroker(String host, int port)
+ {
+ super(host, port);
+ }
+
+ public void send(AMQDataBlock data) throws AMQException
+ {
+ _messages.add(data);
+ }
+
+ List<AMQDataBlock> getMessages()
+ {
+ return _messages;
+ }
+
+ void clear()
+ {
+ _messages.clear();
+ }
+
+}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java
new file mode 100644
index 0000000000..e5e95323af
--- /dev/null
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+class RecordingBrokerFactory implements BrokerFactory
+{
+ public Broker create(MemberHandle handle)
+ {
+ return new RecordingBroker(handle.getHost(), handle.getPort());
+ }
+}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
new file mode 100644
index 0000000000..a4d13ea46d
--- /dev/null
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.junit.Test;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.JMSException;
+
+public class SimpleClusterTest
+{
+ @Test
+ public void declareExchange() throws AMQException, JMSException, URLSyntaxException
+ {
+ AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test");
+ AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ System.out.println("Session created");
+ session.declareExchange("my_exchange", "direct");
+ System.out.println("Exchange declared");
+ con.close();
+ System.out.println("Connection closed");
+ }
+}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java
new file mode 100644
index 0000000000..f7c728759b
--- /dev/null
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+public class SimpleMemberHandleTest
+{
+ @Test
+ public void matches()
+ {
+ assertMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost", 8888));
+ assertNoMatch(new SimpleMemberHandle("localhost", 8889), new SimpleMemberHandle("localhost", 8888));
+ assertNoMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost2", 8888));
+ }
+
+
+ @Test
+ public void resolve()
+ {
+ assertEquivalent(new SimpleMemberHandle("WGLAIBD8XGR0J:9000"), new SimpleMemberHandle("localhost:9000"));
+ }
+
+ private void assertEquivalent(MemberHandle a, MemberHandle b)
+ {
+ String msg = a + " is not equivalent to " + b;
+ a = SimpleMemberHandle.resolve(a);
+ b = SimpleMemberHandle.resolve(b);
+ msg += "(" + a + " does not match " + b + ")";
+ assertTrue(msg, a.matches(b));
+ }
+
+ private void assertMatch(MemberHandle a, MemberHandle b)
+ {
+ assertTrue(a + " does not match " + b, a.matches(b));
+ }
+
+ private void assertNoMatch(MemberHandle a, MemberHandle b)
+ {
+ assertFalse(a + " matches " + b, a.matches(b));
+ }
+}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java
new file mode 100644
index 0000000000..c4a1985ae3
--- /dev/null
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+
+import java.io.IOException;
+
+class TestBroker extends Broker
+{
+ TestBroker(String host, int port)
+ {
+ super(host, port);
+ }
+
+ boolean connect() throws IOException, InterruptedException
+ {
+ return true;
+ }
+
+ void connectAsynch(Iterable<AMQMethodBody> msgs)
+ {
+ replay(msgs);
+ }
+
+ void replay(Iterable<AMQMethodBody> msgs)
+ {
+ try
+ {
+ for (AMQMethodBody b : msgs)
+ {
+ send(new AMQFrame(0, b));
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ Broker connectToCluster() throws IOException, InterruptedException
+ {
+ return this;
+ }
+
+ public void send(AMQDataBlock data) throws AMQException
+ {
+ }
+}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java
new file mode 100644
index 0000000000..cd4e340925
--- /dev/null
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+class TestBrokerFactory implements BrokerFactory
+{
+ public Broker create(MemberHandle handle)
+ {
+ return new TestBroker(handle.getHost(), handle.getPort());
+ }
+}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java
new file mode 100644
index 0000000000..e2d6f75f19
--- /dev/null
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.cluster.replay.ReplayManager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class TestReplayManager implements ReplayManager
+{
+ private final List<AMQMethodBody> _msgs;
+
+ TestReplayManager()
+ {
+ this(new ArrayList<AMQMethodBody>());
+ }
+
+ TestReplayManager(List<AMQMethodBody> msgs)
+ {
+ _msgs = msgs;
+ }
+
+ public List<AMQMethodBody> replay(boolean isLeader)
+ {
+ return _msgs;
+ }
+}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java
new file mode 100644
index 0000000000..675e20c9dc
--- /dev/null
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java
@@ -0,0 +1,266 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.mina.common.*;
+
+import java.net.SocketAddress;
+import java.util.Set;
+
+class TestSession implements IoSession
+{
+ public IoService getService()
+ {
+ return null; //TODO
+ }
+
+ public IoServiceConfig getServiceConfig()
+ {
+ return null; //TODO
+ }
+
+ public IoHandler getHandler()
+ {
+ return null; //TODO
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return null; //TODO
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return null; //TODO
+ }
+
+ public WriteFuture write(Object message)
+ {
+ return null; //TODO
+ }
+
+ public CloseFuture close()
+ {
+ return null; //TODO
+ }
+
+ public Object getAttachment()
+ {
+ return null; //TODO
+ }
+
+ public Object setAttachment(Object attachment)
+ {
+ return null; //TODO
+ }
+
+ public Object getAttribute(String key)
+ {
+ return null; //TODO
+ }
+
+ public Object setAttribute(String key, Object value)
+ {
+ return null; //TODO
+ }
+
+ public Object setAttribute(String key)
+ {
+ return null; //TODO
+ }
+
+ public Object removeAttribute(String key)
+ {
+ return null; //TODO
+ }
+
+ public boolean containsAttribute(String key)
+ {
+ return false; //TODO
+ }
+
+ public Set getAttributeKeys()
+ {
+ return null; //TODO
+ }
+
+ public TransportType getTransportType()
+ {
+ return null; //TODO
+ }
+
+ public boolean isConnected()
+ {
+ return false; //TODO
+ }
+
+ public boolean isClosing()
+ {
+ return false; //TODO
+ }
+
+ public CloseFuture getCloseFuture()
+ {
+ return null; //TODO
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return null; //TODO
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return null; //TODO
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return null; //TODO
+ }
+
+ public int getIdleTime(IdleStatus status)
+ {
+ return 0; //TODO
+ }
+
+ public long getIdleTimeInMillis(IdleStatus status)
+ {
+ return 0; //TODO
+ }
+
+ public void setIdleTime(IdleStatus status, int idleTime)
+ {
+ //TODO
+ }
+
+ public int getWriteTimeout()
+ {
+ return 0; //TODO
+ }
+
+ public long getWriteTimeoutInMillis()
+ {
+ return 0; //TODO
+ }
+
+ public void setWriteTimeout(int writeTimeout)
+ {
+ //TODO
+ }
+
+ public TrafficMask getTrafficMask()
+ {
+ return null; //TODO
+ }
+
+ public void setTrafficMask(TrafficMask trafficMask)
+ {
+ //TODO
+ }
+
+ public void suspendRead()
+ {
+ //TODO
+ }
+
+ public void suspendWrite()
+ {
+ //TODO
+ }
+
+ public void resumeRead()
+ {
+ //TODO
+ }
+
+ public void resumeWrite()
+ {
+ //TODO
+ }
+
+ public long getReadBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getReadMessages()
+ {
+ return 0;
+ }
+
+ public long getWrittenMessages()
+ {
+ return 0;
+ }
+
+ public long getWrittenWriteRequests()
+ {
+ return 0; //TODO
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ return 0; //TODO
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getCreationTime()
+ {
+ return 0; //TODO
+ }
+
+ public long getLastIoTime()
+ {
+ return 0; //TODO
+ }
+
+ public long getLastReadTime()
+ {
+ return 0; //TODO
+ }
+
+ public long getLastWriteTime()
+ {
+ return 0; //TODO
+ }
+
+ public boolean isIdle(IdleStatus status)
+ {
+ return false; //TODO
+ }
+
+ public int getIdleCount(IdleStatus status)
+ {
+ return 0; //TODO
+ }
+
+ public long getLastIdleTime(IdleStatus status)
+ {
+ return 0; //TODO
+ }
+}