diff options
author | Robert Greig <rgreig@apache.org> | 2006-11-23 22:58:36 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-11-23 22:58:36 +0000 |
commit | 31ba3aa4ab1187cb74635271e03177c486c9fab8 (patch) | |
tree | 39e7a8f581002cb6c702748bf03f1dea418bedda | |
parent | 2342078cd8586a567e997789b74f7c8bafc2bca2 (diff) | |
download | qpid-python-31ba3aa4ab1187cb74635271e03177c486c9fab8.tar.gz |
Start of merge from trunk - some manual restructuring
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@478702 13f79535-47bb-0310-9956-ffa450edef68
97 files changed, 2444 insertions, 0 deletions
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java index 3e7a2af01f..3e7a2af01f 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java index 6d56afe50b..6d56afe50b 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Broker.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java index d8a3fd1b76..d8a3fd1b76 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/Broker.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java index 75a47a168f..75a47a168f 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java index db667879f6..db667879f6 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java index 6909b199ce..6909b199ce 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java index d378647698..d378647698 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java index eb2992a690..eb2992a690 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java index 9bf2e02d3c..9bf2e02d3c 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java index 65b188484c..65b188484c 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java index e5efe941b3..e5efe941b3 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java index 650240fa70..650240fa70 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index 980b36cf21..980b36cf21 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java index d3b33f6fe3..d3b33f6fe3 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java index 6c45c6e655..6c45c6e655 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java index c12d06a337..c12d06a337 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java index 76e15b88ec..76e15b88ec 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java index af618631e4..af618631e4 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java index d95229505f..d95229505f 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Main.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java index 94ec2042a2..94ec2042a2 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/Main.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Member.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java index d7825aac6e..d7825aac6e 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/Member.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java index 4b56eaf962..4b56eaf962 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java index 75f4d19103..75f4d19103 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java index a62a413bc6..a62a413bc6 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java index cd3e00012e..cd3e00012e 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java index f4a721dbc5..f4a721dbc5 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java index 4870f9daaa..4870f9daaa 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java index f193862e78..f193862e78 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java index 9cf9beeba7..9cf9beeba7 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java index 3d751fabba..3d751fabba 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java index ad50a5a737..ad50a5a737 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java index 2b4408b66c..2b4408b66c 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java index f0b1db25e6..f0b1db25e6 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java index 4b75e76d97..4b75e76d97 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java index 5ee07af596..5ee07af596 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java index cd25bd178a..cd25bd178a 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java index 9e4444819e..9e4444819e 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java index a1684b399f..a1684b399f 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java index 08dcfbe121..08dcfbe121 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java index 27eff59685..27eff59685 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java index 997c055d77..997c055d77 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java index 7fae2c6598..7fae2c6598 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java index a2a570f045..a2a570f045 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java index 2f15373eba..2f15373eba 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java index 9a598d7f07..9a598d7f07 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java index 24ce4087fb..24ce4087fb 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java index 4a895fcd0a..4a895fcd0a 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java index a5fab27d16..a5fab27d16 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java index 18c3f5ce58..18c3f5ce58 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java index 912651d3ce..912651d3ce 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java index 044d264380..044d264380 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java index cd004c36d0..cd004c36d0 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java index 0aca6fbe97..0aca6fbe97 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java index fbee483967..fbee483967 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java index 1c6023d35d..1c6023d35d 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java index cbbc8679a8..cbbc8679a8 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java index 7ba51108f5..7ba51108f5 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java index 8d5ce4f18e..8d5ce4f18e 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java index 638ec64e09..638ec64e09 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java index 1f555b0f91..1f555b0f91 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java index 66bd8e0b0c..66bd8e0b0c 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java index cca3953f34..cca3953f34 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java index 5bdc824060..5bdc824060 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java index 9824041358..9824041358 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java diff --git a/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java index 1c4e3da6f3..1c4e3da6f3 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java diff --git a/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index 14d893b040..14d893b040 100644 --- a/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java diff --git a/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java index 0005b20fb1..0005b20fb1 100644 --- a/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java diff --git a/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java index 0bb6537930..0bb6537930 100644 --- a/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java diff --git a/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java index 1e7e13a577..1e7e13a577 100644 --- a/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java diff --git a/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java index dac8b616e5..dac8b616e5 100644 --- a/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java diff --git a/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java index a9a467b306..a9a467b306 100644 --- a/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java diff --git a/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index 9de7a5c849..9de7a5c849 100644 --- a/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java diff --git a/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java index cdb6f8f4d2..cdb6f8f4d2 100644 --- a/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerGroupTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerGroupTest.java new file mode 100644 index 0000000000..015e96f9c6 --- /dev/null +++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerGroupTest.java @@ -0,0 +1,267 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import junit.framework.TestCase; + +import java.io.IOException; +import java.util.Arrays; + +public class BrokerGroupTest extends TestCase +{ + private final MemberHandle a = new SimpleMemberHandle("A", 1); + private final MemberHandle b = new SimpleMemberHandle("B", 1); + private final MemberHandle c = new SimpleMemberHandle("C", 1); + private final MemberHandle d = new SimpleMemberHandle("D", 1); + + //join (new members perspective) + // (i) connectToLeader() + // ==> check state + // (ii) setMembers() + // ==> check state + // ==> check members + // (iii) synched(leader) + // ==> check state + // ==> check peers + // (iv) synched(other) + // ==> check state + // ==> check peers + // repeat for all others + public void testJoin_newMember() throws Exception + { + MemberHandle[] pre = new MemberHandle[]{a, b, c}; + MemberHandle[] post = new MemberHandle[]{a, b, c}; + + BrokerGroup group = new BrokerGroup(d, new TestReplayManager(), new TestBrokerFactory()); + assertEquals(JoinState.UNINITIALISED, group.getState()); + //(i) + group.connectToLeader(a); + assertEquals(JoinState.JOINING, group.getState()); + assertEquals("Wrong number of peers", 1, group.getPeers().size()); + //(ii) + group.setMembers(Arrays.asList(post)); + assertEquals(JoinState.INITIATION, group.getState()); + assertEquals(Arrays.asList(post), group.getMembers()); + //(iii) & (iv) + for (MemberHandle member : pre) + { + group.synched(member); + if (member == c) + { + assertEquals(JoinState.JOINED, group.getState()); + assertEquals("Wrong number of peers", pre.length, group.getPeers().size()); + } + else + { + assertEquals(JoinState.INDUCTION, group.getState()); + assertEquals("Wrong number of peers", 1, group.getPeers().size()); + } + } + } + + //join (leaders perspective) + // (i) extablish() + // ==> check state + // ==> check members + // ==> check peers + // (ii) connectToProspect() + // ==> check members + // ==> check peers + // repeat (ii) + public void testJoin_Leader() throws IOException, InterruptedException + { + MemberHandle[] prospects = new MemberHandle[]{b, c, d}; + + BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory()); + assertEquals(JoinState.UNINITIALISED, group.getState()); + //(i) + group.establish(); + assertEquals(JoinState.JOINED, group.getState()); + assertEquals("Wrong number of peers", 0, group.getPeers().size()); + assertEquals("Wrong number of members", 1, group.getMembers().size()); + assertEquals(a, group.getMembers().get(0)); + //(ii) + for (int i = 0; i < prospects.length; i++) + { + group.connectToProspect(prospects[i]); + assertEquals("Wrong number of peers", i + 1, group.getPeers().size()); + for (int j = 0; j <= i; j++) + { + assertTrue(prospects[i].matches(group.getPeers().get(i))); + } + assertEquals("Wrong number of members", i + 2, group.getMembers().size()); + assertEquals(a, group.getMembers().get(0)); + for (int j = 0; j <= i; j++) + { + assertEquals(prospects[i], group.getMembers().get(i + 1)); + } + } + } + + //join (general perspective) + // (i) set up group + // (ii) setMembers() + // ==> check members + // ==> check peers + public void testJoin_general() throws Exception + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c}; + MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] peers = new MemberHandle[]{a, b, d}; + + BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); + //(i) + group.connectToLeader(a); + group.setMembers(Arrays.asList(view1)); + for (MemberHandle h : view1) + { + group.synched(h); + } + //(ii) + group.setMembers(Arrays.asList(view2)); + assertEquals(Arrays.asList(view2), group.getMembers()); + assertEquals(peers.length, group.getPeers().size()); + for (int i = 0; i < peers.length; i++) + { + assertTrue(peers[i].matches(group.getPeers().get(i))); + } + } + + //leadership transfer (valid) + // (i) set up group + // (ii) assumeLeadership() + // ==> check return value + // ==> check members + // ==> check peers + // ==> check isLeader() + // ==> check isLeader(old_leader) + // ==> check isMember(old_leader) + public void testTransferLeadership_valid() throws Exception + { + MemberHandle[] view1 = new MemberHandle[]{a, b}; + MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] view3 = new MemberHandle[]{b, c, d}; + + BrokerGroup group = new BrokerGroup(b, new TestReplayManager(), new TestBrokerFactory()); + //(i) + group.connectToLeader(a); + group.setMembers(Arrays.asList(view1)); + for (MemberHandle h : view1) + { + group.synched(h); + } + group.setMembers(Arrays.asList(view2)); + //(ii) + boolean result = group.assumeLeadership(); + assertTrue(result); + assertTrue(group.isLeader()); + assertFalse(group.isLeader(a)); + assertEquals(Arrays.asList(view3), group.getMembers()); + assertEquals(2, group.getPeers().size()); + assertTrue(c.matches(group.getPeers().get(0))); + assertTrue(d.matches(group.getPeers().get(1))); + } + + //leadership transfer (invalid) + // (i) set up group + // (ii) assumeLeadership() + // ==> check return value + // ==> check members + // ==> check peers + // ==> check isLeader() + // ==> check isLeader(old_leader) + // ==> check isMember(old_leader) + public void testTransferLeadership_invalid() throws Exception + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c}; + MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; + + BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); + //(i) + group.connectToLeader(a); + group.setMembers(Arrays.asList(view1)); + for (MemberHandle h : view1) + { + group.synched(h); + } + group.setMembers(Arrays.asList(view2)); + //(ii) + boolean result = group.assumeLeadership(); + assertFalse(result); + assertFalse(group.isLeader()); + assertTrue(group.isLeader(a)); + assertEquals(Arrays.asList(view2), group.getMembers()); + assertEquals(3, group.getPeers().size()); + assertTrue(a.matches(group.getPeers().get(0))); + assertTrue(b.matches(group.getPeers().get(1))); + assertTrue(d.matches(group.getPeers().get(2))); + + } + + //leave (leaders perspective) + // (i) set up group + // (ii) remove a member + // ==> check members + // ==> check peers + // ==> check isMember(removed_member) + // repeat (ii) + public void testLeave_leader() + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] view2 = new MemberHandle[]{a, b, d}; + MemberHandle[] view3 = new MemberHandle[]{a, d}; + MemberHandle[] view4 = new MemberHandle[]{a}; + //(i) + BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory()); + group.establish(); + group.setMembers(Arrays.asList(view1)); + //(ii) + group.remove(group.findBroker(c, false)); + assertEquals(Arrays.asList(view2), group.getMembers()); + + group.remove(group.findBroker(b, false)); + assertEquals(Arrays.asList(view3), group.getMembers()); + + group.remove(group.findBroker(d, false)); + assertEquals(Arrays.asList(view4), group.getMembers()); + } + + + //leave (general perspective) + // (i) set up group + // (ii) setMember + // ==> check members + // ==> check peers + // ==> check isMember(removed_member) + // repeat (ii) + public void testLeave_general() + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] view2 = new MemberHandle[]{a, c, d}; + //(i) + BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); + group.establish(); //not strictly the correct way to build up the group, but ok for here + group.setMembers(Arrays.asList(view1)); + //(ii) + group.setMembers(Arrays.asList(view2)); + assertEquals(Arrays.asList(view2), group.getMembers()); + assertEquals(2, group.getPeers().size()); + assertTrue(a.matches(group.getPeers().get(0))); + assertTrue(d.matches(group.getPeers().get(1))); + } +} diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java new file mode 100644 index 0000000000..d7ede7d3e0 --- /dev/null +++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/BrokerTest.java @@ -0,0 +1,231 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import junit.framework.TestCase; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.policy.StandardPolicies; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class BrokerTest extends TestCase +{ + //group request (no failure) + public void testGroupRequest_noFailure() throws AMQException + { + RecordingBroker[] brokers = new RecordingBroker[]{ + new RecordingBroker("A", 1), + new RecordingBroker("B", 2), + new RecordingBroker("C", 3) + }; + GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(brokers))); + GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); + for (Broker b : brokers) + { + b.invoke(grpRequest); + } + grpRequest.finishedSend(); + + for (RecordingBroker b : brokers) + { + b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new TestMethod("response")); + } + + assertTrue("Handler did not receive response", handler.isCompleted()); + } + + //group request (failure) + public void testGroupRequest_failure() throws AMQException + { + RecordingBroker a = new RecordingBroker("A", 1); + RecordingBroker b = new RecordingBroker("B", 2); + RecordingBroker c = new RecordingBroker("C", 3); + RecordingBroker[] all = new RecordingBroker[]{a, b, c}; + RecordingBroker[] succeeded = new RecordingBroker[]{a, c}; + + GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(succeeded))); + GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); + + for (Broker broker : all) + { + broker.invoke(grpRequest); + } + grpRequest.finishedSend(); + + for (RecordingBroker broker : succeeded) + { + broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).channel, new TestMethod("response")); + } + b.remove(); + + assertTrue("Handler did not receive response", handler.isCompleted()); + } + + + //simple send (no response) + public void testSend_noResponse() throws AMQException + { + AMQBody[] msgs = new AMQBody[]{ + new TestMethod("A"), + new TestMethod("B"), + new TestMethod("C") + }; + RecordingBroker broker = new RecordingBroker("myhost", 1); + for (AMQBody msg : msgs) + { + broker.send(new SimpleSendable(msg), null); + } + List<AMQDataBlock> sent = broker.getMessages(); + assertEquals(msgs.length, sent.size()); + for (int i = 0; i < msgs.length; i++) + { + assertTrue(sent.get(i) instanceof AMQFrame); + assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame); + } + } + + //simple send (no failure) + public void testSend_noFailure() throws AMQException + { + RecordingBroker broker = new RecordingBroker("myhost", 1); + BlockingHandler handler = new BlockingHandler(); + broker.send(new SimpleSendable(new TestMethod("A")), handler); + List<AMQDataBlock> sent = broker.getMessages(); + assertEquals(1, sent.size()); + assertTrue(sent.get(0) instanceof AMQFrame); + assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame); + + broker.handleResponse(((AMQFrame) sent.get(0)).channel, new TestMethod("B")); + + assertEquals(new TestMethod("B"), handler.getResponse()); + } + + //simple send (failure) + public void testSend_failure() throws AMQException + { + RecordingBroker broker = new RecordingBroker("myhost", 1); + BlockingHandler handler = new BlockingHandler(); + broker.send(new SimpleSendable(new TestMethod("A")), handler); + List<AMQDataBlock> sent = broker.getMessages(); + assertEquals(1, sent.size()); + assertTrue(sent.get(0) instanceof AMQFrame); + assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame); + broker.remove(); + assertEquals(null, handler.getResponse()); + assertTrue(handler.isCompleted()); + assertTrue(handler.failed()); + } + + private static class TestMethod extends AMQMethodBody + { + private final Object id; + + TestMethod(Object id) + { + this.id = id; + } + + protected int getBodySize() + { + return 0; + } + + protected int getClazz() + { + return 1002; + } + + protected int getMethod() + { + return 1003; + } + + protected void writeMethodPayload(ByteBuffer buffer) + { + } + + protected byte getType() + { + return 0; + } + + protected int getSize() + { + return 0; + } + + protected void writePayload(ByteBuffer buffer) + { + } + + protected void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException + { + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + } + + public boolean equals(Object o) + { + return o instanceof TestMethod && id.equals(((TestMethod) o).id); + } + + public int hashCode() + { + return id.hashCode(); + } + + } + + private static class GroupResponseValidator implements GroupResponseHandler + { + private final AMQMethodBody _response; + private final List<Member> _members; + private boolean _completed = false; + + GroupResponseValidator(AMQMethodBody response, List<Member> members) + { + _response = response; + _members = members; + } + + public void response(List<AMQMethodBody> responses, List<Member> members) + { + for (AMQMethodBody r : responses) + { + assertEquals(_response, r); + } + assertEquals(_members, members); + _completed = true; + } + + boolean isCompleted() + { + return _completed; + } + } +} diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java new file mode 100644 index 0000000000..132ebd8ca0 --- /dev/null +++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +public class ClusterCapabilityTest +{ + @Test + public void startWithNull() + { + MemberHandle peer = new SimpleMemberHandle("myhost:9999"); + String c = ClusterCapability.add(null, peer); + assertTrue(ClusterCapability.contains(c)); + assertTrue(peer.matches(ClusterCapability.getPeer(c))); + } + + @Test + public void startWithText() + { + MemberHandle peer = new SimpleMemberHandle("myhost:9999"); + String c = ClusterCapability.add("existing text", peer); + assertTrue(ClusterCapability.contains(c)); + assertTrue(peer.matches(ClusterCapability.getPeer(c))); + } +} diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/InductionBufferTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/InductionBufferTest.java new file mode 100644 index 0000000000..fedf47d49a --- /dev/null +++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/InductionBufferTest.java @@ -0,0 +1,103 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.IoSession; + +import java.util.List; +import java.util.ArrayList; + +import junit.framework.TestCase; + +public class InductionBufferTest extends TestCase +{ + public void test() throws Exception + { + IoSession session1 = new TestSession(); + IoSession session2 = new TestSession(); + IoSession session3 = new TestSession(); + + TestMessageHandler handler = new TestMessageHandler(); + InductionBuffer buffer = new InductionBuffer(handler); + + buffer.receive(session1, "one"); + buffer.receive(session2, "two"); + buffer.receive(session3, "three"); + + buffer.receive(session1, "four"); + buffer.receive(session1, "five"); + buffer.receive(session1, "six"); + + buffer.receive(session3, "seven"); + buffer.receive(session3, "eight"); + + handler.checkEmpty(); + buffer.deliver(); + + handler.check(session1, "one"); + handler.check(session2, "two"); + handler.check(session3, "three"); + + handler.check(session1, "four"); + handler.check(session1, "five"); + handler.check(session1, "six"); + + handler.check(session3, "seven"); + handler.check(session3, "eight"); + handler.checkEmpty(); + + buffer.receive(session1, "nine"); + buffer.receive(session2, "ten"); + buffer.receive(session3, "eleven"); + + handler.check(session1, "nine"); + handler.check(session2, "ten"); + handler.check(session3, "eleven"); + + handler.checkEmpty(); + } + + private static class TestMessageHandler implements InductionBuffer.MessageHandler + { + private final List<IoSession> _sessions = new ArrayList<IoSession>(); + private final List<Object> _msgs = new ArrayList<Object>(); + + public synchronized void deliver(IoSession session, Object msg) throws Exception + { + _sessions.add(session); + _msgs.add(msg); + } + + void check(IoSession actualSession, Object actualMsg) + { + assertFalse(_sessions.isEmpty()); + assertFalse(_msgs.isEmpty()); + IoSession expectedSession = _sessions.remove(0); + Object expectedMsg = _msgs.remove(0); + assertEquals(expectedSession, actualSession); + assertEquals(expectedMsg, actualMsg); + } + + void checkEmpty() + { + assertTrue(_sessions.isEmpty()); + assertTrue(_msgs.isEmpty()); + } + } +} + diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBroker.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBroker.java new file mode 100644 index 0000000000..388d584288 --- /dev/null +++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBroker.java @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; + +import java.util.ArrayList; +import java.util.List; + +class RecordingBroker extends TestBroker +{ + private final List<AMQDataBlock> _messages = new ArrayList<AMQDataBlock>(); + + RecordingBroker(String host, int port) + { + super(host, port); + } + + public void send(AMQDataBlock data) throws AMQException + { + _messages.add(data); + } + + List<AMQDataBlock> getMessages() + { + return _messages; + } + + void clear() + { + _messages.clear(); + } + +} diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java new file mode 100644 index 0000000000..e5e95323af --- /dev/null +++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +class RecordingBrokerFactory implements BrokerFactory +{ + public Broker create(MemberHandle handle) + { + return new RecordingBroker(handle.getHost(), handle.getPort()); + } +} diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleClusterTest.java new file mode 100644 index 0000000000..a4d13ea46d --- /dev/null +++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleClusterTest.java @@ -0,0 +1,41 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.junit.Test; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; + +import javax.jms.JMSException; + +public class SimpleClusterTest +{ + @Test + public void declareExchange() throws AMQException, JMSException, URLSyntaxException + { + AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test"); + AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + System.out.println("Session created"); + session.declareExchange("my_exchange", "direct"); + System.out.println("Exchange declared"); + con.close(); + System.out.println("Connection closed"); + } +} diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java new file mode 100644 index 0000000000..f7c728759b --- /dev/null +++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java @@ -0,0 +1,59 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +public class SimpleMemberHandleTest +{ + @Test + public void matches() + { + assertMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost", 8888)); + assertNoMatch(new SimpleMemberHandle("localhost", 8889), new SimpleMemberHandle("localhost", 8888)); + assertNoMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost2", 8888)); + } + + + @Test + public void resolve() + { + assertEquivalent(new SimpleMemberHandle("WGLAIBD8XGR0J:9000"), new SimpleMemberHandle("localhost:9000")); + } + + private void assertEquivalent(MemberHandle a, MemberHandle b) + { + String msg = a + " is not equivalent to " + b; + a = SimpleMemberHandle.resolve(a); + b = SimpleMemberHandle.resolve(b); + msg += "(" + a + " does not match " + b + ")"; + assertTrue(msg, a.matches(b)); + } + + private void assertMatch(MemberHandle a, MemberHandle b) + { + assertTrue(a + " does not match " + b, a.matches(b)); + } + + private void assertNoMatch(MemberHandle a, MemberHandle b) + { + assertFalse(a + " matches " + b, a.matches(b)); + } +} diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBroker.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBroker.java new file mode 100644 index 0000000000..c4a1985ae3 --- /dev/null +++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBroker.java @@ -0,0 +1,67 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; + +import java.io.IOException; + +class TestBroker extends Broker +{ + TestBroker(String host, int port) + { + super(host, port); + } + + boolean connect() throws IOException, InterruptedException + { + return true; + } + + void connectAsynch(Iterable<AMQMethodBody> msgs) + { + replay(msgs); + } + + void replay(Iterable<AMQMethodBody> msgs) + { + try + { + for (AMQMethodBody b : msgs) + { + send(new AMQFrame(0, b)); + } + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + + Broker connectToCluster() throws IOException, InterruptedException + { + return this; + } + + public void send(AMQDataBlock data) throws AMQException + { + } +} diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBrokerFactory.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBrokerFactory.java new file mode 100644 index 0000000000..cd4e340925 --- /dev/null +++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestBrokerFactory.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +class TestBrokerFactory implements BrokerFactory +{ + public Broker create(MemberHandle handle) + { + return new TestBroker(handle.getHost(), handle.getPort()); + } +} diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestReplayManager.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestReplayManager.java new file mode 100644 index 0000000000..e2d6f75f19 --- /dev/null +++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestReplayManager.java @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.replay.ReplayManager; + +import java.util.ArrayList; +import java.util.List; + +class TestReplayManager implements ReplayManager +{ + private final List<AMQMethodBody> _msgs; + + TestReplayManager() + { + this(new ArrayList<AMQMethodBody>()); + } + + TestReplayManager(List<AMQMethodBody> msgs) + { + _msgs = msgs; + } + + public List<AMQMethodBody> replay(boolean isLeader) + { + return _msgs; + } +} diff --git a/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestSession.java b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestSession.java new file mode 100644 index 0000000000..675e20c9dc --- /dev/null +++ b/java/cluster/src/test/java/java/org/apache/qpid/server/cluster/TestSession.java @@ -0,0 +1,266 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.*; + +import java.net.SocketAddress; +import java.util.Set; + +class TestSession implements IoSession +{ + public IoService getService() + { + return null; //TODO + } + + public IoServiceConfig getServiceConfig() + { + return null; //TODO + } + + public IoHandler getHandler() + { + return null; //TODO + } + + public IoSessionConfig getConfig() + { + return null; //TODO + } + + public IoFilterChain getFilterChain() + { + return null; //TODO + } + + public WriteFuture write(Object message) + { + return null; //TODO + } + + public CloseFuture close() + { + return null; //TODO + } + + public Object getAttachment() + { + return null; //TODO + } + + public Object setAttachment(Object attachment) + { + return null; //TODO + } + + public Object getAttribute(String key) + { + return null; //TODO + } + + public Object setAttribute(String key, Object value) + { + return null; //TODO + } + + public Object setAttribute(String key) + { + return null; //TODO + } + + public Object removeAttribute(String key) + { + return null; //TODO + } + + public boolean containsAttribute(String key) + { + return false; //TODO + } + + public Set getAttributeKeys() + { + return null; //TODO + } + + public TransportType getTransportType() + { + return null; //TODO + } + + public boolean isConnected() + { + return false; //TODO + } + + public boolean isClosing() + { + return false; //TODO + } + + public CloseFuture getCloseFuture() + { + return null; //TODO + } + + public SocketAddress getRemoteAddress() + { + return null; //TODO + } + + public SocketAddress getLocalAddress() + { + return null; //TODO + } + + public SocketAddress getServiceAddress() + { + return null; //TODO + } + + public int getIdleTime(IdleStatus status) + { + return 0; //TODO + } + + public long getIdleTimeInMillis(IdleStatus status) + { + return 0; //TODO + } + + public void setIdleTime(IdleStatus status, int idleTime) + { + //TODO + } + + public int getWriteTimeout() + { + return 0; //TODO + } + + public long getWriteTimeoutInMillis() + { + return 0; //TODO + } + + public void setWriteTimeout(int writeTimeout) + { + //TODO + } + + public TrafficMask getTrafficMask() + { + return null; //TODO + } + + public void setTrafficMask(TrafficMask trafficMask) + { + //TODO + } + + public void suspendRead() + { + //TODO + } + + public void suspendWrite() + { + //TODO + } + + public void resumeRead() + { + //TODO + } + + public void resumeWrite() + { + //TODO + } + + public long getReadBytes() + { + return 0; //TODO + } + + public long getWrittenBytes() + { + return 0; //TODO + } + + public long getReadMessages() + { + return 0; + } + + public long getWrittenMessages() + { + return 0; + } + + public long getWrittenWriteRequests() + { + return 0; //TODO + } + + public int getScheduledWriteRequests() + { + return 0; //TODO + } + + public int getScheduledWriteBytes() + { + return 0; //TODO + } + + public long getCreationTime() + { + return 0; //TODO + } + + public long getLastIoTime() + { + return 0; //TODO + } + + public long getLastReadTime() + { + return 0; //TODO + } + + public long getLastWriteTime() + { + return 0; //TODO + } + + public boolean isIdle(IdleStatus status) + { + return false; //TODO + } + + public int getIdleCount(IdleStatus status) + { + return 0; //TODO + } + + public long getLastIdleTime(IdleStatus status) + { + return 0; //TODO + } +} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java new file mode 100644 index 0000000000..015e96f9c6 --- /dev/null +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java @@ -0,0 +1,267 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import junit.framework.TestCase; + +import java.io.IOException; +import java.util.Arrays; + +public class BrokerGroupTest extends TestCase +{ + private final MemberHandle a = new SimpleMemberHandle("A", 1); + private final MemberHandle b = new SimpleMemberHandle("B", 1); + private final MemberHandle c = new SimpleMemberHandle("C", 1); + private final MemberHandle d = new SimpleMemberHandle("D", 1); + + //join (new members perspective) + // (i) connectToLeader() + // ==> check state + // (ii) setMembers() + // ==> check state + // ==> check members + // (iii) synched(leader) + // ==> check state + // ==> check peers + // (iv) synched(other) + // ==> check state + // ==> check peers + // repeat for all others + public void testJoin_newMember() throws Exception + { + MemberHandle[] pre = new MemberHandle[]{a, b, c}; + MemberHandle[] post = new MemberHandle[]{a, b, c}; + + BrokerGroup group = new BrokerGroup(d, new TestReplayManager(), new TestBrokerFactory()); + assertEquals(JoinState.UNINITIALISED, group.getState()); + //(i) + group.connectToLeader(a); + assertEquals(JoinState.JOINING, group.getState()); + assertEquals("Wrong number of peers", 1, group.getPeers().size()); + //(ii) + group.setMembers(Arrays.asList(post)); + assertEquals(JoinState.INITIATION, group.getState()); + assertEquals(Arrays.asList(post), group.getMembers()); + //(iii) & (iv) + for (MemberHandle member : pre) + { + group.synched(member); + if (member == c) + { + assertEquals(JoinState.JOINED, group.getState()); + assertEquals("Wrong number of peers", pre.length, group.getPeers().size()); + } + else + { + assertEquals(JoinState.INDUCTION, group.getState()); + assertEquals("Wrong number of peers", 1, group.getPeers().size()); + } + } + } + + //join (leaders perspective) + // (i) extablish() + // ==> check state + // ==> check members + // ==> check peers + // (ii) connectToProspect() + // ==> check members + // ==> check peers + // repeat (ii) + public void testJoin_Leader() throws IOException, InterruptedException + { + MemberHandle[] prospects = new MemberHandle[]{b, c, d}; + + BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory()); + assertEquals(JoinState.UNINITIALISED, group.getState()); + //(i) + group.establish(); + assertEquals(JoinState.JOINED, group.getState()); + assertEquals("Wrong number of peers", 0, group.getPeers().size()); + assertEquals("Wrong number of members", 1, group.getMembers().size()); + assertEquals(a, group.getMembers().get(0)); + //(ii) + for (int i = 0; i < prospects.length; i++) + { + group.connectToProspect(prospects[i]); + assertEquals("Wrong number of peers", i + 1, group.getPeers().size()); + for (int j = 0; j <= i; j++) + { + assertTrue(prospects[i].matches(group.getPeers().get(i))); + } + assertEquals("Wrong number of members", i + 2, group.getMembers().size()); + assertEquals(a, group.getMembers().get(0)); + for (int j = 0; j <= i; j++) + { + assertEquals(prospects[i], group.getMembers().get(i + 1)); + } + } + } + + //join (general perspective) + // (i) set up group + // (ii) setMembers() + // ==> check members + // ==> check peers + public void testJoin_general() throws Exception + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c}; + MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] peers = new MemberHandle[]{a, b, d}; + + BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); + //(i) + group.connectToLeader(a); + group.setMembers(Arrays.asList(view1)); + for (MemberHandle h : view1) + { + group.synched(h); + } + //(ii) + group.setMembers(Arrays.asList(view2)); + assertEquals(Arrays.asList(view2), group.getMembers()); + assertEquals(peers.length, group.getPeers().size()); + for (int i = 0; i < peers.length; i++) + { + assertTrue(peers[i].matches(group.getPeers().get(i))); + } + } + + //leadership transfer (valid) + // (i) set up group + // (ii) assumeLeadership() + // ==> check return value + // ==> check members + // ==> check peers + // ==> check isLeader() + // ==> check isLeader(old_leader) + // ==> check isMember(old_leader) + public void testTransferLeadership_valid() throws Exception + { + MemberHandle[] view1 = new MemberHandle[]{a, b}; + MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] view3 = new MemberHandle[]{b, c, d}; + + BrokerGroup group = new BrokerGroup(b, new TestReplayManager(), new TestBrokerFactory()); + //(i) + group.connectToLeader(a); + group.setMembers(Arrays.asList(view1)); + for (MemberHandle h : view1) + { + group.synched(h); + } + group.setMembers(Arrays.asList(view2)); + //(ii) + boolean result = group.assumeLeadership(); + assertTrue(result); + assertTrue(group.isLeader()); + assertFalse(group.isLeader(a)); + assertEquals(Arrays.asList(view3), group.getMembers()); + assertEquals(2, group.getPeers().size()); + assertTrue(c.matches(group.getPeers().get(0))); + assertTrue(d.matches(group.getPeers().get(1))); + } + + //leadership transfer (invalid) + // (i) set up group + // (ii) assumeLeadership() + // ==> check return value + // ==> check members + // ==> check peers + // ==> check isLeader() + // ==> check isLeader(old_leader) + // ==> check isMember(old_leader) + public void testTransferLeadership_invalid() throws Exception + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c}; + MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; + + BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); + //(i) + group.connectToLeader(a); + group.setMembers(Arrays.asList(view1)); + for (MemberHandle h : view1) + { + group.synched(h); + } + group.setMembers(Arrays.asList(view2)); + //(ii) + boolean result = group.assumeLeadership(); + assertFalse(result); + assertFalse(group.isLeader()); + assertTrue(group.isLeader(a)); + assertEquals(Arrays.asList(view2), group.getMembers()); + assertEquals(3, group.getPeers().size()); + assertTrue(a.matches(group.getPeers().get(0))); + assertTrue(b.matches(group.getPeers().get(1))); + assertTrue(d.matches(group.getPeers().get(2))); + + } + + //leave (leaders perspective) + // (i) set up group + // (ii) remove a member + // ==> check members + // ==> check peers + // ==> check isMember(removed_member) + // repeat (ii) + public void testLeave_leader() + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] view2 = new MemberHandle[]{a, b, d}; + MemberHandle[] view3 = new MemberHandle[]{a, d}; + MemberHandle[] view4 = new MemberHandle[]{a}; + //(i) + BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory()); + group.establish(); + group.setMembers(Arrays.asList(view1)); + //(ii) + group.remove(group.findBroker(c, false)); + assertEquals(Arrays.asList(view2), group.getMembers()); + + group.remove(group.findBroker(b, false)); + assertEquals(Arrays.asList(view3), group.getMembers()); + + group.remove(group.findBroker(d, false)); + assertEquals(Arrays.asList(view4), group.getMembers()); + } + + + //leave (general perspective) + // (i) set up group + // (ii) setMember + // ==> check members + // ==> check peers + // ==> check isMember(removed_member) + // repeat (ii) + public void testLeave_general() + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] view2 = new MemberHandle[]{a, c, d}; + //(i) + BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); + group.establish(); //not strictly the correct way to build up the group, but ok for here + group.setMembers(Arrays.asList(view1)); + //(ii) + group.setMembers(Arrays.asList(view2)); + assertEquals(Arrays.asList(view2), group.getMembers()); + assertEquals(2, group.getPeers().size()); + assertTrue(a.matches(group.getPeers().get(0))); + assertTrue(d.matches(group.getPeers().get(1))); + } +} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java new file mode 100644 index 0000000000..d7ede7d3e0 --- /dev/null +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java @@ -0,0 +1,231 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import junit.framework.TestCase; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.policy.StandardPolicies; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class BrokerTest extends TestCase +{ + //group request (no failure) + public void testGroupRequest_noFailure() throws AMQException + { + RecordingBroker[] brokers = new RecordingBroker[]{ + new RecordingBroker("A", 1), + new RecordingBroker("B", 2), + new RecordingBroker("C", 3) + }; + GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(brokers))); + GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); + for (Broker b : brokers) + { + b.invoke(grpRequest); + } + grpRequest.finishedSend(); + + for (RecordingBroker b : brokers) + { + b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new TestMethod("response")); + } + + assertTrue("Handler did not receive response", handler.isCompleted()); + } + + //group request (failure) + public void testGroupRequest_failure() throws AMQException + { + RecordingBroker a = new RecordingBroker("A", 1); + RecordingBroker b = new RecordingBroker("B", 2); + RecordingBroker c = new RecordingBroker("C", 3); + RecordingBroker[] all = new RecordingBroker[]{a, b, c}; + RecordingBroker[] succeeded = new RecordingBroker[]{a, c}; + + GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(succeeded))); + GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); + + for (Broker broker : all) + { + broker.invoke(grpRequest); + } + grpRequest.finishedSend(); + + for (RecordingBroker broker : succeeded) + { + broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).channel, new TestMethod("response")); + } + b.remove(); + + assertTrue("Handler did not receive response", handler.isCompleted()); + } + + + //simple send (no response) + public void testSend_noResponse() throws AMQException + { + AMQBody[] msgs = new AMQBody[]{ + new TestMethod("A"), + new TestMethod("B"), + new TestMethod("C") + }; + RecordingBroker broker = new RecordingBroker("myhost", 1); + for (AMQBody msg : msgs) + { + broker.send(new SimpleSendable(msg), null); + } + List<AMQDataBlock> sent = broker.getMessages(); + assertEquals(msgs.length, sent.size()); + for (int i = 0; i < msgs.length; i++) + { + assertTrue(sent.get(i) instanceof AMQFrame); + assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame); + } + } + + //simple send (no failure) + public void testSend_noFailure() throws AMQException + { + RecordingBroker broker = new RecordingBroker("myhost", 1); + BlockingHandler handler = new BlockingHandler(); + broker.send(new SimpleSendable(new TestMethod("A")), handler); + List<AMQDataBlock> sent = broker.getMessages(); + assertEquals(1, sent.size()); + assertTrue(sent.get(0) instanceof AMQFrame); + assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame); + + broker.handleResponse(((AMQFrame) sent.get(0)).channel, new TestMethod("B")); + + assertEquals(new TestMethod("B"), handler.getResponse()); + } + + //simple send (failure) + public void testSend_failure() throws AMQException + { + RecordingBroker broker = new RecordingBroker("myhost", 1); + BlockingHandler handler = new BlockingHandler(); + broker.send(new SimpleSendable(new TestMethod("A")), handler); + List<AMQDataBlock> sent = broker.getMessages(); + assertEquals(1, sent.size()); + assertTrue(sent.get(0) instanceof AMQFrame); + assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame); + broker.remove(); + assertEquals(null, handler.getResponse()); + assertTrue(handler.isCompleted()); + assertTrue(handler.failed()); + } + + private static class TestMethod extends AMQMethodBody + { + private final Object id; + + TestMethod(Object id) + { + this.id = id; + } + + protected int getBodySize() + { + return 0; + } + + protected int getClazz() + { + return 1002; + } + + protected int getMethod() + { + return 1003; + } + + protected void writeMethodPayload(ByteBuffer buffer) + { + } + + protected byte getType() + { + return 0; + } + + protected int getSize() + { + return 0; + } + + protected void writePayload(ByteBuffer buffer) + { + } + + protected void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException + { + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + } + + public boolean equals(Object o) + { + return o instanceof TestMethod && id.equals(((TestMethod) o).id); + } + + public int hashCode() + { + return id.hashCode(); + } + + } + + private static class GroupResponseValidator implements GroupResponseHandler + { + private final AMQMethodBody _response; + private final List<Member> _members; + private boolean _completed = false; + + GroupResponseValidator(AMQMethodBody response, List<Member> members) + { + _response = response; + _members = members; + } + + public void response(List<AMQMethodBody> responses, List<Member> members) + { + for (AMQMethodBody r : responses) + { + assertEquals(_response, r); + } + assertEquals(_members, members); + _completed = true; + } + + boolean isCompleted() + { + return _completed; + } + } +} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java new file mode 100644 index 0000000000..132ebd8ca0 --- /dev/null +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +public class ClusterCapabilityTest +{ + @Test + public void startWithNull() + { + MemberHandle peer = new SimpleMemberHandle("myhost:9999"); + String c = ClusterCapability.add(null, peer); + assertTrue(ClusterCapability.contains(c)); + assertTrue(peer.matches(ClusterCapability.getPeer(c))); + } + + @Test + public void startWithText() + { + MemberHandle peer = new SimpleMemberHandle("myhost:9999"); + String c = ClusterCapability.add("existing text", peer); + assertTrue(ClusterCapability.contains(c)); + assertTrue(peer.matches(ClusterCapability.getPeer(c))); + } +} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java new file mode 100644 index 0000000000..fedf47d49a --- /dev/null +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java @@ -0,0 +1,103 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.IoSession; + +import java.util.List; +import java.util.ArrayList; + +import junit.framework.TestCase; + +public class InductionBufferTest extends TestCase +{ + public void test() throws Exception + { + IoSession session1 = new TestSession(); + IoSession session2 = new TestSession(); + IoSession session3 = new TestSession(); + + TestMessageHandler handler = new TestMessageHandler(); + InductionBuffer buffer = new InductionBuffer(handler); + + buffer.receive(session1, "one"); + buffer.receive(session2, "two"); + buffer.receive(session3, "three"); + + buffer.receive(session1, "four"); + buffer.receive(session1, "five"); + buffer.receive(session1, "six"); + + buffer.receive(session3, "seven"); + buffer.receive(session3, "eight"); + + handler.checkEmpty(); + buffer.deliver(); + + handler.check(session1, "one"); + handler.check(session2, "two"); + handler.check(session3, "three"); + + handler.check(session1, "four"); + handler.check(session1, "five"); + handler.check(session1, "six"); + + handler.check(session3, "seven"); + handler.check(session3, "eight"); + handler.checkEmpty(); + + buffer.receive(session1, "nine"); + buffer.receive(session2, "ten"); + buffer.receive(session3, "eleven"); + + handler.check(session1, "nine"); + handler.check(session2, "ten"); + handler.check(session3, "eleven"); + + handler.checkEmpty(); + } + + private static class TestMessageHandler implements InductionBuffer.MessageHandler + { + private final List<IoSession> _sessions = new ArrayList<IoSession>(); + private final List<Object> _msgs = new ArrayList<Object>(); + + public synchronized void deliver(IoSession session, Object msg) throws Exception + { + _sessions.add(session); + _msgs.add(msg); + } + + void check(IoSession actualSession, Object actualMsg) + { + assertFalse(_sessions.isEmpty()); + assertFalse(_msgs.isEmpty()); + IoSession expectedSession = _sessions.remove(0); + Object expectedMsg = _msgs.remove(0); + assertEquals(expectedSession, actualSession); + assertEquals(expectedMsg, actualMsg); + } + + void checkEmpty() + { + assertTrue(_sessions.isEmpty()); + assertTrue(_msgs.isEmpty()); + } + } +} + diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java new file mode 100644 index 0000000000..388d584288 --- /dev/null +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; + +import java.util.ArrayList; +import java.util.List; + +class RecordingBroker extends TestBroker +{ + private final List<AMQDataBlock> _messages = new ArrayList<AMQDataBlock>(); + + RecordingBroker(String host, int port) + { + super(host, port); + } + + public void send(AMQDataBlock data) throws AMQException + { + _messages.add(data); + } + + List<AMQDataBlock> getMessages() + { + return _messages; + } + + void clear() + { + _messages.clear(); + } + +} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java new file mode 100644 index 0000000000..e5e95323af --- /dev/null +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +class RecordingBrokerFactory implements BrokerFactory +{ + public Broker create(MemberHandle handle) + { + return new RecordingBroker(handle.getHost(), handle.getPort()); + } +} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java new file mode 100644 index 0000000000..a4d13ea46d --- /dev/null +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java @@ -0,0 +1,41 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.junit.Test; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; + +import javax.jms.JMSException; + +public class SimpleClusterTest +{ + @Test + public void declareExchange() throws AMQException, JMSException, URLSyntaxException + { + AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test"); + AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + System.out.println("Session created"); + session.declareExchange("my_exchange", "direct"); + System.out.println("Exchange declared"); + con.close(); + System.out.println("Connection closed"); + } +} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java new file mode 100644 index 0000000000..f7c728759b --- /dev/null +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java @@ -0,0 +1,59 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +public class SimpleMemberHandleTest +{ + @Test + public void matches() + { + assertMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost", 8888)); + assertNoMatch(new SimpleMemberHandle("localhost", 8889), new SimpleMemberHandle("localhost", 8888)); + assertNoMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost2", 8888)); + } + + + @Test + public void resolve() + { + assertEquivalent(new SimpleMemberHandle("WGLAIBD8XGR0J:9000"), new SimpleMemberHandle("localhost:9000")); + } + + private void assertEquivalent(MemberHandle a, MemberHandle b) + { + String msg = a + " is not equivalent to " + b; + a = SimpleMemberHandle.resolve(a); + b = SimpleMemberHandle.resolve(b); + msg += "(" + a + " does not match " + b + ")"; + assertTrue(msg, a.matches(b)); + } + + private void assertMatch(MemberHandle a, MemberHandle b) + { + assertTrue(a + " does not match " + b, a.matches(b)); + } + + private void assertNoMatch(MemberHandle a, MemberHandle b) + { + assertFalse(a + " matches " + b, a.matches(b)); + } +} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java new file mode 100644 index 0000000000..c4a1985ae3 --- /dev/null +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java @@ -0,0 +1,67 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; + +import java.io.IOException; + +class TestBroker extends Broker +{ + TestBroker(String host, int port) + { + super(host, port); + } + + boolean connect() throws IOException, InterruptedException + { + return true; + } + + void connectAsynch(Iterable<AMQMethodBody> msgs) + { + replay(msgs); + } + + void replay(Iterable<AMQMethodBody> msgs) + { + try + { + for (AMQMethodBody b : msgs) + { + send(new AMQFrame(0, b)); + } + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + + Broker connectToCluster() throws IOException, InterruptedException + { + return this; + } + + public void send(AMQDataBlock data) throws AMQException + { + } +} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java new file mode 100644 index 0000000000..cd4e340925 --- /dev/null +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +class TestBrokerFactory implements BrokerFactory +{ + public Broker create(MemberHandle handle) + { + return new TestBroker(handle.getHost(), handle.getPort()); + } +} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java new file mode 100644 index 0000000000..e2d6f75f19 --- /dev/null +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.replay.ReplayManager; + +import java.util.ArrayList; +import java.util.List; + +class TestReplayManager implements ReplayManager +{ + private final List<AMQMethodBody> _msgs; + + TestReplayManager() + { + this(new ArrayList<AMQMethodBody>()); + } + + TestReplayManager(List<AMQMethodBody> msgs) + { + _msgs = msgs; + } + + public List<AMQMethodBody> replay(boolean isLeader) + { + return _msgs; + } +} diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java new file mode 100644 index 0000000000..675e20c9dc --- /dev/null +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java @@ -0,0 +1,266 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.*; + +import java.net.SocketAddress; +import java.util.Set; + +class TestSession implements IoSession +{ + public IoService getService() + { + return null; //TODO + } + + public IoServiceConfig getServiceConfig() + { + return null; //TODO + } + + public IoHandler getHandler() + { + return null; //TODO + } + + public IoSessionConfig getConfig() + { + return null; //TODO + } + + public IoFilterChain getFilterChain() + { + return null; //TODO + } + + public WriteFuture write(Object message) + { + return null; //TODO + } + + public CloseFuture close() + { + return null; //TODO + } + + public Object getAttachment() + { + return null; //TODO + } + + public Object setAttachment(Object attachment) + { + return null; //TODO + } + + public Object getAttribute(String key) + { + return null; //TODO + } + + public Object setAttribute(String key, Object value) + { + return null; //TODO + } + + public Object setAttribute(String key) + { + return null; //TODO + } + + public Object removeAttribute(String key) + { + return null; //TODO + } + + public boolean containsAttribute(String key) + { + return false; //TODO + } + + public Set getAttributeKeys() + { + return null; //TODO + } + + public TransportType getTransportType() + { + return null; //TODO + } + + public boolean isConnected() + { + return false; //TODO + } + + public boolean isClosing() + { + return false; //TODO + } + + public CloseFuture getCloseFuture() + { + return null; //TODO + } + + public SocketAddress getRemoteAddress() + { + return null; //TODO + } + + public SocketAddress getLocalAddress() + { + return null; //TODO + } + + public SocketAddress getServiceAddress() + { + return null; //TODO + } + + public int getIdleTime(IdleStatus status) + { + return 0; //TODO + } + + public long getIdleTimeInMillis(IdleStatus status) + { + return 0; //TODO + } + + public void setIdleTime(IdleStatus status, int idleTime) + { + //TODO + } + + public int getWriteTimeout() + { + return 0; //TODO + } + + public long getWriteTimeoutInMillis() + { + return 0; //TODO + } + + public void setWriteTimeout(int writeTimeout) + { + //TODO + } + + public TrafficMask getTrafficMask() + { + return null; //TODO + } + + public void setTrafficMask(TrafficMask trafficMask) + { + //TODO + } + + public void suspendRead() + { + //TODO + } + + public void suspendWrite() + { + //TODO + } + + public void resumeRead() + { + //TODO + } + + public void resumeWrite() + { + //TODO + } + + public long getReadBytes() + { + return 0; //TODO + } + + public long getWrittenBytes() + { + return 0; //TODO + } + + public long getReadMessages() + { + return 0; + } + + public long getWrittenMessages() + { + return 0; + } + + public long getWrittenWriteRequests() + { + return 0; //TODO + } + + public int getScheduledWriteRequests() + { + return 0; //TODO + } + + public int getScheduledWriteBytes() + { + return 0; //TODO + } + + public long getCreationTime() + { + return 0; //TODO + } + + public long getLastIoTime() + { + return 0; //TODO + } + + public long getLastReadTime() + { + return 0; //TODO + } + + public long getLastWriteTime() + { + return 0; //TODO + } + + public boolean isIdle(IdleStatus status) + { + return false; //TODO + } + + public int getIdleCount(IdleStatus status) + { + return 0; //TODO + } + + public long getLastIdleTime(IdleStatus status) + { + return 0; //TODO + } +} |