summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-02-29 16:00:19 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-02-29 16:00:19 +0000
commita91e0a19c3ca405b659d21593f433fca692acbec (patch)
tree6f010166d7b5759520c39f8f48bcbc50e8290cef
parent48248ab5bcab430eddaa887c6468ca7d1939c9d3 (diff)
downloadqpid-python-a91e0a19c3ca405b659d21593f433fca692acbec.tar.gz
QPID-702 Removed cluster code as it has been scheduled for such since last year.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@632363 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/cluster/doc/design.docbin70144 -> 0 bytes
-rw-r--r--java/cluster/pom.xml69
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java42
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java46
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java45
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java91
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java26
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java247
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java26
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java368
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java73
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java139
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java63
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java60
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java190
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java133
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java80
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java396
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java72
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java107
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java31
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java90
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java26
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java107
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java117
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java31
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java26
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java36
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java28
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java29
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java28
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java44
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java272
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java36
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java30
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java28
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java98
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java48
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java166
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java55
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java73
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java136
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java51
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java239
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java55
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java25
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java79
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java38
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java60
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java62
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java59
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java69
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java90
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java125
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java56
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java85
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java31
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java31
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java31
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java29
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java31
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java48
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java83
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java32
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java77
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java37
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java311
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java83
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java72
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java53
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java61
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java175
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java102
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java116
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java64
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java60
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java91
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java176
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java56
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java270
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java237
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java43
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java106
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java53
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java29
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java45
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java57
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java70
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java29
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java47
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java269
91 files changed, 0 insertions, 8006 deletions
diff --git a/java/cluster/doc/design.doc b/java/cluster/doc/design.doc
deleted file mode 100644
index c5bbf0f8a4..0000000000
--- a/java/cluster/doc/design.doc
+++ /dev/null
Binary files differ
diff --git a/java/cluster/pom.xml b/java/cluster/pom.xml
deleted file mode 100644
index 73c62520e6..0000000000
--- a/java/cluster/pom.xml
+++ /dev/null
@@ -1,69 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-cluster</artifactId>
- <packaging>jar</packaging>
- <version>1.0-incubating-M2.1-SNAPSHOT</version>
- <name>Qpid Cluster</name>
- <url>http://cwiki.apache.org/confluence/display/qpid</url>
-
- <parent>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid</artifactId>
- <version>1.0-incubating-M2.1-SNAPSHOT</version>
- </parent>
-
- <properties>
- <topDirectoryLocation>..</topDirectoryLocation>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-broker</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java
deleted file mode 100644
index 2baaa344ef..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-
-/**
- * AMQConnectionWaitException represents a failure to connect to a cluster peer in a timely manner.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents failure to connect to a cluster peer in a timely manner.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- */
-public class AMQConnectionWaitException extends AMQException
-{
- public AMQConnectionWaitException(String s, Throwable e)
- {
- super(s, e);
-
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java
deleted file mode 100644
index 951bd22df0..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
-
-/**
- * AMQUnexpectedBodyTypeException represents a failure where a message body does not match its expected type. For example,
- * and AMQP method should have a method body.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents a failure where a message body does not match its expected type.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would
- * be better just to leave that as a ClassCastException. Check that the framing layer will pick up the error first.
- */
-public class AMQUnexpectedBodyTypeException extends AMQException
-{
- public AMQUnexpectedBodyTypeException(Class<? extends AMQBody> expectedClass, AMQBody body)
- {
- super("Unexpected body type. Expected: " + expectedClass.getName() + "; got: " + body.getClass().getName());
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java
deleted file mode 100644
index 4dd318f90d..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-
-/**
- * AMQUnexpectedFrameTypeException represents a failure when Mina passes an unexpected frame type.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents failure to cast a frame to its expected type.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would
- * be better just to leave that as a ClassCastException. However, check the framing layer catches this error
- * first.
- */
-public class AMQUnexpectedFrameTypeException extends AMQException
-{
- public AMQUnexpectedFrameTypeException(String s)
- {
- super(s);
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java
deleted file mode 100644
index 39508df566..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-public class BlockingHandler implements ResponseHandler
-{
- private final Class _expected;
- private boolean _completed;
- private AMQMethodBody _response;
-
-
- public BlockingHandler()
- {
- this(AMQMethodBody.class);
- }
-
- public BlockingHandler(Class<? extends AMQMethodBody> expected)
- {
- _expected = expected;
- }
-
- public void responded(AMQMethodBody response)
- {
- if (_expected.isInstance(response))
- {
- _response = response;
- completed();
- }
- }
-
- public void removed()
- {
- completed();
- }
-
- private synchronized void completed()
- {
- _completed = true;
- notifyAll();
- }
-
- synchronized void waitForCompletion()
- {
- while (!_completed)
- {
- try
- {
- wait();
- }
- catch (InterruptedException ignore)
- {
-
- }
- }
- }
-
- AMQMethodBody getResponse()
- {
- return _response;
- }
-
- boolean failed()
- {
- return _response == null;
- }
-
- boolean isCompleted()
- {
- return _completed;
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java
deleted file mode 100644
index 145aa58574..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-public interface BroadcastPolicy
-{
- public boolean isComplete(int responded, int members);
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java
deleted file mode 100644
index 7e2cf6da83..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * An implementation of the Member interface (through which data is sent to other
- * peers in the cluster). This class provides a base from which subclasses can
- * inherit some common behaviour for broadcasting GroupRequests and sending methods
- * that may expect a response. It also extends the Member abstraction to support
- * a richer set of operations that are useful within the package but should not be
- * exposed outside of it.
- *
- */
-abstract class Broker extends SimpleMemberHandle implements Member
-{
- private static final Logger _logger = Logger.getLogger(Broker.class);
- private static final int DEFAULT_CHANNEL = 1;
- private static final int START_CHANNEL = 2;
- private static final int END_CHANNEL = 10000;
-
-
- private MemberFailureListener _listener;
- //a wrap-around counter to allocate _requests a unique channel:
- private int _nextChannel = START_CHANNEL;
- //outstanding _requests:
- private final Map<Integer, ResponseHandler> _requests = new HashMap<Integer, ResponseHandler>();
-
- Broker(String host, int port)
- {
- super(host, port);
- }
-
- /**
- * Allows a listener to be registered that will receive callbacks when communication
- * to the peer this broker instance represents fails.
- * @param listener the callback to be notified of failures
- */
- public void addFailureListener(MemberFailureListener listener)
- {
- _listener = listener;
- }
-
- /**
- * Allows subclasses to signal comunication failures
- */
- protected void failed()
- {
- if (_listener != null)
- {
- _listener.failed(this);
- }
- }
-
- /**
- * Subclasses should call this on receiving message responses from the remote
- * peer. They are matched to any outstanding request they might be response
- * to, with the completion and callback of that request being managed if
- * required.
- *
- * @param channel the channel on which the method was received
- * @param response the response received
- * @return true if the response matched an outstanding request
- */
- protected synchronized boolean handleResponse(int channel, AMQMethodBody response)
- {
- ResponseHandler request = _requests.get(channel);
- if (request == null)
- {
- if(!_requests.isEmpty())
- {
- _logger.warn(new LogMessage("[next channel={3, integer}]: Response {0} on channel {1, integer} failed to match outstanding requests: {2}", response, channel, _requests, _nextChannel));
- }
- return false;
- }
- else
- {
- request.responded(response);
- return true;
- }
- }
-
- /**
- * Called when this broker is excluded from the group. Any requests made on
- * it are informed this member has left the group.
- */
- synchronized void remove()
- {
- for (ResponseHandler r : _requests.values())
- {
- r.removed();
- }
- }
-
- /**
- * Engages this broker in the specified group request
- *
- * @param request the request being made to a group of brokers
- * @throws AMQException if there is any failure
- */
- synchronized void invoke(GroupRequest request) throws AMQException
- {
- int channel = nextChannel();
- _requests.put(channel, new GroupRequestAdapter(request, channel));
- request.send(channel, this);
- }
-
- /**
- * Sends a message to the remote peer and undertakes to notify the specified
- * handler of the response.
- *
- * @param msg the message to send
- * @param handler the callback to notify of responses (or the removal of this broker
- * from the group)
- * @throws AMQException
- */
- synchronized void send(Sendable msg, ResponseHandler handler) throws AMQException
- {
- int channel;
- if (handler != null)
- {
- channel = nextChannel();
- _requests.put(channel, new RemovingWrapper(handler, channel));
- }
- else
- {
- channel = DEFAULT_CHANNEL;
- }
-
- msg.send(channel, this);
- }
-
- private int nextChannel()
- {
- int channel = _nextChannel++;
- if(_nextChannel >= END_CHANNEL)
- {
- _nextChannel = START_CHANNEL;
- }
- return channel;
- }
-
- /**
- * extablish connection without handling redirect
- */
- abstract boolean connect() throws IOException, InterruptedException;
-
- /**
- * Start connection process, including replay
- */
- abstract void connectAsynch(Iterable<AMQMethodBody> msgs);
-
- /**
- * Replay messages to the remote peer this instance represents. These messages
- * must be sent before any others whose transmission is requested through send() etc.
- *
- * @param msgs
- */
- abstract void replay(Iterable<AMQMethodBody> msgs);
-
- /**
- * establish connection, handling redirect if required...
- */
- abstract Broker connectToCluster() throws IOException, InterruptedException;
-
- private class GroupRequestAdapter implements ResponseHandler
- {
- private final GroupRequest request;
- private final int channel;
-
- GroupRequestAdapter(GroupRequest request, int channel)
- {
- this.request = request;
- this.channel = channel;
- }
-
- public void responded(AMQMethodBody response)
- {
- request.responseReceived(Broker.this, response);
- _requests.remove(channel);
- }
-
- public void removed()
- {
- request.removed(Broker.this);
- }
-
- public String toString()
- {
- return "GroupRequestAdapter{" + channel + ", " + request + "}";
- }
- }
-
- private class RemovingWrapper implements ResponseHandler
- {
- private final ResponseHandler handler;
- private final int channel;
-
- RemovingWrapper(ResponseHandler handler, int channel)
- {
- this.handler = handler;
- this.channel = channel;
- }
-
- public void responded(AMQMethodBody response)
- {
- handler.responded(response);
- _requests.remove(channel);
- }
-
- public void removed()
- {
- handler.removed();
- }
-
- public String toString()
- {
- return "RemovingWrapper{" + channel + ", " + handler + "}";
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java
deleted file mode 100644
index 92c3c4e7bf..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-interface BrokerFactory
-{
- public Broker create(MemberHandle handle);
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java
deleted file mode 100644
index 755a341607..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.cluster.replay.ReplayManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.util.InvokeMultiple;
-import org.apache.qpid.framing.AMQMethodBody;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Manages the membership list of a group and the set of brokers representing the
- * remote peers. The group should be initialised through a call to establish()
- * or connectToLeader().
- *
- */
-class BrokerGroup
-{
- private static final Logger _logger = Logger.getLogger(BrokerGroup.class);
-
- private final InvokeMultiple<MembershipChangeListener> _changeListeners = new InvokeMultiple<MembershipChangeListener>(MembershipChangeListener.class);
- private final ReplayManager _replayMgr;
- private final MemberHandle _local;
- private final BrokerFactory _factory;
- private final Object _lock = new Object();
- private final Set<MemberHandle> _synch = new HashSet<MemberHandle>();
- private List<MemberHandle> _members;
- private List<Broker> _peers = new ArrayList<Broker>();
- private JoinState _state = JoinState.UNINITIALISED;
-
- /**
- * Creates an unitialised group.
- *
- * @param local a handle that represents the local broker
- * @param replayMgr the replay manager to use when creating new brokers
- * @param factory the factory through which broker instances are created
- */
- BrokerGroup(MemberHandle local, ReplayManager replayMgr, BrokerFactory factory)
- {
- _replayMgr = replayMgr;
- _local = local;
- _factory = factory;
- }
-
- /**
- * Called to establish the local broker as the leader of a new group
- */
- void establish()
- {
- synchronized (_lock)
- {
- setState(JoinState.JOINED);
- _members = new ArrayList<MemberHandle>();
- _members.add(_local);
- }
- fireChange();
- }
-
- /**
- * Called by prospect to connect to group
- */
- Broker connectToLeader(MemberHandle handle) throws Exception
- {
- Broker leader = _factory.create(handle);
- leader = leader.connectToCluster();
- synchronized (_lock)
- {
- setState(JoinState.JOINING);
- _members = new ArrayList<MemberHandle>();
- _members.add(leader);
- _peers.add(leader);
- }
- fireChange();
- return leader;
- }
-
- /**
- * Called by leader when handling a join request
- */
- Broker connectToProspect(MemberHandle handle) throws IOException, InterruptedException
- {
- Broker prospect = _factory.create(handle);
- prospect.connect();
- synchronized (_lock)
- {
- _members.add(prospect);
- _peers.add(prospect);
- }
- fireChange();
- return prospect;
- }
-
- /**
- * Called in reponse to membership announcements.
- *
- * @param members the list of members now part of the group
- */
- void setMembers(List<MemberHandle> members)
- {
- if (isJoined())
- {
- List<Broker> old = _peers;
-
- synchronized (_lock)
- {
- _peers = getBrokers(members);
- _members = new ArrayList<MemberHandle>(members);
- }
-
- //remove those that are still members
- old.removeAll(_peers);
-
- //handle failure of any brokers that haven't survived
- for (Broker peer : old)
- {
- peer.remove();
- }
- }
- else
- {
- synchronized (_lock)
- {
- setState(JoinState.INITIATION);
- _members = new ArrayList<MemberHandle>(members);
- _synch.addAll(_members);
- _synch.remove(_local);
- }
- }
- fireChange();
- }
-
- List<MemberHandle> getMembers()
- {
- synchronized (_lock)
- {
- return Collections.unmodifiableList(_members);
- }
- }
-
- List<Broker> getPeers()
- {
- synchronized (_lock)
- {
- return _peers;
- }
- }
-
- /**
- * Removes the member presented from the group
- * @param peer the broker that should be removed
- */
- void remove(Broker peer)
- {
- synchronized (_lock)
- {
- _peers.remove(peer);
- _members.remove(peer);
- }
- fireChange();
- }
-
- MemberHandle getLocal()
- {
- return _local;
- }
-
- Broker getLeader()
- {
- synchronized (_lock)
- {
- return _peers.size() > 0 ? _peers.get(0) : null;
- }
- }
-
- /**
- * Allows a Broker instance to be retrieved for a given handle
- *
- * @param handle the handle for which a broker is sought
- * @param create flag to indicate whther a broker should be created for the handle if
- * one is not found within the list of known peers
- * @return the broker corresponding to handle or null if a match cannot be found and
- * create is false
- */
- Broker findBroker(MemberHandle handle, boolean create)
- {
- if (handle instanceof Broker)
- {
- return (Broker) handle;
- }
- else
- {
- for (Broker b : getPeers())
- {
- if (b.matches(handle))
- {
- return b;
- }
- }
- }
- if (create)
- {
- Broker b = _factory.create(handle);
- List<AMQMethodBody> msgs = _replayMgr.replay(isLeader(_local));
- _logger.info(new LogMessage("Replaying {0} from {1} to {2}", msgs, _local, b));
- b.connectAsynch(msgs);
-
- return b;
- }
- else
- {
- return null;
- }
- }
-
- /**
- * @param member the member to test for leadership
- * @return true if the passed in member is the group leader, false otherwise
- */
- boolean isLeader(MemberHandle member)
- {
- synchronized (_lock)
- {
- return member.matches(_members.get(0));
- }
- }
-
- /**
- * @return true if the local broker is the group leader, false otherwise
- */
- boolean isLeader()
- {
- return isLeader(_local);
- }
-
- /**
- * Used when the leader fails and the next broker in the list needs to
- * assume leadership
- * @return true if the action succeeds
- */
- boolean assumeLeadership()
- {
- boolean valid;
- synchronized (_lock)
- {
- valid = _members.size() > 1 && _local.matches(_members.get(1));
- if (valid)
- {
- _members.remove(0);
- _peers.remove(0);
- }
- }
- fireChange();
- return valid;
- }
-
- /**
- * Called in response to a Cluster.Synch message being received during the join
- * process. This indicates that the member mentioned has replayed all necessary
- * messages to the local broker.
- *
- * @param member the member from whom the synch messages was received
- */
- void synched(MemberHandle member)
- {
- _logger.info(new LogMessage("Synchronised with {0}", member));
- synchronized (_lock)
- {
- if (isLeader(member))
- {
- setState(JoinState.INDUCTION);
- }
- _synch.remove(member);
- if (_synch.isEmpty())
- {
- _peers = getBrokers(_members);
- setState(JoinState.JOINED);
- }
- }
- }
-
-
- /**
- * @return the state of the group
- */
- JoinState getState()
- {
- synchronized (_lock)
- {
- return _state;
- }
- }
-
- void addMemberhipChangeListener(MembershipChangeListener l)
- {
- _changeListeners.addListener(l);
- }
-
- void removeMemberhipChangeListener(MembershipChangeListener l)
- {
- _changeListeners.removeListener(l);
- }
-
-
-
- private void setState(JoinState state)
- {
- _logger.info(new LogMessage("Changed state from {0} to {1}", _state, state));
- _state = state;
- }
-
- private boolean isJoined()
- {
- return inState(JoinState.JOINED);
- }
-
- private boolean inState(JoinState state)
- {
- return _state.equals(state);
- }
-
- private List<Broker> getBrokers(List<MemberHandle> handles)
- {
- List<Broker> brokers = new ArrayList<Broker>();
- for (MemberHandle handle : handles)
- {
- if (!_local.matches(handle))
- {
- brokers.add(findBroker(handle, true));
- }
- }
- return brokers;
- }
-
- private void fireChange()
- {
- List<MemberHandle> members;
- synchronized(this)
- {
- members = new ArrayList(_members);
- }
- _changeListeners.getProxy().changed(Collections.unmodifiableList(members));
- }
-} \ No newline at end of file
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
deleted file mode 100644
index 1b4a3e8327..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.mina.common.IoSession;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.framing.AMQMethodBody;
-
-/**
- * Hack to assist with reuse of the client handlers for connection setup in
- * the inter-broker communication within the cluster.
- *
- */
-class ClientAdapter implements MethodHandler
-{
- private final AMQProtocolSession _session;
- private final AMQStateManager _stateMgr;
-
- ClientAdapter(IoSession session, AMQStateManager stateMgr)
- {
- this(session, stateMgr, "guest", "guest", session.toString(), "/cluster");
- }
-
- ClientAdapter(IoSession session, AMQStateManager stateMgr, String user, String password, String name, String path)
- {
- _session = new SessionAdapter(session, new ConnectionAdapter(user, password, name, path));
- _stateMgr = stateMgr;
- }
-
- public void handle(int channel, AMQMethodBody method) throws AMQException
- {
- AMQMethodEvent evt = new AMQMethodEvent(channel, method);
- _stateMgr.methodReceived(evt);
- }
-
- private class SessionAdapter extends AMQProtocolSession
- {
- public SessionAdapter(IoSession session, AMQConnection connection)
- {
- super(null, session, connection);
- }
- }
-
- private static class ConnectionAdapter extends AMQConnection
- {
- ConnectionAdapter(String username, String password, String clientName, String virtualPath)
- {
- super(username, password, clientName, virtualPath);
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
deleted file mode 100644
index f4a8e4c1e2..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
-import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
-import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
-import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
-// import org.apache.qpid.client.state.IllegalStateTransitionException;
-import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.framing.*;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * An extension of client.AMQStateManager that allows different handlers to be registered.
- *
- */
-public class ClientHandlerRegistry extends AMQStateManager
-{
- private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>();
- private final MemberHandle _identity;
-
- protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession)
- {
- super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession);
-
- _identity = local;
-
- addHandler(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance(),
- AMQState.CONNECTION_NOT_STARTED);
-
- addHandler(ConnectionTuneBody.class, new ConnectionTuneHandler(),
- AMQState.CONNECTION_NOT_TUNED);
- addHandler(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance(),
- AMQState.CONNECTION_NOT_TUNED);
- addHandler(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance(),
- AMQState.CONNECTION_NOT_OPENED);
-
- addHandlers(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance(),
- AMQState.CONNECTION_NOT_STARTED,
- AMQState.CONNECTION_NOT_TUNED,
- AMQState.CONNECTION_NOT_OPENED);
-
- }
-
- private ClientRegistry state(AMQState state)
- {
- ClientRegistry registry = _handlers.get(state);
- if (registry == null)
- {
- registry = new ClientRegistry();
- _handlers.put(state, registry);
- }
- return registry;
- }
-
- protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBody frame) //throws IllegalStateTransitionException
- {
- ClientRegistry registry = _handlers.get(state);
- return registry == null ? null : registry.getHandler(frame);
- }
-
-
- <A extends Class<AMQMethodBody>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states)
- {
- for (AMQState state : states)
- {
- addHandler(type, handler, state);
- }
- }
-
- <A extends Class<AMQMethodBody>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state)
- {
- ClientRegistry registry = _handlers.get(state);
- if (registry == null)
- {
- registry = new ClientRegistry();
- _handlers.put(state, registry);
- }
- registry.add(type, handler);
- }
-
- static class ClientRegistry
- {
- private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener> registry
- = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener>();
-
- <A extends Class<AMQMethodBody>> void add(A type, StateAwareMethodListener handler)
- {
- registry.put(type, handler);
- }
-
- StateAwareMethodListener getHandler(AMQMethodBody frame)
- {
- return registry.get(frame.getClass());
- }
- }
-
- class ConnectionTuneHandler extends ConnectionTuneMethodHandler
- {
- protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor)
- {
- // Be aware of possible changes to parameter order as versions change.
- return ConnectionOpenBody.createAMQFrame(channel,
- major,
- minor,
- // AMQP version (major, minor)
- new AMQShortString(ClusterCapability.add(capabilities, _identity)),
- // capabilities
- insist,
- // insist
- path);
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
deleted file mode 100644
index 80f9ef62b1..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.server.cluster.handler.ClusterMethodHandlerFactory;
-import org.apache.qpid.server.cluster.replay.RecordingMethodHandlerFactory;
-import org.apache.qpid.server.cluster.replay.ReplayStore;
-
-import java.net.InetSocketAddress;
-
-class ClusterBuilder
-{
- private final LoadTable loadTable = new LoadTable();
- private final ReplayStore replayStore = new ReplayStore();
- private final MemberHandle handle;
- private final GroupManager groupMgr;
-
- ClusterBuilder(InetSocketAddress address)
- {
- handle = new SimpleMemberHandle(address.getHostName(), address.getPort()).resolve();
- groupMgr = new DefaultGroupManager(handle, getBrokerFactory(), replayStore, loadTable);
- }
-
- GroupManager getGroupManager()
- {
- return groupMgr;
- }
-
- ServerHandlerRegistry getHandlerRegistry()
- {
- return new ServerHandlerRegistry(getHandlerFactory(), null, null);
- }
-
- private MethodHandlerFactory getHandlerFactory()
- {
- MethodHandlerFactory factory = new ClusterMethodHandlerFactory(groupMgr, loadTable);
- //need to wrap relevant handlers with recording handler for easy replay:
- return new RecordingMethodHandlerFactory(factory, replayStore);
- }
-
- private BrokerFactory getBrokerFactory()
- {
- return new MinaBrokerProxyFactory(handle);
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java
deleted file mode 100644
index 57c48f0611..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.framing.AMQShortString;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class ClusterCapability
-{
- public static final String PATTERN = ".*\\bcluster_peer=(\\S*:\\d*)\b*.*";
- public static final String PEER = "cluster_peer";
-
- public static AMQShortString add(AMQShortString original, MemberHandle identity)
- {
- return original == null ? peer(identity) : new AMQShortString(original + " " + peer(identity));
- }
-
- private static AMQShortString peer(MemberHandle identity)
- {
- return new AMQShortString(PEER + "=" + identity.getDetails());
- }
-
- public static boolean contains(AMQShortString in)
- {
- return in != null; // && in.contains(in);
- }
-
- public static MemberHandle getPeer(AMQShortString in)
- {
- Matcher matcher = Pattern.compile(PATTERN).matcher(in);
- if (matcher.matches())
- {
- return new SimpleMemberHandle(matcher.group(1));
- }
- else
- {
- throw new RuntimeException("Could not find peer in '" + in + "'");
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
deleted file mode 100644
index ee5aa48db9..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.IoSession;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.ClusterMembershipBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-
-import java.net.InetSocketAddress;
-
-public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements InductionBuffer.MessageHandler
-{
- private static final Logger _logger = Logger.getLogger(ClusteredProtocolHandler.class);
- private final InductionBuffer _peerBuffer = new InductionBuffer(this);
- private final InductionBuffer _clientBuffer = new InductionBuffer(this);
- private final GroupManager _groupMgr;
- private final ServerHandlerRegistry _handlers;
-
- public ClusteredProtocolHandler(InetSocketAddress address)
- {
- this(ApplicationRegistry.getInstance(), address);
- }
-
- public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address)
- {
- super(registry);
- ClusterBuilder builder = new ClusterBuilder(address);
- _groupMgr = builder.getGroupManager();
- _handlers = builder.getHandlerRegistry();
- }
-
- public ClusteredProtocolHandler(ClusteredProtocolHandler handler)
- {
- super(handler);
- _groupMgr = handler._groupMgr;
- _handlers = handler._handlers;
- }
-
- protected void createSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
- {
- new ClusteredProtocolSession(session, virtualHostRegistry, codec, new ServerHandlerRegistry(_handlers, virtualHostRegistry, protocolSession));
- }
-
- void connect(String join) throws Exception
- {
- if (join == null)
- {
- _groupMgr.establish();
- }
- else
- {
- _groupMgr.join(new SimpleMemberHandle(join));
- }
- }
-
- private boolean inState(JoinState state)
- {
- return _groupMgr.getState().equals(state);
- }
-
- public void messageReceived(IoSession session, Object msg) throws Exception
- {
- JoinState state = _groupMgr.getState();
- switch (state)
- {
- case JOINED:
- _logger.debug(new LogMessage("Received {0}", msg));
- super.messageReceived(session, msg);
- break;
- case JOINING:
- case INITIATION:
- case INDUCTION:
- buffer(session, msg);
- break;
- default:
- throw new AMQException("Received message while in state: " + state);
- }
- JoinState latest = _groupMgr.getState();
- if (!latest.equals(state))
- {
- switch (latest)
- {
- case INDUCTION:
- _logger.info("Reached induction, delivering buffered message from peers");
- _peerBuffer.deliver();
- break;
- case JOINED:
- _logger.info("Reached joined, delivering buffered message from clients");
- _clientBuffer.deliver();
- break;
- }
- }
- }
-
- private void buffer(IoSession session, Object msg) throws Exception
- {
- if (isBufferable(msg))
- {
- MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session);
- if (peer == null)
- {
- _logger.debug(new LogMessage("Buffering {0} for client", msg));
- _clientBuffer.receive(session, msg);
- }
- else if (inState(JoinState.JOINING) && isMembershipAnnouncement(msg))
- {
- _logger.debug(new LogMessage("Initial membership [{0}] received from {1}", msg, peer));
- super.messageReceived(session, msg);
- }
- else if (inState(JoinState.INITIATION) && _groupMgr.isLeader(peer))
- {
- _logger.debug(new LogMessage("Replaying {0} from leader ", msg));
- super.messageReceived(session, msg);
- }
- else if (inState(JoinState.INDUCTION))
- {
- _logger.debug(new LogMessage("Replaying {0} from peer {1}", msg, peer));
- super.messageReceived(session, msg);
- }
- else
- {
- _logger.debug(new LogMessage("Buffering {0} for peer {1}", msg, peer));
- _peerBuffer.receive(session, msg);
- }
- }
- else
- {
- _logger.debug(new LogMessage("Received {0}", msg));
- super.messageReceived(session, msg);
- }
- }
-
- public void deliver(IoSession session, Object msg) throws Exception
- {
- _logger.debug(new LogMessage("Delivering {0}", msg));
- super.messageReceived(session, msg);
- }
-
- private boolean isMembershipAnnouncement(Object msg)
- {
- return msg instanceof AMQFrame && (((AMQFrame) msg).getBodyFrame() instanceof ClusterMembershipBody);
- }
-
- private boolean isBufferable(Object msg)
- {
- return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).getBodyFrame());
- }
-
- private boolean isBuffereable(AMQBody body)
- {
- return !(body instanceof ConnectionStartOkBody ||
- body instanceof ConnectionTuneOkBody ||
- body instanceof ConnectionSecureOkBody ||
- body instanceof ConnectionOpenBody);
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
deleted file mode 100644
index eea660c4f0..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.mina.common.IoSession;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-
-public class ClusteredProtocolSession extends AMQMinaProtocolSession
-{
- private MemberHandle _peer;
-
- public ClusteredProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException
-// public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry,
-// ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException
- {
- super(session, virtualHostRegistry, codecFactory, stateManager);
-// super(session, queueRegistry, exchangeRegistry, codecFactory);
- }
-
- public boolean isPeerSession()
- {
- return _peer != null;
- }
-
- public void setSessionPeer(MemberHandle peer)
- {
- _peer = peer;
- }
-
- public MemberHandle getSessionPeer()
- {
- return _peer;
- }
-
- public AMQChannel getChannel(int channelId)
- throws AMQException
- {
- AMQChannel channel = super.getChannel(channelId);
- if (isPeerSession() && channel == null)
- {
- channel = new OneUseChannel(channelId, getVirtualHost());
- addChannel(channel);
- }
- return channel;
- }
-
- public static boolean isPeerSession(IoSession session)
- {
- return isPeerSession(getAMQProtocolSession(session));
- }
-
- public static boolean isPeerSession(AMQProtocolSession session)
- {
- return session instanceof ClusteredProtocolSession && ((ClusteredProtocolSession) session).isPeerSession();
- }
-
- public static void setSessionPeer(AMQProtocolSession session, MemberHandle peer)
- {
- ((ClusteredProtocolSession) session).setSessionPeer(peer);
- }
-
- public static MemberHandle getSessionPeer(AMQProtocolSession session)
- {
- return ((ClusteredProtocolSession) session).getSessionPeer();
- }
-
- public static MemberHandle getSessionPeer(IoSession session)
- {
- return getSessionPeer(getAMQProtocolSession(session));
- }
-
- /**
- * Cleans itself up after delivery of a message (publish frame, header and optional body frame(s))
- */
- private class OneUseChannel extends AMQChannel
- {
- public OneUseChannel(int channelId, VirtualHost virtualHost)
- throws AMQException
- {
- super(ClusteredProtocolSession.this,channelId,
- virtualHost.getMessageStore(),
- virtualHost.getExchangeRegistry());
- }
-
- protected void routeCurrentMessage() throws AMQException
- {
- super.routeCurrentMessage();
- removeChannel(getChannelId());
- }
- }
-
- public static boolean isPayloadFromPeer(AMQMessage payload)
- {
- return isPeerSession(payload.getPublisher());
- }
-
- public static boolean canRelay(AMQMessage payload, MemberHandle target)
- {
- //can only relay client messages that have not already been relayed to the given target
- return !isPayloadFromPeer(payload) && !payload.checkToken(target);
- }
-
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java
deleted file mode 100644
index a1f01eff46..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import java.io.IOException;
-
-class ConnectionStatusMonitor
-{
- private boolean _complete;
- private boolean _redirected;
- private String _host;
- private int _port;
- private RuntimeException _error;
-
- synchronized void opened()
- {
- _complete = true;
- notifyAll();
- }
-
- synchronized void redirect(String host, int port)
- {
- _complete = true;
- _redirected = true;
- this._host = host;
- this._port = port;
- }
-
- synchronized void failed(RuntimeException e)
- {
- _error = e;
- _complete = true;
- }
-
- synchronized boolean waitUntilOpen() throws InterruptedException
- {
- while (!_complete)
- {
- wait();
- }
- if (_error != null)
- {
- throw _error;
- }
- return !_redirected;
- }
-
- synchronized boolean isOpened()
- {
- return _complete;
- }
-
- String getHost()
- {
- return _host;
- }
-
- int getPort()
- {
- return _port;
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
deleted file mode 100644
index 2f473b63fb..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
+++ /dev/null
@@ -1,396 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.cluster.policy.StandardPolicies;
-import org.apache.qpid.server.cluster.replay.ReplayManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-
-import java.util.List;
-
-public class DefaultGroupManager implements GroupManager, MemberFailureListener, BrokerFactory, StandardPolicies
-{
- private static final Logger _logger = Logger.getLogger(DefaultGroupManager.class);
- private final LoadTable _loadTable;
- private final BrokerFactory _factory;
- private final ReplayManager _replayMgr;
- private final BrokerGroup _group;
-
- DefaultGroupManager(MemberHandle handle, BrokerFactory factory, ReplayManager replayMgr)
- {
- this(handle, factory, replayMgr, new LoadTable());
- }
-
- DefaultGroupManager(MemberHandle handle, BrokerFactory factory, ReplayManager replayMgr, LoadTable loadTable)
- {
- handle = SimpleMemberHandle.resolve(handle);
- _logger.info(handle);
- _loadTable = loadTable;
- _factory = factory;
- _replayMgr = replayMgr;
- _group = new BrokerGroup(handle, _replayMgr, this);
- }
-
- public JoinState getState()
- {
- return _group.getState();
- }
-
- public void addMemberhipChangeListener(MembershipChangeListener l)
- {
- _group.addMemberhipChangeListener(l);
- }
-
- public void removeMemberhipChangeListener(MembershipChangeListener l)
- {
- _group.removeMemberhipChangeListener(l);
- }
-
- public void broadcast(Sendable message) throws AMQException
- {
- for (Broker b : _group.getPeers())
- {
- b.send(message, null);
- }
- }
-
- public void broadcast(Sendable message, BroadcastPolicy policy, GroupResponseHandler callback) throws AMQException
- {
- GroupRequest request = new GroupRequest(message, policy, callback);
- for (Broker b : _group.getPeers())
- {
- b.invoke(request);
- }
- request.finishedSend();
- }
-
- public void send(MemberHandle broker, Sendable message) throws AMQException
- {
- Broker destination = findBroker(broker);
- if(destination == null)
- {
- _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker));
- }
- else
- {
- destination.send(message, null);
- _logger.debug(new LogMessage("Sent {0} to {1}", message, broker));
- }
- }
-
- private void send(Broker broker, Sendable message, ResponseHandler handler) throws AMQException
- {
- broker.send(message, handler);
- }
-
- private void ping(Broker b) throws AMQException
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterPingBody ping = new ClusterPingBody((byte)8,
- (byte)0,
- ClusterPingBody.getClazz((byte)8, (byte)0),
- ClusterPingBody.getMethod((byte)8, (byte)0),
- _group.getLocal().getDetails(),
- _loadTable.getLocalLoad(),
- true);
- BlockingHandler handler = new BlockingHandler();
- send(getLeader(), new SimpleBodySendable(ping), handler);
- handler.waitForCompletion();
- if (handler.failed())
- {
- if (isLeader())
- {
- handleFailure(b);
- }
- else
- {
- suspect(b);
- }
- }
- else
- {
- _loadTable.setLoad(b, ((ClusterPingBody) handler.getResponse()).load);
- }
- }
-
- public void handlePing(MemberHandle member, long load)
- {
- _loadTable.setLoad(findBroker(member), load);
- }
-
- public Member redirect()
- {
- return _loadTable.redirect();
- }
-
- public void establish()
- {
- _group.establish();
- _logger.info("Established cluster");
- }
-
- public void join(MemberHandle member) throws AMQException
- {
- member = SimpleMemberHandle.resolve(member);
-
- Broker leader = connectToLeader(member);
- _logger.info(new LogMessage("Connected to {0}. joining", leader));
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterJoinBody join = new ClusterJoinBody((byte)8,
- (byte)0,
- ClusterJoinBody.getClazz((byte)8, (byte)0),
- ClusterJoinBody.getMethod((byte)8, (byte)0),
- _group.getLocal().getDetails());
-
- send(leader, new SimpleBodySendable(join));
- }
-
- private Broker connectToLeader(MemberHandle member) throws AMQException
- {
- try
- {
- return _group.connectToLeader(member);
- }
- catch (Exception e)
- {
- throw new AMQException("Could not connect to leader: " + e, e);
- }
- }
-
- public void leave() throws AMQException
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterLeaveBody leave = new ClusterLeaveBody((byte)8,
- (byte)0,
- ClusterLeaveBody.getClazz((byte)8, (byte)0),
- ClusterLeaveBody.getMethod((byte)8, (byte)0),
- _group.getLocal().getDetails());
-
- send(getLeader(), new SimpleBodySendable(leave));
- }
-
- private void suspect(MemberHandle broker) throws AMQException
- {
- if (_group.isLeader(broker))
- {
- //need new leader, if this broker is next in line it can assume leadership
- if (_group.assumeLeadership())
- {
- announceMembership();
- }
- else
- {
- _logger.warn(new LogMessage("Leader failed. Expecting {0} to succeed.", _group.getMembers().get(1)));
- }
- }
- else
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8,
- (byte)0,
- ClusterSuspectBody.getClazz((byte)8, (byte)0),
- ClusterSuspectBody.getMethod((byte)8, (byte)0),
- broker.getDetails());
-
- send(getLeader(), new SimpleBodySendable(suspect));
- }
- }
-
-
- public void handleJoin(MemberHandle member) throws AMQException
- {
- _logger.info(new LogMessage("Handling join request for {0}", member));
- if(isLeader())
- {
- //connect to the host and port specified:
- Broker prospect = connectToProspect(member);
- announceMembership();
- List<AMQMethodBody> msgs = _replayMgr.replay(true);
- _logger.info(new LogMessage("Replaying {0} from leader to {1}", msgs, prospect));
- prospect.replay(msgs);
- }
- else
- {
- //pass request on to leader:
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0,
- ClusterJoinBody.getClazz((byte)8, (byte)0),
- ClusterJoinBody.getMethod((byte)8, (byte)0),
- member.getDetails());
-
- Broker leader = getLeader();
- send(leader, new SimpleBodySendable(request));
- _logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader));
- }
- }
-
- private Broker connectToProspect(MemberHandle member) throws AMQException
- {
- try
- {
- return _group.connectToProspect(member);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- throw new AMQException("Could not connect to prospect: " + e, e);
- }
- }
-
- public void handleLeave(MemberHandle member) throws AMQException
- {
- handleFailure(findBroker(member));
- announceMembership();
- }
-
- public void handleSuspect(MemberHandle member) throws AMQException
- {
- Broker b = findBroker(member);
- if(b != null)
- {
- //ping it to check it has failed, ping will handle failure if it has
- ping(b);
- announceMembership();
- }
- }
-
- public void handleSynch(MemberHandle member)
- {
- _group.synched(member);
- }
-
- private ClusterMembershipBody createAnnouncement(String membership)
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0,
- ClusterMembershipBody.getClazz((byte)8, (byte)0),
- ClusterMembershipBody.getMethod((byte)8, (byte)0),
- membership.getBytes());
-
-
- return announce;
- }
-
- private void announceMembership() throws AMQException
- {
- String membership = SimpleMemberHandle.membersToString(_group.getMembers());
- ClusterMembershipBody announce = createAnnouncement(membership);
- broadcast(new SimpleBodySendable(announce));
- _logger.info(new LogMessage("Membership announcement sent: {0}", membership));
- }
-
- private void handleFailure(Broker peer)
- {
- peer.remove();
- _group.remove(peer);
- }
-
- public void handleMembershipAnnouncement(String membership) throws AMQException
- {
- _group.setMembers(SimpleMemberHandle.stringToMembers(membership));
- _logger.info(new LogMessage("Membership announcement received: {0}", membership));
- }
-
- public boolean isLeader()
- {
- return _group.isLeader();
- }
-
- public boolean isLeader(MemberHandle handle)
- {
- return _group.isLeader(handle);
- }
-
- public Broker getLeader()
- {
- return _group.getLeader();
- }
-
- private Broker findBroker(MemberHandle handle)
- {
- return _group.findBroker(handle, false);
- }
-
- public Member getMember(MemberHandle handle)
- {
- return findBroker(handle);
- }
-
- public boolean isMember(MemberHandle member)
- {
- for (MemberHandle handle : _group.getMembers())
- {
- if (handle.matches(member))
- {
- return true;
- }
- }
- return false;
- }
-
- public MemberHandle getLocal()
- {
- return _group.getLocal();
- }
-
- public void failed(MemberHandle member)
- {
- if (isLeader())
- {
- handleFailure(findBroker(member));
- try
- {
- announceMembership();
- }
- catch (AMQException e)
- {
- _logger.error("Error announcing failure: " + e, e);
- }
- }
- else
- {
- try
- {
- suspect(member);
- }
- catch (AMQException e)
- {
- _logger.error("Error sending suspect: " + e, e);
- }
- }
- }
-
- public Broker create(MemberHandle handle)
- {
- Broker broker = _factory.create(handle);
- broker.addFailureListener(this);
- return broker;
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java
deleted file mode 100644
index 5599ae4b1f..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-
-public interface GroupManager
-{
- /**
- * Establish a new cluster with the local member as the leader.
- */
- public void establish();
-
- /**
- * Join the cluster to which member belongs
- */
- public void join(MemberHandle member) throws AMQException;
-
- public void broadcast(Sendable message) throws AMQException;
-
- public void broadcast(Sendable message, BroadcastPolicy policy, GroupResponseHandler callback) throws AMQException;
-
- public void send(MemberHandle broker, Sendable message) throws AMQException;
-
- public void leave() throws AMQException;
-
- public void handleJoin(MemberHandle member) throws AMQException;
-
- public void handleLeave(MemberHandle member) throws AMQException;
-
- public void handleSuspect(MemberHandle member) throws AMQException;
-
- public void handlePing(MemberHandle member, long load);
-
- public void handleMembershipAnnouncement(String membership) throws AMQException;
-
- public void handleSynch(MemberHandle member);
-
- public boolean isLeader();
-
- public boolean isLeader(MemberHandle handle);
-
- public boolean isMember(MemberHandle member);
-
- public MemberHandle redirect();
-
- public MemberHandle getLocal();
-
- public JoinState getState();
-
- public void addMemberhipChangeListener(MembershipChangeListener l);
-
- public void removeMemberhipChangeListener(MembershipChangeListener l);
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java
deleted file mode 100644
index 8ab7856e87..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Represents a method sent to a group of Member instances. Manages the responses,
- * completion and callback.
- *
- */
-class GroupRequest
-{
- private final Map<Member, AMQMethodBody> _responses = new HashMap<Member, AMQMethodBody>();
- private final List<Member> _brokers = new ArrayList<Member>();
- private boolean _sent;
-
- private final Sendable _request;
- private final BroadcastPolicy _policy;
- private final GroupResponseHandler _callback;
-
- GroupRequest(Sendable request, BroadcastPolicy policy, GroupResponseHandler callback)
- {
- _request = request;
- _policy = policy;
- _callback = callback;
- }
-
- void send(int channel, Member session) throws AMQException
- {
- _brokers.add(session);
- _request.send(channel, session);
- }
-
- boolean finishedSend()
- {
- _sent = true;
- return checkCompletion();
- }
-
- public boolean responseReceived(Member broker, AMQMethodBody response)
- {
- _responses.put(broker, response);
- return checkCompletion();
- }
-
- public boolean removed(Member broker)
- {
- _brokers.remove(broker);
- return checkCompletion();
- }
-
- private synchronized boolean checkCompletion()
- {
- return isComplete() && callback();
- }
-
- boolean isComplete()
- {
- return _sent && _policy != null && _policy.isComplete(_responses.size(), _brokers.size());
- }
-
- boolean callback()
- {
- _callback.response(getResults(), _brokers);
- return true;
- }
-
- List<AMQMethodBody> getResults()
- {
- List<AMQMethodBody> results = new ArrayList<AMQMethodBody>(_brokers.size());
- for (Member b : _brokers)
- {
- results.add(_responses.get(b));
- }
- return results;
- }
-
- public String toString()
- {
- return "GroupRequest{request=" + _request +", brokers=" + _brokers + ", responses=" + _responses + "}";
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java
deleted file mode 100644
index d2e9de2f39..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-import java.util.List;
-
-public interface GroupResponseHandler
-{
- //Note: this implies that the response to a group request will always be a method body...
- public void response(List<AMQMethodBody> responses, List<Member> members);
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java
deleted file mode 100644
index 586d7d4ae8..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.mina.common.IoSession;
-
-import java.util.LinkedList;
-import java.util.Queue;
-
-/**
- * Buffers any received messages until join completes.
- *
- */
-class InductionBuffer
-{
- private final Queue<Message> _buffer = new LinkedList<Message>();
- private final MessageHandler _handler;
- private boolean _buffering = true;
-
- InductionBuffer(MessageHandler handler)
- {
- _handler = handler;
- }
-
- private void process() throws Exception
- {
- for (Message o = _buffer.poll(); o != null; o = _buffer.poll())
- {
- o.deliver(_handler);
- }
- _buffering = false;
- }
-
- synchronized void deliver() throws Exception
- {
- process();
- }
-
- synchronized void receive(IoSession session, Object msg) throws Exception
- {
- if (_buffering)
- {
- _buffer.offer(new Message(session, msg));
- }
- else
- {
- _handler.deliver(session, msg);
- }
- }
-
- private static class Message
- {
- private final IoSession _session;
- private final Object _msg;
-
- Message(IoSession session, Object msg)
- {
- _session = session;
- _msg = msg;
- }
-
- void deliver(MessageHandler handler) throws Exception
- {
- handler.deliver(_session, _msg);
- }
- }
-
- static interface MessageHandler
- {
- public void deliver(IoSession session, Object msg) throws Exception;
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java
deleted file mode 100644
index 5f92aa2971..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-public enum JoinState
-{
- UNINITIALISED, JOINING, INITIATION, INDUCTION, JOINED
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java
deleted file mode 100644
index 13465a8615..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.PriorityQueue;
-
-/**
- * Maintains loading information about the local member and its cluster peers.
- *
- */
-public class LoadTable
-{
- private final Map<MemberHandle, Loading> _peers = new HashMap<MemberHandle, Loading>();
- private final PriorityQueue<Loading> _loads = new PriorityQueue<Loading>();
- private final Loading _local = new Loading(null);
-
- public LoadTable()
- {
- _loads.add(_local);
- }
-
- public void setLoad(Member member, long load)
- {
- synchronized (_peers)
- {
- Loading loading = _peers.get(member);
- if (loading == null)
- {
- loading = new Loading(member);
- synchronized (_loads)
- {
- _loads.add(loading);
- }
- _peers.put(member, loading);
- }
- loading.load = load;
- }
- }
-
- public void incrementLocalLoad()
- {
- synchronized (_local)
- {
- _local.load++;
- }
- }
-
- public void decrementLocalLoad()
- {
- synchronized (_local)
- {
- _local.load--;
- }
- }
-
- public long getLocalLoad()
- {
- synchronized (_local)
- {
- return _local.load;
- }
- }
-
- public Member redirect()
- {
- synchronized (_loads)
- {
- return _loads.peek().member;
- }
- }
-
- private static class Loading implements Comparable
- {
- private final Member member;
- private long load;
-
- Loading(Member member)
- {
- this.member = member;
- }
-
- public int compareTo(Object o)
- {
- return (int) (load - ((Loading) o).load);
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
deleted file mode 100644
index 15752353d1..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.transport.ConnectorConfiguration;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-/**
- * TODO: This is a cut-and-paste from the original broker Main class. Would be preferrable to make that class more
- * reuseable to avoid all this duplication.
- */
-public class Main extends org.apache.qpid.server.Main
-{
- private static final Logger _logger = Logger.getLogger(Main.class);
-
- protected Main(String[] args)
- {
- super(args);
- }
-
- protected void setOptions(Options otions)
- {
- super.setOptions(options);
-
- //extensions:
- Option join = OptionBuilder.withArgName("join").hasArg().withDescription("Join the specified cluster member. Overrides any value in the config file").
- withLongOpt("join").create("j");
- options.addOption(join);
- }
-
- protected void bind(int port, ConnectorConfiguration connectorConfig)
- {
- try
- {
- IoAcceptor acceptor = new SocketAcceptor();
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
- SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
-
- sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize);
- sc.setSendBufferSize(connectorConfig.socketWriteBuferSize);
- sc.setTcpNoDelay(true);
-
- // if we do not use the executor pool threading model we get the default leader follower
- // implementation provided by MINA
- if (connectorConfig.enableExecutorPool)
- {
- sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- String host = InetAddress.getLocalHost().getHostName();
- ClusteredProtocolHandler handler = new ClusteredProtocolHandler(new InetSocketAddress(host, port));
- if (!connectorConfig.enableSSL)
- {
- acceptor.bind(new InetSocketAddress(port), handler, sconfig);
- _logger.info("Qpid.AMQP listening on non-SSL port " + port);
- handler.connect(commandLine.getOptionValue("j"));
- }
- else
- {
- ClusteredProtocolHandler sslHandler = new ClusteredProtocolHandler(handler);
- acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), sslHandler, sconfig);
- _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
- }
- }
- catch (IOException e)
- {
- _logger.error("Unable to bind service to registry: " + e, e);
- }
- catch (Exception e)
- {
- _logger.error("Unable to connect to cluster: " + e, e);
- }
- }
-
- public static void main(String[] args)
- {
- new Main(args);
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java
deleted file mode 100644
index 3fbdfdde70..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-
-public interface Member extends MemberHandle
-{
- public void send(AMQDataBlock data) throws AMQException;
-
- public void addFailureListener(MemberFailureListener listener);
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java
deleted file mode 100644
index 7ce45dffaa..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-interface MemberFailureListener
-{
- public void failed(MemberHandle member);
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java
deleted file mode 100644
index b8099a12f7..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.framing.AMQShortString;
-
-public interface MemberHandle
-{
- public String getHost();
-
- public int getPort();
-
- public boolean matches(MemberHandle m);
-
- public boolean matches(String host, int port);
-
- public AMQShortString getDetails();
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java
deleted file mode 100644
index 591e652e32..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import java.util.List;
-
-public interface MembershipChangeListener
-{
- public void changed(List<MemberHandle> members);
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
deleted file mode 100644
index a83f034021..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-
-interface MethodHandler
-{
- public void handle(int channel, AMQMethodBody method) throws AMQException;
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java
deleted file mode 100644
index 9bf04f5458..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.server.state.AMQState;
-
-public interface MethodHandlerFactory
-{
- public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry);
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
deleted file mode 100644
index 748a660bb8..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class MethodHandlerRegistry
-{
- private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> registry =
- new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-
- public <A extends AMQMethodBody, B extends Class<A>> MethodHandlerRegistry addHandler(B type, StateAwareMethodListener<A> handler)
- {
- registry.put(type, handler);
- return this;
- }
-
- public <B extends AMQMethodBody> StateAwareMethodListener<B> getHandler(B frame)
- {
- return (StateAwareMethodListener<B>) registry.get(frame.getClass());
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
deleted file mode 100644
index b01ec491ec..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.RuntimeIOException;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.transport.socket.nio.SocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionRedirectBody;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-/**
- * A 'client stub' for a remote cluster peer, using MINA for IO Layer
- *
- */
-public class MinaBrokerProxy extends Broker implements MethodHandler
-{
- private static final Logger _logger = Logger.getLogger(MinaBrokerProxy.class);
- private final ConnectionStatusMonitor _connectionMonitor = new ConnectionStatusMonitor();
- private final ClientHandlerRegistry _legacyHandler;
- private final MinaBinding _binding = new MinaBinding();
- private final MemberHandle _local;
- private IoSession _session;
- private MethodHandler _handler;
- private Iterable<AMQMethodBody> _replay;
-
- MinaBrokerProxy(String host, int port, MemberHandle local)
- {
- super(host, port);
- _local = local;
- _legacyHandler = new ClientHandlerRegistry(local, null);
- }
-
- private void init(IoSession session)
- {
- _session = session;
- _handler = new ClientAdapter(session, _legacyHandler);
- }
-
- private ConnectFuture connectImpl()
- {
- _logger.info("Connecting to cluster peer: " + getDetails());
- SocketConnector ioConnector = new SocketConnector();
- SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
-
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
- scfg.setTcpNoDelay(true);
- scfg.setSendBufferSize(32768);
- scfg.setReceiveBufferSize(32768);
- InetSocketAddress address = new InetSocketAddress(getHost(), getPort());
- return ioConnector.connect(address, _binding);
- }
-
- //extablish connection without handling redirect
- boolean connect() throws IOException, InterruptedException
- {
- ConnectFuture future = connectImpl();
- // wait for connection to complete
- future.join();
- // we call getSession which throws an IOException if there has been an error connecting
- try
- {
- future.getSession();
- }
- catch (RuntimeIOException e)
- {
- _connectionMonitor.failed(e);
- _logger.error(new LogMessage("Could not connect to {0}: {1}", this, e), e);
- throw e;
- }
- return _connectionMonitor.waitUntilOpen();
- }
-
- void connectAsynch(Iterable<AMQMethodBody> msgs)
- {
- _replay = msgs;
- connectImpl();
- }
-
- void replay(Iterable<AMQMethodBody> msgs)
- {
- _replay = msgs;
- if(_connectionMonitor.isOpened())
- {
- replay();
- }
- }
-
- //establish connection, handling redirect if required...
- Broker connectToCluster() throws IOException, InterruptedException
- {
- connect();
- //wait until the connection is open or get a redirection
- if (_connectionMonitor.waitUntilOpen())
- {
- return this;
- }
- else
- {
- Broker broker = new MinaBrokerProxy(_connectionMonitor.getHost(), _connectionMonitor.getPort(), _local);
- broker.connect();
- return broker;
- }
- }
-
- public void send(AMQDataBlock data) throws AMQConnectionWaitException
- {
- if (_session == null)
- {
- try
- {
- _connectionMonitor.waitUntilOpen();
- }
- catch (InterruptedException e)
- {
- throw new AMQConnectionWaitException("Failed to send " + data + ": " + e, e);
- }
- }
- _session.write(data);
- }
-
- private void replay()
- {
- if(_replay != null)
- {
- for(AMQMethodBody b : _replay)
- {
- _session.write(new AMQFrame(0, b));
- }
- }
- }
-
- public void handle(int channel, AMQMethodBody method) throws AMQException
- {
- _logger.info(new LogMessage("Handling method: {0} for channel {1}", method, channel));
- if (!handleResponse(channel, method))
- {
- _logger.warn(new LogMessage("Unhandled method: {0} for channel {1}", method, channel));
- }
- }
-
- private void handleMethod(int channel, AMQMethodBody method) throws AMQException
- {
- if (method instanceof ConnectionRedirectBody)
- {
- //signal redirection to waiting thread
- ConnectionRedirectBody redirect = (ConnectionRedirectBody) method;
- String[] parts = redirect.host.toString().split(":");
- _connectionMonitor.redirect(parts[0], Integer.parseInt(parts[1]));
- }
- else
- {
- _handler.handle(channel, method);
- if (AMQState.CONNECTION_OPEN.equals(_legacyHandler.getCurrentState()) && _handler != this)
- {
- _handler = this;
- _logger.info(new LogMessage("Connection opened, handler switched"));
- //replay any messages:
- replay();
- //signal waiting thread:
- _connectionMonitor.opened();
- }
- }
- }
-
- private void handleFrame(AMQFrame frame) throws AMQException
- {
- AMQBody body = frame.getBodyFrame();
- if (body instanceof AMQMethodBody)
- {
- handleMethod(frame.getChannel(), (AMQMethodBody) body);
- }
- else
- {
- throw new AMQUnexpectedBodyTypeException(AMQMethodBody.class, body);
- }
- }
-
- public String toString()
- {
- return "MinaBrokerProxy[" + (_session == null ? super.toString() : _session.getRemoteAddress()) + "]";
- }
-
- private class MinaBinding extends IoHandlerAdapter
- {
- public void sessionCreated(IoSession session) throws Exception
- {
- init(session);
- _logger.info(new LogMessage("{0}: created", MinaBrokerProxy.this));
- ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
- session.getFilterChain().addLast("protocolFilter", pcf);
-
- /* Find last protocol version in protocol version list. Make sure last protocol version
- listed in the build file (build-module.xml) is the latest version which will be used
- here. */
-
- session.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
- }
-
- public void sessionOpened(IoSession session) throws Exception
- {
- _logger.info(new LogMessage("{0}: opened", MinaBrokerProxy.this));
- }
-
- public void sessionClosed(IoSession session) throws Exception
- {
- _logger.info(new LogMessage("{0}: closed", MinaBrokerProxy.this));
- }
-
- public void exceptionCaught(IoSession session, Throwable throwable) throws Exception
- {
- _logger.error(new LogMessage("{0}: received {1}", MinaBrokerProxy.this, throwable), throwable);
- if (! (throwable instanceof IOException))
- {
- _session.close();
- }
- failed();
- }
-
- public void messageReceived(IoSession session, Object object) throws Exception
- {
- if (object instanceof AMQFrame)
- {
- handleFrame((AMQFrame) object);
- }
- else
- {
- throw new AMQUnexpectedFrameTypeException("Received message of unrecognised type: " + object);
- }
- }
-
- public void messageSent(IoSession session, Object object) throws Exception
- {
- _logger.debug(new LogMessage("{0}: sent {1}", MinaBrokerProxy.this, object));
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java
deleted file mode 100644
index 5e70de7665..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-public class MinaBrokerProxyFactory implements BrokerFactory
-{
- private final MemberHandle _local;
-
- MinaBrokerProxyFactory(MemberHandle local)
- {
- _local = local;
- }
-
- public Broker create(MemberHandle handle)
- {
- return new MinaBrokerProxy(handle.getHost(), handle.getPort(), _local);
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java
deleted file mode 100644
index fe76ca6505..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-public interface ResponseHandler
-{
- public void responded(AMQMethodBody response);
-
- public void removed();
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java
deleted file mode 100644
index 159612331c..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-
-public interface Sendable
-{
- public void send(int channel, Member member) throws AMQException;
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
deleted file mode 100644
index aadcfa4b4c..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.state.AMQState;
-import org.apache.qpid.server.state.AMQStateManager;
-//import org.apache.qpid.server.state.IllegalStateTransitionException;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * An extension of server.AMQStateManager that allows different handlers to be registered.
- *
- */
-class ServerHandlerRegistry extends AMQStateManager
-{
- private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
- private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
-
- ServerHandlerRegistry(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
- {
- super(AMQState.CONNECTION_NOT_STARTED, false, virtualHostRegistry, protocolSession);
- }
-
- ServerHandlerRegistry(ServerHandlerRegistry s, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
- {
- this(virtualHostRegistry, protocolSession);
- _handlers.putAll(s._handlers);
- }
-
- ServerHandlerRegistry(MethodHandlerFactory factory, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
- {
- this(virtualHostRegistry, protocolSession);
- init(factory);
- }
-
- void setHandlers(AMQState state, MethodHandlerRegistry handlers)
- {
- _handlers.put(state, handlers);
- }
-
- void init(MethodHandlerFactory factory)
- {
- for (AMQState s : AMQState.values())
- {
- setHandlers(s, factory.register(s, new MethodHandlerRegistry()));
- }
- }
-
- protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState state, B frame) //throws IllegalStateTransitionException
- {
- MethodHandlerRegistry registry = _handlers.get(state);
- StateAwareMethodListener<B> handler = (registry == null) ? null : registry.getHandler(frame);
- if (handler == null)
- {
- _logger.warn(new LogMessage("No handler for {0}, {1}", state, frame));
- }
- return handler;
- }
-
- <A extends AMQMethodBody, B extends Class<A>> void addHandler(AMQState state, B type, StateAwareMethodListener<A> handler)
- {
- MethodHandlerRegistry registry = _handlers.get(state);
- if (registry == null)
- {
- registry = new MethodHandlerRegistry();
- _handlers.put(state, registry);
- }
- registry.addHandler(type, handler);
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java
deleted file mode 100644
index bd3757bf97..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQFrame;
-
-/**
- */
-public class SimpleBodySendable implements Sendable
-{
- private final AMQBody _body;
-
- public SimpleBodySendable(AMQBody body)
- {
- _body = body;
- }
-
- public void send(int channel, Member member) throws AMQException
- {
- member.send(new AMQFrame(channel, _body));
- }
-
- public String toString()
- {
- return _body.toString();
- }
-
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java
deleted file mode 100644
index 1255094b1d..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.framing.AMQShortString;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-
-public class SimpleMemberHandle implements MemberHandle
-{
- private final String _host;
- private final int _port;
-
- public SimpleMemberHandle(String host, int port)
- {
- _host = host;
- _port = port;
- }
-
- public SimpleMemberHandle(AMQShortString details)
- {
- this(details.toString());
- }
-
- public SimpleMemberHandle(String details)
- {
- String[] parts = details.split(":");
- _host = parts[0];
- _port = Integer.parseInt(parts[1]);
- }
-
- public SimpleMemberHandle(InetSocketAddress address) throws UnknownHostException
- {
- this(address.getAddress(), address.getPort());
- }
-
- public SimpleMemberHandle(InetAddress address, int port) throws UnknownHostException
- {
- this(canonical(address).getHostAddress(), port);
- }
-
- public String getHost()
- {
- return _host;
- }
-
- public int getPort()
- {
- return _port;
- }
-
- public int hashCode()
- {
- return getPort();
- }
-
- public boolean equals(Object o)
- {
- return o instanceof MemberHandle && matches((MemberHandle) o);
- }
-
- public boolean matches(MemberHandle m)
- {
- return matches(m.getHost(), m.getPort());
- }
-
- public boolean matches(String host, int port)
- {
- return _host.equals(host) && _port == port;
- }
-
- public AMQShortString getDetails()
- {
- return new AMQShortString(_host + ":" + _port);
- }
-
- public String toString()
- {
- return getDetails().toString();
- }
-
- static List<MemberHandle> stringToMembers(String membership)
- {
- String[] names = membership.split("\\s");
- List<MemberHandle> members = new ArrayList<MemberHandle>();
- for (String name : names)
- {
- members.add(new SimpleMemberHandle(name));
- }
- return members;
- }
-
- static String membersToString(List<MemberHandle> members)
- {
- StringBuffer buffer = new StringBuffer();
- boolean first = true;
- for (MemberHandle m : members)
- {
- if (first)
- {
- first = false;
- }
- else
- {
- buffer.append(" ");
- }
- buffer.append(m.getDetails());
- }
-
- return buffer.toString();
- }
-
- private static InetAddress canonical(InetAddress address) throws UnknownHostException
- {
- if (address.isLoopbackAddress())
- {
- return InetAddress.getLocalHost();
- }
- else
- {
- return address;
- }
- }
-
- public MemberHandle resolve()
- {
- return resolve(this);
- }
-
- public static MemberHandle resolve(MemberHandle handle)
- {
- try
- {
- return new SimpleMemberHandle(new InetSocketAddress(handle.getHost(), handle.getPort()));
- }
- catch (UnknownHostException e)
- {
- e.printStackTrace();
- return handle;
- }
- }
-
-
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
deleted file mode 100644
index 3699a9e128..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.amqp_8_0.MethodConverter_8_0;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.server.queue.AMQMessage;
-
-import java.util.Iterator;
-
-public class SimpleSendable implements Sendable
-{
-
- //todo fixme - remove 0-8 hard coding
- ProtocolVersionMethodConverter _methodConverter = new MethodConverter_8_0();
-
- private final AMQMessage _message;
-
- public SimpleSendable(AMQMessage message)
- {
- _message = message;
- }
-
- public void send(int channel, Member member) throws AMQException
- {
- member.send(new AMQFrame(channel, _methodConverter.convertToBody(_message.getMessagePublishInfo())));
- member.send(new AMQFrame(channel, _message.getContentHeaderBody()));
- Iterator<ContentChunk> it = _message.getContentBodyIterator();
- while (it.hasNext())
- {
- member.send(new AMQFrame(channel, _methodConverter.convertToBody(it.next())));
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
deleted file mode 100644
index 86710e8a31..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-
-import java.util.List;
-import java.util.ArrayList;
-
-public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends ClusterMethodHandler<A>
-{
- private final List<ClusterMethodHandler<A>> _handlers;
-
- private ChainedClusterMethodHandler()
- {
- this(new ArrayList<ClusterMethodHandler<A>>());
- }
-
- public ChainedClusterMethodHandler(List<ClusterMethodHandler<A>> handlers)
- {
- _handlers = handlers;
- }
-
- public ChainedClusterMethodHandler(ClusterMethodHandler<A>... handlers)
- {
- this();
- for(ClusterMethodHandler<A>handler: handlers)
- {
- _handlers.add(handler);
- }
- }
-
- protected final void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
- {
- for(ClusterMethodHandler<A> handler : _handlers)
- {
- handler.peer(stateMgr, evt);
- }
- }
-
- protected final void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
- {
- for(ClusterMethodHandler<A> handler : _handlers)
- {
- handler.client(stateMgr, evt);
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
deleted file mode 100644
index c9f6dbfb37..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.log4j.Logger;
-
-import java.util.Map;
-import java.util.HashMap;
-
-/**
- * Maintains the default queue names for a channel, and alters subsequent frames where necessary
- * to use this (i.e. when no queue is explictly specified).
- *
- */
-class ChannelQueueManager
-{
- private static final Logger _logger = Logger.getLogger(ChannelQueueManager.class);
- private final Map<Integer, AMQShortString> _channelQueues = new HashMap<Integer, AMQShortString>();
-
- ClusterMethodHandler<QueueDeclareBody> createQueueDeclareHandler()
- {
- return new QueueDeclareHandler();
- }
-
- ClusterMethodHandler<QueueDeleteBody> createQueueDeleteHandler()
- {
- return new QueueDeleteHandler();
- }
-
- ClusterMethodHandler<QueueBindBody> createQueueBindHandler()
- {
- return new QueueBindHandler();
- }
-
- ClusterMethodHandler<BasicConsumeBody> createBasicConsumeHandler()
- {
- return new BasicConsumeHandler();
- }
-
- private void set(int channel, AMQShortString queue)
- {
- _channelQueues.put(channel, queue);
- _logger.info(new LogMessage("Set default queue for {0} to {1}", channel, queue));
- }
-
- private AMQShortString get(int channel)
- {
- AMQShortString queue = _channelQueues.get(channel);
- _logger.info(new LogMessage("Default queue for {0} is {1}", channel, queue));
- return queue;
- }
-
- private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody>
- {
- protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
- {
- }
-
- protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
- {
- set(evt.getChannelId(), evt.getMethod().queue);
- }
- }
- private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody>
- {
- protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException
- {
- }
-
- protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException
- {
- if(evt.getMethod().queue == null)
- {
- evt.getMethod().queue = get(evt.getChannelId());
- }
- }
- }
- private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody>
- {
- protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
- {
- }
-
- protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
- {
- if(evt.getMethod().queue == null)
- {
- evt.getMethod().queue = get(evt.getChannelId());
- }
- }
- }
-
- private class BasicConsumeHandler extends ClusterMethodHandler<BasicConsumeBody>
- {
- protected void peer(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
- {
- }
-
- protected void client(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
- {
- if(evt.getMethod().queue == null)
- {
- evt.getMethod().queue = get(evt.getChannelId());
- }
- }
- }
-
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
deleted file mode 100644
index faab99b0f6..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.AMQException;
-
-public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
-{
- public final void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
- {
- AMQProtocolSession session = stateMgr.getProtocolSession();
-
- if (ClusteredProtocolSession.isPeerSession(session))
- {
- peer(stateMgr, evt);
- }
- else
- {
- client(stateMgr, evt);
- }
- }
-
- protected abstract void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException;
- protected abstract void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException;
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
deleted file mode 100644
index e7509da32a..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.cluster.ClusterCapability;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.LoadTable;
-import org.apache.qpid.server.cluster.MemberHandle;
-import org.apache.qpid.server.cluster.MethodHandlerFactory;
-import org.apache.qpid.server.cluster.MethodHandlerRegistry;
-import org.apache.qpid.server.cluster.SimpleMemberHandle;
-import org.apache.qpid.server.handler.ChannelCloseHandler;
-import org.apache.qpid.server.handler.ChannelFlowHandler;
-import org.apache.qpid.server.handler.ChannelOpenHandler;
-import org.apache.qpid.server.handler.ConnectionCloseMethodHandler;
-import org.apache.qpid.server.handler.ConnectionOpenMethodHandler;
-import org.apache.qpid.server.handler.ConnectionSecureOkMethodHandler;
-import org.apache.qpid.server.handler.ConnectionStartOkMethodHandler;
-import org.apache.qpid.server.handler.ConnectionTuneOkMethodHandler;
-import org.apache.qpid.server.handler.ExchangeDeclareHandler;
-import org.apache.qpid.server.handler.ExchangeDeleteHandler;
-import org.apache.qpid.server.handler.BasicCancelMethodHandler;
-import org.apache.qpid.server.handler.BasicPublishMethodHandler;
-import org.apache.qpid.server.handler.QueueBindHandler;
-import org.apache.qpid.server.handler.QueueDeleteHandler;
-import org.apache.qpid.server.handler.BasicQosHandler;
-import org.apache.qpid.server.handler.TxSelectHandler;
-import org.apache.qpid.server.handler.TxCommitHandler;
-import org.apache.qpid.server.handler.TxRollbackHandler;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.state.AMQState;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-public class ClusterMethodHandlerFactory implements MethodHandlerFactory
-{
- private final GroupManager _groupMgr;
- private final LoadTable _loadTable;
-
- public ClusterMethodHandlerFactory(GroupManager groupMgr, LoadTable loadTable)
- {
- _groupMgr = groupMgr;
- _loadTable = loadTable;
- }
-
- public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry)
- {
- switch (state)
- {
- case CONNECTION_NOT_STARTED:
- return registry.addHandler(ConnectionStartOkBody.class, ConnectionStartOkMethodHandler.getInstance());
- case CONNECTION_NOT_AUTH:
- return registry.addHandler(ConnectionSecureOkBody.class, ConnectionSecureOkMethodHandler.getInstance());
- case CONNECTION_NOT_TUNED:
- return registry.addHandler(ConnectionTuneOkBody.class, ConnectionTuneOkMethodHandler.getInstance());
- case CONNECTION_NOT_OPENED:
- //connection.open override:
- return registry.addHandler(ConnectionOpenBody.class, new ConnectionOpenHandler());
- case CONNECTION_OPEN:
- return registerConnectionOpened(registry);
- }
- return registry;
- }
-
- private MethodHandlerRegistry registerConnectionOpened(MethodHandlerRegistry registry)
- {
- //new cluster method handlers:
- registry.addHandler(ClusterJoinBody.class, new JoinHandler());
- registry.addHandler(ClusterLeaveBody.class, new LeaveHandler());
- registry.addHandler(ClusterSuspectBody.class, new SuspectHandler());
- registry.addHandler(ClusterMembershipBody.class, new MembershipHandler());
- registry.addHandler(ClusterPingBody.class, new PingHandler());
- registry.addHandler(ClusterSynchBody.class, new SynchHandler());
-
- //connection.close override:
- registry.addHandler(ConnectionCloseBody.class, new ConnectionCloseHandler());
-
- //replicated handlers:
- registry.addHandler(ExchangeDeclareBody.class, replicated(ExchangeDeclareHandler.getInstance()));
- registry.addHandler(ExchangeDeleteBody.class, replicated(ExchangeDeleteHandler.getInstance()));
-
- ChannelQueueManager channelQueueMgr = new ChannelQueueManager();
-
-
- LocalQueueDeclareHandler handler = new LocalQueueDeclareHandler(_groupMgr);
- registry.addHandler(QueueDeclareBody.class,
- chain(new QueueNameGenerator(handler),
- channelQueueMgr.createQueueDeclareHandler(),
- new ReplicatingHandler<QueueDeclareBody>(_groupMgr, handler)));
-
- registry.addHandler(QueueBindBody.class, chain(channelQueueMgr.createQueueBindHandler(), replicated(QueueBindHandler.getInstance())));
- registry.addHandler(QueueDeleteBody.class, chain(channelQueueMgr.createQueueDeleteHandler(), replicated(alternate(new QueueDeleteHandler(false), new QueueDeleteHandler(true)))));
- registry.addHandler(BasicConsumeBody.class, chain(channelQueueMgr.createBasicConsumeHandler(), new ReplicatingConsumeHandler(_groupMgr)));
-
- //other modified handlers:
- registry.addHandler(BasicCancelBody.class, alternate(new RemoteCancelHandler(), BasicCancelMethodHandler.getInstance()));
-
- //other unaffected handlers:
- registry.addHandler(BasicPublishBody.class, BasicPublishMethodHandler.getInstance());
- registry.addHandler(BasicQosBody.class, BasicQosHandler.getInstance());
- registry.addHandler(ChannelOpenBody.class, ChannelOpenHandler.getInstance());
- registry.addHandler(ChannelCloseBody.class, ChannelCloseHandler.getInstance());
- registry.addHandler(ChannelFlowBody.class, ChannelFlowHandler.getInstance());
- registry.addHandler(TxSelectBody.class, TxSelectHandler.getInstance());
- registry.addHandler(TxCommitBody.class, TxCommitHandler.getInstance());
- registry.addHandler(TxRollbackBody.class, TxRollbackHandler.getInstance());
-
-
- return registry;
- }
-
- private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody>
- {
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSynchBody> evt) throws AMQException
- {
- _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(stateManager.getProtocolSession()));
- }
- }
-
- private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody>
- {
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterJoinBody> evt) throws AMQException
- {
- _groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker));
- }
- }
-
- private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody>
- {
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException
- {
- _groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker));
- }
- }
-
- private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody>
- {
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException
- {
- _groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker));
- }
- }
-
- private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody>
- {
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException
- {
- ClusterMembershipBody body = evt.getMethod();
- _groupMgr.handleMembershipAnnouncement(new String(body.members));
- }
- }
-
- private class PingHandler implements StateAwareMethodListener<ClusterPingBody>
- {
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterPingBody> evt) throws AMQException
- {
- MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker);
- _groupMgr.handlePing(peer, evt.getMethod().load);
- if (evt.getMethod().responseRequired)
- {
- evt.getMethod().load = _loadTable.getLocalLoad();
- stateManager.getProtocolSession().writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod()));
- }
- }
- }
-
- private class ConnectionOpenHandler extends ExtendedHandler<ConnectionOpenBody>
- {
- ConnectionOpenHandler()
- {
- super(ConnectionOpenMethodHandler.getInstance());
- }
-
- void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionOpenBody> evt)
- {
- AMQShortString capabilities = evt.getMethod().capabilities;
- if (ClusterCapability.contains(capabilities))
- {
- ClusteredProtocolSession.setSessionPeer(stateMgr.getProtocolSession(), ClusterCapability.getPeer(capabilities));
- }
- else
- {
- _loadTable.incrementLocalLoad();
- }
- }
- }
-
- private class ConnectionCloseHandler extends ExtendedHandler<ConnectionCloseBody>
- {
- ConnectionCloseHandler()
- {
- super(ConnectionCloseMethodHandler.getInstance());
- }
-
- void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionCloseBody> evt)
- {
- if (!ClusteredProtocolSession.isPeerSession(stateMgr.getProtocolSession()))
- {
- _loadTable.decrementLocalLoad();
- }
- }
- }
-
- private <B extends AMQMethodBody> ReplicatingHandler<B> replicated(StateAwareMethodListener<B> handler)
- {
- return new ReplicatingHandler<B>(_groupMgr, handler);
- }
-
- private <B extends AMQMethodBody> StateAwareMethodListener<B> alternate(StateAwareMethodListener<B> peer, StateAwareMethodListener<B> client)
- {
- return new PeerHandler<B>(peer, client);
- }
-
- private <B extends AMQMethodBody> StateAwareMethodListener<B> chain(ClusterMethodHandler<B>... h)
- {
- return new ChainedClusterMethodHandler<B>(h);
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
deleted file mode 100644
index a2f62f714b..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
-{
- private final StateAwareMethodListener<A> _base;
-
- ExtendedHandler(StateAwareMethodListener<A> base)
- {
- _base = base;
- }
-
- public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
- {
- preHandle(stateMgr, evt);
- _base.methodReceived(stateMgr, evt);
- postHandle(stateMgr, evt);
- }
-
- void preHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
- {
- }
-
- void postHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
- {
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java
deleted file mode 100644
index 0dc7fe00d2..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-public abstract class HandlerUtils
-{
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
deleted file mode 100644
index f01a8349f2..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.MemberHandle;
-import org.apache.qpid.server.handler.QueueDeclareHandler;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.ClusteredQueue;
-import org.apache.qpid.server.queue.PrivateQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.RemoteQueueProxy;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-public class LocalQueueDeclareHandler extends QueueDeclareHandler
-{
- private static final Logger _logger = Logger.getLogger(LocalQueueDeclareHandler.class);
- private final GroupManager _groupMgr;
-
- LocalQueueDeclareHandler(GroupManager groupMgr)
- {
- _groupMgr = groupMgr;
- }
-
- protected AMQShortString createName()
- {
- return new AMQShortString(super.createName().toString() + "@" + _groupMgr.getLocal().getDetails());
- }
-
- protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, AMQProtocolSession session) throws AMQException
- {
- //is it private or shared:
- if (body.exclusive)
- {
- if (ClusteredProtocolSession.isPeerSession(session))
- {
- //need to get peer from the session...
- MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session);
- _logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer));
- return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, virtualHost);
- }
- else
- {
- _logger.debug(new LogMessage("Creating local private queue {0}", body.queue));
- return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, virtualHost);
- }
- }
- else
- {
- _logger.debug(new LogMessage("Creating local shared queue {0}", body.queue));
- return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, virtualHost);
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
deleted file mode 100644
index 8b0bb4b127..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
-{
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<T> evt) throws AMQException
- {
- }
-}
-
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
deleted file mode 100644
index 447e51ccd9..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-/**
- * Base for implementing handlers that carry out different actions based on whether the method they
- * are handling was sent by a peer (i.e. another broker in the cluster) or a client (i.e. an end-user
- * application).
- *
- */
-public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A>
-{
- private final StateAwareMethodListener<A> _peer;
- private final StateAwareMethodListener<A> _client;
-
- PeerHandler(StateAwareMethodListener<A> peer, StateAwareMethodListener<A> client)
- {
- _peer = peer;
- _client = client;
- }
-
- protected void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
- {
- _peer.methodReceived(stateMgr, evt);
- }
-
- protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
- {
- _client.methodReceived(stateMgr, evt);
- }
-
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
deleted file mode 100644
index a669171d3c..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-
-/**
- * Generates queue names for queues declared with no name.
- *
- */
-class QueueNameGenerator extends ClusterMethodHandler<QueueDeclareBody>
-{
- private final LocalQueueDeclareHandler _handler;
-
- QueueNameGenerator(LocalQueueDeclareHandler handler)
- {
- _handler = handler;
- }
-
- protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
- {
- }
-
- protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt)
- throws AMQException
- {
- setName(evt.getMethod());//need to set the name before propagating this method
- }
-
- protected void setName(QueueDeclareBody body)
- {
- if (body.queue == null)
- {
- body.queue = _handler.createName();
- }
- }
-}
-
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
deleted file mode 100644
index f09763e1ad..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.ClusteredQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-public class RemoteCancelHandler implements StateAwareMethodListener<BasicCancelBody>
-{
- private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class);
-
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicCancelBody> evt) throws AMQException
- {
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
-
- //By convention, consumers setup between brokers use the queue name as the consumer tag:
- AMQQueue queue = queueRegistry.getQueue(evt.getMethod().consumerTag);
- if (queue instanceof ClusteredQueue)
- {
- ((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session));
- }
- else
- {
- _logger.warn("Got remote cancel request for non-clustered queue: " + queue);
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
deleted file mode 100644
index 073b13688c..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.ClusteredQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-/**
- * Handles consume requests from other cluster members.
- *
- */
-public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsumeBody>
-{
- private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class);
-
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
- {
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
- AMQQueue queue = queueRegistry.getQueue(evt.getMethod().queue);
- if (queue instanceof ClusteredQueue)
- {
- ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session));
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- evt.getMethod().queue // consumerTag
- ));
- }
- else
- {
- _logger.warn("Got remote consume request for non-clustered queue: " + queue);
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
deleted file mode 100644
index 897f8e4fb7..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.handler.BasicConsumeMethodHandler;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBody>
-{
- ReplicatingConsumeHandler(GroupManager groupMgr)
- {
- this(groupMgr, null);
- }
-
- ReplicatingConsumeHandler(GroupManager groupMgr, BroadcastPolicy policy)
- {
- super(groupMgr, base(), policy);
- }
-
- protected void replicate(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
- {
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
- //only replicate if the queue in question is a shared queue
- if (isShared(queueRegistry.getQueue(evt.getMethod().queue)))
- {
- super.replicate(stateManager, evt);
- }
- else
- {
- _logger.info(new LogMessage("Handling consume for private queue ({0}) locally", evt.getMethod()));
- local(stateManager, evt);
- _logger.info(new LogMessage("Handled consume for private queue ({0}) locally", evt.getMethod()));
-
- }
- }
-
- protected boolean isShared(AMQQueue queue)
- {
- return queue != null && queue.isShared();
- }
-
- static StateAwareMethodListener<BasicConsumeBody> base()
- {
- return new PeerHandler<BasicConsumeBody>(peer(), client());
- }
-
- static StateAwareMethodListener<BasicConsumeBody> peer()
- {
- return new RemoteConsumeHandler();
- }
-
- static StateAwareMethodListener<BasicConsumeBody> client()
- {
- return BasicConsumeMethodHandler.getInstance();
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
deleted file mode 100644
index 888fa4e426..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.*;
-import org.apache.qpid.server.cluster.policy.StandardPolicies;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.List;
-
-/**
- * Basic template for handling methods that should be broadcast to the group and
- * processed locally after 'completion' of this broadcast.
- *
- */
-class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A> implements StandardPolicies
-{
- protected static final Logger _logger = Logger.getLogger(ReplicatingHandler.class);
-
- private final StateAwareMethodListener<A> _base;
- private final GroupManager _groupMgr;
- private final BroadcastPolicy _policy;
-
- ReplicatingHandler(GroupManager groupMgr, StateAwareMethodListener<A> base)
- {
- this(groupMgr, base, null);
- }
-
- ReplicatingHandler(GroupManager groupMgr, StateAwareMethodListener<A> base, BroadcastPolicy policy)
- {
- _groupMgr = groupMgr;
- _base = base;
- _policy = policy;
- }
-
- protected void peer(AMQStateManager stateManager, AMQMethodEvent<A> evt) throws AMQException
- {
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
- local(stateManager, evt);
- _logger.debug(new LogMessage("Handled {0} locally", evt.getMethod()));
- }
-
- protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
- {
- replicate(stateMgr, evt);
- }
-
- protected void replicate(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
- {
- if (_policy == null)
- {
- //asynch delivery
- _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()));
- local(stateMgr, evt);
- }
- else
- {
- Callback callback = new Callback(stateMgr, evt);
- _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()), _policy, callback);
- }
- _logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod()));
- }
-
- protected void local(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
- {
- _base.methodReceived(stateMgr, evt);
- }
-
- private class Callback implements GroupResponseHandler
- {
- private final AMQStateManager _stateMgr;
- private final AMQMethodEvent<A> _evt;
-
- Callback(AMQStateManager stateMgr, AMQMethodEvent<A> evt)
- {
- _stateMgr = stateMgr;
- _evt = evt;
- }
-
- public void response(List<AMQMethodBody> responses, List<Member> members)
- {
- try
- {
- local(_stateMgr, _evt);
- _logger.debug(new LogMessage("Handled {0} locally, in response to completion of replication", _evt.getMethod()));
- }
- catch (AMQException e)
- {
- _logger.error(new LogMessage("Error handling {0}:{1}", _evt.getMethod(), e), e);
- }
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
deleted file mode 100644
index 8b0c638d63..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-public class WrappedListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
-{
- private final StateAwareMethodListener<T> _primary;
- private final StateAwareMethodListener _post;
- private final StateAwareMethodListener _pre;
-
- WrappedListener(StateAwareMethodListener<T> primary, StateAwareMethodListener pre, StateAwareMethodListener post)
- {
- _pre = check(pre);
- _post = check(post);
- _primary = check(primary);
- }
-
- public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<T> evt) throws AMQException
- {
- _pre.methodReceived(stateMgr, evt);
- _primary.methodReceived(stateMgr, evt);
- _post.methodReceived(stateMgr, evt);
- }
-
- private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in)
- {
- return in == null ? new NullListener<T>() : in;
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java
deleted file mode 100644
index 5ec3c9660a..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.cluster.MethodHandlerFactory;
-import org.apache.qpid.server.cluster.MethodHandlerRegistry;
-import org.apache.qpid.server.state.AMQState;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-public abstract class WrappingMethodHandlerFactory implements MethodHandlerFactory
-{
- private final MethodHandlerFactory _delegate;
- private final StateAwareMethodListener _pre;
- private final StateAwareMethodListener _post;
-
- protected WrappingMethodHandlerFactory(MethodHandlerFactory delegate,
- StateAwareMethodListener pre,
- StateAwareMethodListener post)
- {
- _delegate = delegate;
- _pre = pre;
- _post = post;
- }
-
- public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry)
- {
- if (isWrappableState(state))
- {
- return wrap(_delegate.register(state, registry), state);
- }
- else
- {
- return _delegate.register(state, registry);
- }
- }
-
- protected abstract boolean isWrappableState(AMQState state);
-
- protected abstract Iterable<FrameDescriptor> getWrappableFrameTypes(AMQState state);
-
- private MethodHandlerRegistry wrap(MethodHandlerRegistry registry, AMQState state)
- {
- for (FrameDescriptor fd : getWrappableFrameTypes(state))
- {
- wrap(registry, fd.type, fd.instance);
- }
- return registry;
- }
-
- private <A extends AMQMethodBody, B extends Class<A>> void wrap(MethodHandlerRegistry r, B type, A frame)
- {
- r.addHandler(type, new WrappedListener<A>(r.getHandler(frame), _pre, _post));
- }
-
- protected static class FrameDescriptor<A extends AMQMethodBody, B extends Class<A>>
- {
- protected final A instance;
- protected final B type;
-
- public FrameDescriptor(B type, A instance)
- {
- this.instance = instance;
- this.type = type;
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java
deleted file mode 100644
index 79cb558ede..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.policy;
-
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-
-public class AsynchBroadcastPolicy implements BroadcastPolicy
-{
- public boolean isComplete(int responded, int members)
- {
- return true;
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java
deleted file mode 100644
index 42382c6e7a..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.policy;
-
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-
-public class MajorityResponseBroadcastPolicy implements BroadcastPolicy
-{
- public boolean isComplete(int responded, int members)
- {
- return responded > members / 2;
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java
deleted file mode 100644
index e3072a6a40..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.policy;
-
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-
-public class OneResponseBroadcastPolicy implements BroadcastPolicy
-{
- public boolean isComplete(int responded, int members)
- {
- return responded > 0;
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java
deleted file mode 100644
index dbaf690d3a..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.policy;
-
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-
-public interface StandardPolicies
-{
- public static final BroadcastPolicy ASYNCH_POLICY = new AsynchBroadcastPolicy();
- public static final BroadcastPolicy SYNCH_POLICY = new SynchBroadcastPolicy();
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java
deleted file mode 100644
index 605b8dd51e..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.policy;
-
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-
-public class SynchBroadcastPolicy implements BroadcastPolicy
-{
- public boolean isComplete(int responded, int members)
- {
- return responded == members;
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java
deleted file mode 100644
index 3664be58bc..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.replay;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-abstract class ChainedMethodRecorder <T extends AMQMethodBody> implements MethodRecorder<T>
-{
- private final MethodRecorder<T> _recorder;
-
- ChainedMethodRecorder()
- {
- this(null);
- }
-
- ChainedMethodRecorder(MethodRecorder<T> recorder)
- {
- _recorder = recorder;
- }
-
- public final void record(T method)
- {
- if(!doRecord(method) && _recorder != null)
- {
- _recorder.record(method);
- }
- }
-
- protected abstract boolean doRecord(T method);
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
deleted file mode 100644
index 5a433b869b..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.replay;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.AMQShortString;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-
-class ConsumerCounts
-{
- private final Map<AMQShortString, Integer> _counts = new HashMap<AMQShortString, Integer>();
-
- synchronized void increment(AMQShortString queue)
- {
- _counts.put(queue, get(queue) + 1);
- }
-
- synchronized void decrement(AMQShortString queue)
- {
- _counts.put(queue, get(queue) - 1);
- }
-
- private int get(AMQShortString queue)
- {
- Integer count = _counts.get(queue);
- return count == null ? 0 : count;
- }
-
- synchronized void replay(List<AMQMethodBody> messages)
- {
- for(AMQShortString queue : _counts.keySet())
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- BasicConsumeBody m = new BasicConsumeBody((byte)8,
- (byte)0,
- BasicConsumeBody.getClazz((byte)8, (byte)0),
- BasicConsumeBody.getMethod((byte)8, (byte)0),
- null,
- queue,
- false,
- false,
- false,
- false,
- queue,
- 0);
- m.queue = queue;
- m.consumerTag = queue;
- replay(m, messages);
- }
- }
-
- private void replay(BasicConsumeBody msg, List<AMQMethodBody> messages)
- {
- int count = _counts.get(msg.queue);
- for(int i = 0; i < count; i++)
- {
- messages.add(msg);
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java
deleted file mode 100644
index e45810438e..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.replay;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-/**
- * Abstraction through which a method can be recorded for replay
- *
- */
-interface MethodRecorder<T extends AMQMethodBody>
-{
- public void record(T method);
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
deleted file mode 100644
index 4d3fe1dbed..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.replay;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.server.cluster.MethodHandlerFactory;
-import org.apache.qpid.server.cluster.MethodHandlerRegistry;
-import org.apache.qpid.server.cluster.handler.WrappingMethodHandlerFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQState;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-import java.util.Arrays;
-
-public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory
-{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- private final byte major = (byte)8;
- private final byte minor = (byte)0;
- private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[]
- {
- new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor, QueueDeclareBody.getClazz(major, minor), QueueDeclareBody.getMethod(major, minor),null,false,false,false,false,false,null,0)),
- new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor, QueueDeleteBody.getClazz(major, minor), QueueDeleteBody.getMethod(major, minor),false,false,false,null,0)),
- new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor, QueueBindBody.getClazz(major, minor), QueueBindBody.getMethod(major, minor),null,null,false,null,null,0)),
- new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor, ExchangeDeclareBody.getClazz(major, minor), ExchangeDeclareBody.getMethod(major, minor),null,false,false,null,false,false,false,0,null)),
- new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor, ExchangeDeleteBody.getClazz(major, minor), ExchangeDeleteBody.getMethod(major, minor),null,false,false,0)),
- new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor, BasicConsumeBody.getClazz(major, minor), BasicConsumeBody.getMethod(major, minor),null,null,false,false,false,false,null,0)),
- new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor, BasicCancelBody.getClazz(major, minor), BasicCancelBody.getMethod(major, minor),null,false))
- });
-
-
- public RecordingMethodHandlerFactory(MethodHandlerFactory factory, ReplayStore store)
- {
- super(factory, null, store);
- }
-
- protected boolean isWrappableState(AMQState state)
- {
- return AMQState.CONNECTION_OPEN.equals(state);
- }
-
- protected Iterable<FrameDescriptor> getWrappableFrameTypes(AMQState state)
- {
- return _frames;
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java
deleted file mode 100644
index 898cb80cb3..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.replay;
-
-import org.apache.qpid.server.cluster.Sendable;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQMethodBody;
-
-import java.util.List;
-
-/**
- * Abstraction of a replay strategy for use in getting joining members up to
- * date with respect to cluster state.
- *
- */
-public interface ReplayManager
-{
- public List<AMQMethodBody> replay(boolean isLeader);
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
deleted file mode 100644
index d7bbb1c36b..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.replay;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.util.Bindings;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Stores method invocations for replay to new members.
- *
- */
-public class ReplayStore implements ReplayManager, StateAwareMethodListener
-{
- private static final Logger _logger = Logger.getLogger(ReplayStore.class);
-
- private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _globalRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>();
- private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _localRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>();
- private final Map<AMQShortString, QueueDeclareBody> _sharedQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>();
- private final Map<AMQShortString, QueueDeclareBody> _privateQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>();
- private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _sharedBindings = new Bindings<AMQShortString, AMQShortString, QueueBindBody>();
- private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _privateBindings = new Bindings<AMQShortString, AMQShortString, QueueBindBody>();
- private final Map<AMQShortString, ExchangeDeclareBody> _exchanges = new ConcurrentHashMap<AMQShortString, ExchangeDeclareBody>();
- private final ConsumerCounts _consumers = new ConsumerCounts();
-
- public ReplayStore()
- {
- _globalRecorders.put(QueueDeclareBody.class, new SharedQueueDeclareRecorder());
- _globalRecorders.put(QueueDeleteBody.class, new SharedQueueDeleteRecorder());
- _globalRecorders.put(QueueBindBody.class, new SharedQueueBindRecorder());
- _globalRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder());
- _globalRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder());
-
- _localRecorders.put(QueueDeclareBody.class, new PrivateQueueDeclareRecorder());
- _localRecorders.put(QueueDeleteBody.class, new PrivateQueueDeleteRecorder());
- _localRecorders.put(QueueBindBody.class, new PrivateQueueBindRecorder());
- _localRecorders.put(BasicConsumeBody.class, new BasicConsumeRecorder());
- _localRecorders.put(BasicCancelBody.class, new BasicCancelRecorder());
- _localRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder());
- _localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder());
- }
-
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
- {
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
-
- _logger.debug(new LogMessage("Replay store received {0}", evt.getMethod()));
- AMQMethodBody request = evt.getMethod();
-
- //allow any (relevant) recorder registered for this type of request to record it:
- MethodRecorder recorder = getRecorders(session).get(request.getClass());
- if (recorder != null)
- {
- recorder.record(request);
- }
- }
-
- private Map<Class<? extends AMQMethodBody>, MethodRecorder> getRecorders(AMQProtocolSession session)
- {
- if (ClusteredProtocolSession.isPeerSession(session))
- {
- return _globalRecorders;
- }
- else
- {
- return _localRecorders;
- }
- }
-
- public List<AMQMethodBody> replay(boolean isLeader)
- {
- List<AMQMethodBody> methods = new ArrayList<AMQMethodBody>();
- methods.addAll(_exchanges.values());
- methods.addAll(_privateQueues.values());
- synchronized(_privateBindings)
- {
- methods.addAll(_privateBindings.values());
- }
- if (isLeader)
- {
- methods.addAll(_sharedQueues.values());
- synchronized(_sharedBindings)
- {
- methods.addAll(_sharedBindings.values());
- }
- }
- _consumers.replay(methods);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- methods.add(new ClusterSynchBody((byte)8, (byte)0, ClusterSynchBody.getClazz((byte)8, (byte)0), ClusterSynchBody.getMethod((byte)8, (byte)0)));
- return methods;
- }
-
- private class BasicConsumeRecorder implements MethodRecorder<BasicConsumeBody>
- {
- public void record(BasicConsumeBody method)
- {
- if(_sharedQueues.containsKey(method.queue))
- {
- _consumers.increment(method.queue);
- }
- }
- }
-
- private class BasicCancelRecorder implements MethodRecorder<BasicCancelBody>
- {
- public void record(BasicCancelBody method)
- {
- if(_sharedQueues.containsKey(method.consumerTag))
- {
- _consumers.decrement(method.consumerTag);
- }
- }
- }
-
- private class SharedQueueDeclareRecorder extends QueueDeclareRecorder
- {
- SharedQueueDeclareRecorder()
- {
- super(false, _sharedQueues);
- }
- }
-
- private class PrivateQueueDeclareRecorder extends QueueDeclareRecorder
- {
- PrivateQueueDeclareRecorder()
- {
- super(true, _privateQueues, new SharedQueueDeclareRecorder());
- }
- }
-
- private class SharedQueueDeleteRecorder extends QueueDeleteRecorder
- {
- SharedQueueDeleteRecorder()
- {
- super(_sharedQueues, _sharedBindings);
- }
- }
-
- private class PrivateQueueDeleteRecorder extends QueueDeleteRecorder
- {
- PrivateQueueDeleteRecorder()
- {
- super(_privateQueues, _privateBindings, new SharedQueueDeleteRecorder());
- }
- }
-
- private class SharedQueueBindRecorder extends QueueBindRecorder
- {
- SharedQueueBindRecorder()
- {
- super(_sharedQueues, _sharedBindings);
- }
- }
-
- private class PrivateQueueBindRecorder extends QueueBindRecorder
- {
- PrivateQueueBindRecorder()
- {
- super(_privateQueues, _privateBindings, new SharedQueueBindRecorder());
- }
- }
-
-
- private static class QueueDeclareRecorder extends ChainedMethodRecorder<QueueDeclareBody>
- {
- private final boolean _exclusive;
- private final Map<AMQShortString, QueueDeclareBody> _queues;
-
- QueueDeclareRecorder(boolean exclusive, Map<AMQShortString, QueueDeclareBody> queues)
- {
- _queues = queues;
- _exclusive = exclusive;
- }
-
- QueueDeclareRecorder(boolean exclusive, Map<AMQShortString, QueueDeclareBody> queues, QueueDeclareRecorder recorder)
- {
- super(recorder);
- _queues = queues;
- _exclusive = exclusive;
- }
-
-
- protected boolean doRecord(QueueDeclareBody method)
- {
- if (_exclusive == method.exclusive)
- {
- _queues.put(method.queue, method);
- return true;
- }
- else
- {
- return false;
- }
- }
- }
-
- private class QueueDeleteRecorder extends ChainedMethodRecorder<QueueDeleteBody>
- {
- private final Map<AMQShortString, QueueDeclareBody> _queues;
- private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _bindings;
-
- QueueDeleteRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings)
- {
- this(queues, bindings, null);
- }
-
- QueueDeleteRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings, QueueDeleteRecorder recorder)
- {
- super(recorder);
- _queues = queues;
- _bindings = bindings;
- }
-
- protected boolean doRecord(QueueDeleteBody method)
- {
- if (_queues.remove(method.queue) != null)
- {
- _bindings.unbind1(method.queue);
- return true;
- }
- else
- {
- return false;
- }
- }
- }
-
- private class QueueBindRecorder extends ChainedMethodRecorder<QueueBindBody>
- {
- private final Map<AMQShortString, QueueDeclareBody> _queues;
- private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _bindings;
-
- QueueBindRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings)
- {
- _queues = queues;
- _bindings = bindings;
- }
-
- QueueBindRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings, QueueBindRecorder recorder)
- {
- super(recorder);
- _queues = queues;
- _bindings = bindings;
- }
-
- protected boolean doRecord(QueueBindBody method)
- {
- if (_queues.containsKey(method.queue))
- {
- _bindings.bind(method.queue, method.exchange, method);
- return true;
- }
- else
- {
- return false;
- }
- }
- }
-
- private class ExchangeDeclareRecorder implements MethodRecorder<ExchangeDeclareBody>
- {
- public void record(ExchangeDeclareBody method)
- {
- _exchanges.put(method.exchange, method);
- }
- }
-
- private class ExchangeDeleteRecorder implements MethodRecorder<ExchangeDeleteBody>
- {
- public void record(ExchangeDeleteBody method)
- {
- _exchanges.remove(method.exchange);
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java
deleted file mode 100644
index 49de0a7cbf..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.util;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-
-/**
- * Maps two separate keys to a list of values.
- *
- */
-public class Bindings<K1, K2, V>
-{
- private final MultiValuedMap<K1, Binding<K2>> _a = new MultiValuedMap<K1, Binding<K2>>();
- private final MultiValuedMap<K2, Binding<K1>> _b = new MultiValuedMap<K2, Binding<K1>>();
- private final Collection<V> _values = new HashSet<V>();
-
- public void bind(K1 key1, K2 key2, V value)
- {
- _a.add(key1, new Binding<K2>(key2, value));
- _b.add(key2, new Binding<K1>(key1, value));
- _values.add(value);
- }
-
- public void unbind1(K1 key1)
- {
- Collection<Binding<K2>> values = _a.remove(key1);
- for (Binding<K2> v : values)
- {
- _b.remove(v.key);
- _values.remove(v.value);
- }
- }
-
- public void unbind2(K2 key2)
- {
- Collection<Binding<K1>> values = _b.remove(key2);
- for (Binding<K1> v : values)
- {
- _a.remove(v.key);
- _values.remove(v.value);
- }
- }
-
- public Collection<V> values()
- {
- return Collections.unmodifiableCollection(_values);
- }
-
- /**
- * Value needs to hold key to the other map
- */
- private class Binding<T>
- {
- private final T key;
- private final V value;
-
- Binding(T key, V value)
- {
- this.key = key;
- this.value = value;
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java
deleted file mode 100644
index 406fe45701..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.util;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Set;
-import java.util.HashSet;
-
-/**
- * Allows a method to be invoked on a list of listeners with one call
- *
- */
-public class InvokeMultiple <T> implements InvocationHandler
-{
- private final Set<T> _targets = new HashSet<T>();
- private final T _proxy;
-
- public InvokeMultiple(Class<? extends T> type)
- {
- _proxy = (T) Proxy.newProxyInstance(type.getClassLoader(), new Class[]{type}, this);
- }
-
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
- {
- Set<T> targets;
- synchronized(this)
- {
- targets = new HashSet<T>(_targets);
- }
-
- for(T target : targets)
- {
- method.invoke(target, args);
- }
- return null;
- }
-
- public synchronized void addListener(T t)
- {
- _targets.add(t);
- }
-
- public synchronized void removeListener(T t)
- {
- _targets.remove(t);
- }
-
- public T getProxy()
- {
- return _proxy;
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java
deleted file mode 100644
index 9be90298ea..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.util;
-
-import java.text.MessageFormat;
-
-/**
- * Convenience class to allow log messages to be specified in terms
- * of MessageFormat patterns with a variable set of parameters. The
- * production of the string is only done if toSTring is called so it
- * works well with debug level messages, allowing complex messages
- * to be specified that are only evaluated if actually printed.
- *
- */
-public class LogMessage
-{
- private final String _message;
- private final Object[] _args;
-
- public LogMessage(String message)
- {
- this(message, new Object[0]);
- }
-
- public LogMessage(String message, Object... args)
- {
- _message = message;
- _args = args;
- }
-
- public String toString()
- {
- return MessageFormat.format(_message, _args);
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java
deleted file mode 100644
index ebe1fe47dd..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.util;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Maps a key to a collection of values
- *
- */
-public class MultiValuedMap<K, V>
-{
- private Map<K, Collection<V>> _map = new HashMap<K, Collection<V>>();
-
- public boolean add(K key, V value)
- {
- Collection<V> values = get(key);
- if (values == null)
- {
- values = createList();
- _map.put(key, values);
- }
- return values.add(value);
- }
-
- public Collection<V> get(K key)
- {
- return _map.get(key);
- }
-
- public Collection<V> remove(K key)
- {
- return _map.remove(key);
- }
-
- protected Collection<V> createList()
- {
- return new ArrayList<V>();
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
deleted file mode 100644
index 9fa96ece1e..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.cluster.*;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Represents a shared queue in a cluster. The key difference is that as well as any
- * local consumers, there may be consumers for this queue on other members of the
- * cluster.
- *
- */
-public class ClusteredQueue extends AMQQueue
-{
- private static final Logger _logger = Logger.getLogger(ClusteredQueue.class);
- private final ConcurrentHashMap<SimpleMemberHandle, RemoteSubscriptionImpl> _peers = new ConcurrentHashMap<SimpleMemberHandle, RemoteSubscriptionImpl>();
- private final GroupManager _groupMgr;
- private final NestedSubscriptionManager _subscriptions;
-
- public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, virtualHost, new ClusteredSubscriptionManager());
- _groupMgr = groupMgr;
- _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers();
- }
-
-
- public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
- {
- _logger.info(new LogMessage("{0} delivered to clustered queue {1}", msg, this));
- super.process(storeContext, msg, deliverFirst);
- }
-
- protected void autodelete() throws AMQException
- {
- if(!_subscriptions.hasActiveSubscribers())
- {
- //delete locally:
- delete();
-
- //send deletion request to all other members:
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- QueueDeleteBody request = new QueueDeleteBody((byte)8,
- (byte)0,
- QueueDeleteBody.getClazz((byte)8,(byte)0),
- QueueDeleteBody.getMethod((byte)8,(byte)0),
- false,
- false,
- false,
- getName(),
- 0);
-
- _groupMgr.broadcast(new SimpleBodySendable(request));
- }
- }
-
- public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
- {
- //handle locally:
- super.unregisterProtocolSession(ps, channel, consumerTag);
-
- //signal other members:
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- BasicCancelBody request = new BasicCancelBody((byte)8,
- (byte)0,
- BasicCancelBody.getClazz((byte)8, (byte)0),
- BasicCancelBody.getMethod((byte)8, (byte)0),
- getName(),
- false);
-
- _groupMgr.broadcast(new SimpleBodySendable(request));
- }
-
- public void addRemoteSubcriber(MemberHandle peer)
- {
- _logger.info(new LogMessage("Added remote subscriber for {0} to clustered queue {1}", peer, this));
- //find (or create) a matching subscriber for the peer then increment the count
- getSubscriber(key(peer), true).increment();
- }
-
- public void removeRemoteSubscriber(MemberHandle peer)
- {
- //find a matching subscriber for the peer then decrement the count
- //if count is now zero, remove the subscriber
- SimpleMemberHandle key = key(peer);
- RemoteSubscriptionImpl s = getSubscriber(key, true);
- if (s == null)
- {
- throw new RuntimeException("No subscriber for " + peer);
- }
- if (s.decrement())
- {
- _peers.remove(key);
- _subscriptions.removeSubscription(s);
- }
- }
-
- public void removeAllRemoteSubscriber(MemberHandle peer)
- {
- SimpleMemberHandle key = key(peer);
- RemoteSubscriptionImpl s = getSubscriber(key, true);
- _peers.remove(key);
- _subscriptions.removeSubscription(s);
- }
-
- private RemoteSubscriptionImpl getSubscriber(SimpleMemberHandle key, boolean create)
- {
- RemoteSubscriptionImpl s = _peers.get(key);
- if (s == null && create)
- {
- return addSubscriber(key, new RemoteSubscriptionImpl(_groupMgr, key));
- }
- else
- {
- return s;
- }
- }
-
- private RemoteSubscriptionImpl addSubscriber(SimpleMemberHandle key, RemoteSubscriptionImpl s)
- {
- RemoteSubscriptionImpl other = _peers.putIfAbsent(key, s);
- if (other == null)
- {
- _subscriptions.addSubscription(s);
- new SubscriberCleanup(key, this, _groupMgr);
- return s;
- }
- else
- {
- return other;
- }
- }
-
- private SimpleMemberHandle key(MemberHandle peer)
- {
- return peer instanceof SimpleMemberHandle ? (SimpleMemberHandle) peer : (SimpleMemberHandle) SimpleMemberHandle.resolve(peer);
- }
-
- static boolean isFromBroker(AMQMessage msg)
- {
- return ClusteredProtocolSession.isPayloadFromPeer(msg);
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
deleted file mode 100644
index 39ae7e3c3e..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.cluster.util.LogMessage;
-
-import java.util.List;
-
-class ClusteredSubscriptionManager extends SubscriptionSet
-{
- private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
- private final NestedSubscriptionManager _all;
-
- ClusteredSubscriptionManager()
- {
- this(new NestedSubscriptionManager());
- }
-
- private ClusteredSubscriptionManager(NestedSubscriptionManager all)
- {
- _all = all;
- _all.addSubscription(new Parent());
- }
-
- NestedSubscriptionManager getAllSubscribers()
- {
- return _all;
- }
-
- public boolean hasActiveSubscribers()
- {
- return _all.hasActiveSubscribers();
- }
-
- public Subscription nextSubscriber(AMQMessage msg)
- {
- if(ClusteredQueue.isFromBroker(msg))
- {
- //if message is from another broker, it should only be delivered
- //to another client to meet ordering constraints
- Subscription s = super.nextSubscriber(msg);
- _logger.info(new LogMessage("Returning next *client* subscriber {0}", s));
- if(s == null)
- {
- //TODO: deliver to another broker, but set the redelivered flag on the msg
- //(this should be policy based)
-
- //for now just don't deliver it
- return null;
- }
- else
- {
- return s;
- }
- }
- Subscription s = _all.nextSubscriber(msg);
- _logger.info(new LogMessage("Returning next subscriber {0}", s));
- return s;
- }
-
- private class Parent implements WeightedSubscriptionManager
- {
- public int getWeight()
- {
- return ClusteredSubscriptionManager.this.getWeight();
- }
-
- public List<Subscription> getSubscriptions()
- {
- return ClusteredSubscriptionManager.super.getSubscriptions();
- }
-
- public boolean hasActiveSubscribers()
- {
- return ClusteredSubscriptionManager.super.hasActiveSubscribers();
- }
-
- public Subscription nextSubscriber(AMQMessage msg)
- {
- return ClusteredSubscriptionManager.super.nextSubscriber(msg);
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
deleted file mode 100644
index 0566c5203b..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.List;
-import java.util.LinkedList;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-/**
- * Distributes messages among a list of subsscription managers, using their
- * weighting.
- */
-class NestedSubscriptionManager implements SubscriptionManager
-{
- private final List<WeightedSubscriptionManager> _subscribers = new CopyOnWriteArrayList<WeightedSubscriptionManager>();
- private int _iterations;
- private int _index;
-
- void addSubscription(WeightedSubscriptionManager s)
- {
- _subscribers.add(s);
- }
-
- void removeSubscription(WeightedSubscriptionManager s)
- {
- _subscribers.remove(s);
- }
-
-
- public List<Subscription> getSubscriptions()
- {
- List<Subscription> allSubs = new LinkedList<Subscription>();
-
- for (WeightedSubscriptionManager subMans : _subscribers)
- {
- allSubs.addAll(subMans.getSubscriptions());
- }
-
- return allSubs;
- }
-
- public boolean hasActiveSubscribers()
- {
- for (WeightedSubscriptionManager s : _subscribers)
- {
- if (s.hasActiveSubscribers())
- {
- return true;
- }
- }
- return false;
- }
-
- public Subscription nextSubscriber(AMQMessage msg)
- {
- WeightedSubscriptionManager start = current();
- for (WeightedSubscriptionManager s = start; s != null; s = next(start))
- {
- if (hasMore(s))
- {
- return nextSubscriber(s);
- }
- }
- return null;
- }
-
- private Subscription nextSubscriber(WeightedSubscriptionManager s)
- {
- _iterations++;
- return s.nextSubscriber(null);
- }
-
- private WeightedSubscriptionManager current()
- {
- return _subscribers.isEmpty() ? null : _subscribers.get(_index);
- }
-
- private boolean hasMore(WeightedSubscriptionManager s)
- {
- return _iterations < s.getWeight();
- }
-
- private WeightedSubscriptionManager next(WeightedSubscriptionManager start)
- {
- WeightedSubscriptionManager s = next();
- return s == start && !hasMore(s) ? null : s;
- }
-
- private WeightedSubscriptionManager next()
- {
- _iterations = 0;
- if (++_index >= _subscribers.size())
- {
- _index = 0;
- }
- return _subscribers.get(_index);
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
deleted file mode 100644
index f8e4311a77..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.cluster.SimpleSendable;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.SimpleBodySendable;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.AMQShortString;
-
-import java.util.concurrent.Executor;
-
-/**
- * Used to represent a private queue held locally.
- *
- */
-public class PrivateQueue extends AMQQueue
-{
- private final GroupManager _groupMgr;
-
- public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, virtualHost);
- _groupMgr = groupMgr;
-
- }
-
- protected void autodelete() throws AMQException
- {
- //delete locally:
- super.autodelete();
-
- //send delete request to peers:
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0,
- QueueDeleteBody.getClazz((byte)8, (byte)0),
- QueueDeleteBody.getMethod((byte)8, (byte)0),
- false,false,false,null,0);
- request.queue = getName();
- _groupMgr.broadcast(new SimpleBodySendable(request));
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java
deleted file mode 100644
index efc0540c18..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.cluster.MembershipChangeListener;
-import org.apache.qpid.server.cluster.MemberHandle;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
-
-import java.util.List;
-
-class ProxiedQueueCleanup implements MembershipChangeListener
-{
- private static final Logger _logger = Logger.getLogger(ProxiedQueueCleanup.class);
-
- private final MemberHandle _subject;
- private final RemoteQueueProxy _queue;
-
- ProxiedQueueCleanup(MemberHandle subject, RemoteQueueProxy queue)
- {
- _subject = subject;
- _queue = queue;
- }
-
- public void changed(List<MemberHandle> members)
- {
- if(!members.contains(_subject))
- {
- try
- {
- _queue.delete();
- _logger.info(new LogMessage("Deleted {0} in response to exclusion of {1}", _queue, _subject));
- }
- catch (AMQException e)
- {
- _logger.info(new LogMessage("Failed to delete {0} in response to exclusion of {1}: {2}", _queue, _subject, e), e);
- }
- }
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
deleted file mode 100644
index 2a83d65ae5..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.MemberHandle;
-import org.apache.qpid.server.cluster.SimpleSendable;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-/**
- * TODO: separate out an abstract base class from AMQQueue from which this inherits. It does
- * not require all the functionality currently in AMQQueue.
- *
- */
-public class RemoteQueueProxy extends AMQQueue
-{
- private static final Logger _logger = Logger.getLogger(RemoteQueueProxy.class);
- private final MemberHandle _target;
- private final GroupManager _groupMgr;
-
- public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, virtualHost);
- _target = target;
- _groupMgr = groupMgr;
- _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this));
- }
-
-
- public void deliver(AMQMessage msg) throws NoConsumersException
- {
- if (ClusteredProtocolSession.canRelay(msg, _target))
- {
- try
- {
- _logger.debug(new LogMessage("Relaying {0} to {1}", msg, _target));
- relay(msg);
- }
- catch (NoConsumersException e)
- {
- throw e;
- }
- catch (AMQException e)
- {
- //TODO: sort out exception handling...
- e.printStackTrace();
- }
- }
- else
- {
- _logger.debug(new LogMessage("Cannot relay {0} to {1}", msg, _target));
- }
- }
-
- void relay(AMQMessage msg) throws AMQException
- {
- // TODO FIXME - can no longer update the publish body as it is an opaque wrapper object
- // if cluster can handle immediate then it should wrap the wrapper...
-
-// BasicPublishBody publish = msg.getMessagePublishInfo();
-// publish.immediate = false; //can't as yet handle the immediate flag in a cluster
-
- // send this on to the broker for which it is acting as proxy:
- _groupMgr.send(_target, new SimpleSendable(msg));
- }
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
deleted file mode 100644
index e396432cea..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.cluster.MemberHandle;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.SimpleSendable;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.AMQException;
-
-import java.util.Queue;
-import java.util.List;
-
-class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
-{
- private final GroupManager _groupMgr;
- private final MemberHandle _peer;
- private boolean _suspended;
- private int _count;
-
- RemoteSubscriptionImpl(GroupManager groupMgr, MemberHandle peer)
- {
- _groupMgr = groupMgr;
- _peer = peer;
- }
-
- synchronized void increment()
- {
- _count++;
- }
-
- synchronized boolean decrement()
- {
- return --_count <= 0;
- }
-
- public void send(AMQMessage msg, AMQQueue queue)
- {
- try
- {
- _groupMgr.send(_peer, new SimpleSendable(msg));
- }
- catch (AMQException e)
- {
- //TODO: handle exceptions properly...
- e.printStackTrace();
- }
- }
-
- public synchronized void setSuspended(boolean suspended)
- {
- _suspended = suspended;
- }
-
- public synchronized boolean isSuspended()
- {
- return _suspended;
- }
-
- public synchronized int getWeight()
- {
- return _count;
- }
-
- public List<Subscription> getSubscriptions()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean hasActiveSubscribers()
- {
- return getWeight() == 0;
- }
-
- public Subscription nextSubscriber(AMQMessage msg)
- {
- return this;
- }
-
- public void queueDeleted(AMQQueue queue)
- {
- if (queue instanceof ClusteredQueue)
- {
- ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
- }
- }
-
- public boolean filtersMessages()
- {
- return false;
- }
-
- public boolean hasInterest(AMQMessage msg)
- {
- return true;
- }
-
- public Queue<AMQMessage> getPreDeliveryQueue()
- {
- return null;
- }
-
- public Queue<AMQMessage> getResendQueue()
- {
- return null;
- }
-
- public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
- {
- return messages;
- }
-
- public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
- {
- //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
- }
-
- public boolean isAutoClose()
- {
- return false;
- }
-
- public void close()
- {
- //no-op
- }
-
- public boolean isClosed()
- {
- return false;
- }
-
- public boolean isBrowser()
- {
- return false;
- }
-
- public boolean wouldSuspend(AMQMessage msg)
- {
- return _suspended;
- }
-
- public void addToResendQueue(AMQMessage msg)
- {
- //no-op
- }
-
- public Object getSendLock()
- {
- return new Object();
- }
-
- public AMQChannel getChannel()
- {
- return null;
- }
-
-}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java
deleted file mode 100644
index cc951a4709..0000000000
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.cluster.MembershipChangeListener;
-import org.apache.qpid.server.cluster.MemberHandle;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.log4j.Logger;
-
-import java.util.List;
-
-class SubscriberCleanup implements MembershipChangeListener
-{
- private static final Logger _logger = Logger.getLogger(SubscriberCleanup.class);
-
- private final MemberHandle _subject;
- private final ClusteredQueue _queue;
- private final GroupManager _manager;
-
- SubscriberCleanup(MemberHandle subject, ClusteredQueue queue, GroupManager manager)
- {
- _subject = subject;
- _queue = queue;
- _manager = manager;
- _manager.addMemberhipChangeListener(this);
- }
-
- public void changed(List<MemberHandle> members)
- {
- if(!members.contains(_subject))
- {
- _queue.removeAllRemoteSubscriber(_subject);
- _manager.removeMemberhipChangeListener(this);
- _logger.info(new LogMessage("Removed {0} from {1}", _subject, _queue));
- }
- }
-}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java
deleted file mode 100644
index b91d7140e0..0000000000
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import junit.framework.TestCase;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-public class BrokerGroupTest extends TestCase
-{
- private final MemberHandle a = new SimpleMemberHandle("A", 1);
- private final MemberHandle b = new SimpleMemberHandle("B", 1);
- private final MemberHandle c = new SimpleMemberHandle("C", 1);
- private final MemberHandle d = new SimpleMemberHandle("D", 1);
-
- //join (new members perspective)
- // (i) connectToLeader()
- // ==> check state
- // (ii) setMembers()
- // ==> check state
- // ==> check members
- // (iii) synched(leader)
- // ==> check state
- // ==> check peers
- // (iv) synched(other)
- // ==> check state
- // ==> check peers
- // repeat for all others
- public void testJoin_newMember() throws Exception
- {
- MemberHandle[] pre = new MemberHandle[]{a, b, c};
- MemberHandle[] post = new MemberHandle[]{a, b, c};
-
- BrokerGroup group = new BrokerGroup(d, new TestReplayManager(), new TestBrokerFactory());
- assertEquals(JoinState.UNINITIALISED, group.getState());
- //(i)
- group.connectToLeader(a);
- assertEquals(JoinState.JOINING, group.getState());
- assertEquals("Wrong number of peers", 1, group.getPeers().size());
- //(ii)
- group.setMembers(Arrays.asList(post));
- assertEquals(JoinState.INITIATION, group.getState());
- assertEquals(Arrays.asList(post), group.getMembers());
- //(iii) & (iv)
- for (MemberHandle member : pre)
- {
- group.synched(member);
- if (member == c)
- {
- assertEquals(JoinState.JOINED, group.getState());
- assertEquals("Wrong number of peers", pre.length, group.getPeers().size());
- }
- else
- {
- assertEquals(JoinState.INDUCTION, group.getState());
- assertEquals("Wrong number of peers", 1, group.getPeers().size());
- }
- }
- }
-
- //join (leaders perspective)
- // (i) extablish()
- // ==> check state
- // ==> check members
- // ==> check peers
- // (ii) connectToProspect()
- // ==> check members
- // ==> check peers
- // repeat (ii)
- public void testJoin_Leader() throws IOException, InterruptedException
- {
- MemberHandle[] prospects = new MemberHandle[]{b, c, d};
-
- BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory());
- assertEquals(JoinState.UNINITIALISED, group.getState());
- //(i)
- group.establish();
- assertEquals(JoinState.JOINED, group.getState());
- assertEquals("Wrong number of peers", 0, group.getPeers().size());
- assertEquals("Wrong number of members", 1, group.getMembers().size());
- assertEquals(a, group.getMembers().get(0));
- //(ii)
- for (int i = 0; i < prospects.length; i++)
- {
- group.connectToProspect(prospects[i]);
- assertEquals("Wrong number of peers", i + 1, group.getPeers().size());
- for (int j = 0; j <= i; j++)
- {
- assertTrue(prospects[i].matches(group.getPeers().get(i)));
- }
- assertEquals("Wrong number of members", i + 2, group.getMembers().size());
- assertEquals(a, group.getMembers().get(0));
- for (int j = 0; j <= i; j++)
- {
- assertEquals(prospects[i], group.getMembers().get(i + 1));
- }
- }
- }
-
- //join (general perspective)
- // (i) set up group
- // (ii) setMembers()
- // ==> check members
- // ==> check peers
- public void testJoin_general() throws Exception
- {
- MemberHandle[] view1 = new MemberHandle[]{a, b, c};
- MemberHandle[] view2 = new MemberHandle[]{a, b, c, d};
- MemberHandle[] peers = new MemberHandle[]{a, b, d};
-
- BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory());
- //(i)
- group.connectToLeader(a);
- group.setMembers(Arrays.asList(view1));
- for (MemberHandle h : view1)
- {
- group.synched(h);
- }
- //(ii)
- group.setMembers(Arrays.asList(view2));
- assertEquals(Arrays.asList(view2), group.getMembers());
- assertEquals(peers.length, group.getPeers().size());
- for (int i = 0; i < peers.length; i++)
- {
- assertTrue(peers[i].matches(group.getPeers().get(i)));
- }
- }
-
- //leadership transfer (valid)
- // (i) set up group
- // (ii) assumeLeadership()
- // ==> check return value
- // ==> check members
- // ==> check peers
- // ==> check isLeader()
- // ==> check isLeader(old_leader)
- // ==> check isMember(old_leader)
- public void testTransferLeadership_valid() throws Exception
- {
- MemberHandle[] view1 = new MemberHandle[]{a, b};
- MemberHandle[] view2 = new MemberHandle[]{a, b, c, d};
- MemberHandle[] view3 = new MemberHandle[]{b, c, d};
-
- BrokerGroup group = new BrokerGroup(b, new TestReplayManager(), new TestBrokerFactory());
- //(i)
- group.connectToLeader(a);
- group.setMembers(Arrays.asList(view1));
- for (MemberHandle h : view1)
- {
- group.synched(h);
- }
- group.setMembers(Arrays.asList(view2));
- //(ii)
- boolean result = group.assumeLeadership();
- assertTrue(result);
- assertTrue(group.isLeader());
- assertFalse(group.isLeader(a));
- assertEquals(Arrays.asList(view3), group.getMembers());
- assertEquals(2, group.getPeers().size());
- assertTrue(c.matches(group.getPeers().get(0)));
- assertTrue(d.matches(group.getPeers().get(1)));
- }
-
- //leadership transfer (invalid)
- // (i) set up group
- // (ii) assumeLeadership()
- // ==> check return value
- // ==> check members
- // ==> check peers
- // ==> check isLeader()
- // ==> check isLeader(old_leader)
- // ==> check isMember(old_leader)
- public void testTransferLeadership_invalid() throws Exception
- {
- MemberHandle[] view1 = new MemberHandle[]{a, b, c};
- MemberHandle[] view2 = new MemberHandle[]{a, b, c, d};
-
- BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory());
- //(i)
- group.connectToLeader(a);
- group.setMembers(Arrays.asList(view1));
- for (MemberHandle h : view1)
- {
- group.synched(h);
- }
- group.setMembers(Arrays.asList(view2));
- //(ii)
- boolean result = group.assumeLeadership();
- assertFalse(result);
- assertFalse(group.isLeader());
- assertTrue(group.isLeader(a));
- assertEquals(Arrays.asList(view2), group.getMembers());
- assertEquals(3, group.getPeers().size());
- assertTrue(a.matches(group.getPeers().get(0)));
- assertTrue(b.matches(group.getPeers().get(1)));
- assertTrue(d.matches(group.getPeers().get(2)));
-
- }
-
- //leave (leaders perspective)
- // (i) set up group
- // (ii) remove a member
- // ==> check members
- // ==> check peers
- // ==> check isMember(removed_member)
- // repeat (ii)
- public void testLeave_leader()
- {
- MemberHandle[] view1 = new MemberHandle[]{a, b, c, d};
- MemberHandle[] view2 = new MemberHandle[]{a, b, d};
- MemberHandle[] view3 = new MemberHandle[]{a, d};
- MemberHandle[] view4 = new MemberHandle[]{a};
- //(i)
- BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory());
- group.establish();
- group.setMembers(Arrays.asList(view1));
- //(ii)
- group.remove(group.findBroker(c, false));
- assertEquals(Arrays.asList(view2), group.getMembers());
-
- group.remove(group.findBroker(b, false));
- assertEquals(Arrays.asList(view3), group.getMembers());
-
- group.remove(group.findBroker(d, false));
- assertEquals(Arrays.asList(view4), group.getMembers());
- }
-
-
- //leave (general perspective)
- // (i) set up group
- // (ii) setMember
- // ==> check members
- // ==> check peers
- // ==> check isMember(removed_member)
- // repeat (ii)
- public void testLeave_general()
- {
- MemberHandle[] view1 = new MemberHandle[]{a, b, c, d};
- MemberHandle[] view2 = new MemberHandle[]{a, c, d};
- //(i)
- BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory());
- group.establish(); //not strictly the correct way to build up the group, but ok for here
- group.setMembers(Arrays.asList(view1));
- //(ii)
- group.setMembers(Arrays.asList(view2));
- assertEquals(Arrays.asList(view2), group.getMembers());
- assertEquals(2, group.getPeers().size());
- assertTrue(a.matches(group.getPeers().get(0)));
- assertTrue(d.matches(group.getPeers().get(1)));
- }
-}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
deleted file mode 100644
index f1da312eea..0000000000
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import junit.framework.TestCase;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.cluster.policy.StandardPolicies;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class BrokerTest extends TestCase
-{
- //group request (no failure)
- public void testGroupRequest_noFailure() throws AMQException
- {
- RecordingBroker[] brokers = new RecordingBroker[]{
- new RecordingBroker("A", 1),
- new RecordingBroker("B", 2),
- new RecordingBroker("C", 3)
- };
- GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(brokers)));
- GroupRequest grpRequest = new GroupRequest(new SimpleBodySendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
- for (Broker b : brokers)
- {
- b.invoke(grpRequest);
- }
- grpRequest.finishedSend();
-
- for (RecordingBroker b : brokers)
- {
- b.handleResponse(((AMQFrame) b.getMessages().get(0)).getChannel(), new TestMethod("response"));
- }
-
- assertTrue("Handler did not receive response", handler.isCompleted());
- }
-
- //group request (failure)
- public void testGroupRequest_failure() throws AMQException
- {
- RecordingBroker a = new RecordingBroker("A", 1);
- RecordingBroker b = new RecordingBroker("B", 2);
- RecordingBroker c = new RecordingBroker("C", 3);
- RecordingBroker[] all = new RecordingBroker[]{a, b, c};
- RecordingBroker[] succeeded = new RecordingBroker[]{a, c};
-
- GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(succeeded)));
- GroupRequest grpRequest = new GroupRequest(new SimpleBodySendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
-
- for (Broker broker : all)
- {
- broker.invoke(grpRequest);
- }
- grpRequest.finishedSend();
-
- for (RecordingBroker broker : succeeded)
- {
- broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).getChannel(), new TestMethod("response"));
- }
- b.remove();
-
- assertTrue("Handler did not receive response", handler.isCompleted());
- }
-
-
- //simple send (no response)
- public void testSend_noResponse() throws AMQException
- {
- AMQBody[] msgs = new AMQBody[]{
- new TestMethod("A"),
- new TestMethod("B"),
- new TestMethod("C")
- };
- RecordingBroker broker = new RecordingBroker("myhost", 1);
- for (AMQBody msg : msgs)
- {
- broker.send(new SimpleBodySendable(msg), null);
- }
- List<AMQDataBlock> sent = broker.getMessages();
- assertEquals(msgs.length, sent.size());
- for (int i = 0; i < msgs.length; i++)
- {
- assertTrue(sent.get(i) instanceof AMQFrame);
- assertEquals(msgs[i], ((AMQFrame) sent.get(i)).getBodyFrame());
- }
- }
-
- //simple send (no failure)
- public void testSend_noFailure() throws AMQException
- {
- RecordingBroker broker = new RecordingBroker("myhost", 1);
- BlockingHandler handler = new BlockingHandler();
- broker.send(new SimpleBodySendable(new TestMethod("A")), handler);
- List<AMQDataBlock> sent = broker.getMessages();
- assertEquals(1, sent.size());
- assertTrue(sent.get(0) instanceof AMQFrame);
- assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame());
-
- broker.handleResponse(((AMQFrame) sent.get(0)).getChannel(), new TestMethod("B"));
-
- assertEquals(new TestMethod("B"), handler.getResponse());
- }
-
- //simple send (failure)
- public void testSend_failure() throws AMQException
- {
- RecordingBroker broker = new RecordingBroker("myhost", 1);
- BlockingHandler handler = new BlockingHandler();
- broker.send(new SimpleBodySendable(new TestMethod("A")), handler);
- List<AMQDataBlock> sent = broker.getMessages();
- assertEquals(1, sent.size());
- assertTrue(sent.get(0) instanceof AMQFrame);
- assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame());
- broker.remove();
- assertEquals(null, handler.getResponse());
- assertTrue(handler.isCompleted());
- assertTrue(handler.failed());
- }
-
- private static class TestMethod extends AMQMethodBody
- {
- private final Object id;
-
- TestMethod(Object id)
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- super((byte)8, (byte)0);
- this.id = id;
- }
-
- protected int getBodySize()
- {
- return 0;
- }
-
- protected int getClazz()
- {
- return 1002;
- }
-
- protected int getMethod()
- {
- return 1003;
- }
-
- protected void writeMethodPayload(ByteBuffer buffer)
- {
- }
-
- protected byte getType()
- {
- return 0;
- }
-
- protected int getSize()
- {
- return 0;
- }
-
- protected void writePayload(ByteBuffer buffer)
- {
- }
-
- protected void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException
- {
- }
-
- protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
- {
- }
-
- public boolean equals(Object o)
- {
- return o instanceof TestMethod && id.equals(((TestMethod) o).id);
- }
-
- public int hashCode()
- {
- return id.hashCode();
- }
-
- }
-
- private static class GroupResponseValidator implements GroupResponseHandler
- {
- private final AMQMethodBody _response;
- private final List<Member> _members;
- private boolean _completed = false;
-
- GroupResponseValidator(AMQMethodBody response, List<Member> members)
- {
- _response = response;
- _members = members;
- }
-
- public void response(List<AMQMethodBody> responses, List<Member> members)
- {
- for (AMQMethodBody r : responses)
- {
- assertEquals(_response, r);
- }
- assertEquals(_members, members);
- _completed = true;
- }
-
- boolean isCompleted()
- {
- return _completed;
- }
- }
-}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java
deleted file mode 100644
index 830a00f4c2..0000000000
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import junit.framework.TestCase;
-import org.apache.qpid.framing.AMQShortString;
-
-public class ClusterCapabilityTest extends TestCase
-{
- public void testStartWithNull()
- {
- MemberHandle peer = new SimpleMemberHandle("myhost:9999");
- AMQShortString c = ClusterCapability.add(null, peer);
- assertTrue(ClusterCapability.contains(c));
- assertTrue(peer.matches(ClusterCapability.getPeer(c)));
- }
-
- public void testStartWithText()
- {
- MemberHandle peer = new SimpleMemberHandle("myhost:9999");
- AMQShortString c = ClusterCapability.add(new AMQShortString("existing text"), peer);
- assertTrue(ClusterCapability.contains(c));
- assertTrue(peer.matches(ClusterCapability.getPeer(c)));
- }
-}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java
deleted file mode 100644
index 7e58add91e..0000000000
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.mina.common.IoSession;
-
-import java.util.List;
-import java.util.ArrayList;
-
-import junit.framework.TestCase;
-
-public class InductionBufferTest extends TestCase
-{
- public void test() throws Exception
- {
- IoSession session1 = new TestSession();
- IoSession session2 = new TestSession();
- IoSession session3 = new TestSession();
-
- TestMessageHandler handler = new TestMessageHandler();
- InductionBuffer buffer = new InductionBuffer(handler);
-
- buffer.receive(session1, "one");
- buffer.receive(session2, "two");
- buffer.receive(session3, "three");
-
- buffer.receive(session1, "four");
- buffer.receive(session1, "five");
- buffer.receive(session1, "six");
-
- buffer.receive(session3, "seven");
- buffer.receive(session3, "eight");
-
- handler.checkEmpty();
- buffer.deliver();
-
- handler.check(session1, "one");
- handler.check(session2, "two");
- handler.check(session3, "three");
-
- handler.check(session1, "four");
- handler.check(session1, "five");
- handler.check(session1, "six");
-
- handler.check(session3, "seven");
- handler.check(session3, "eight");
- handler.checkEmpty();
-
- buffer.receive(session1, "nine");
- buffer.receive(session2, "ten");
- buffer.receive(session3, "eleven");
-
- handler.check(session1, "nine");
- handler.check(session2, "ten");
- handler.check(session3, "eleven");
-
- handler.checkEmpty();
- }
-
- private static class TestMessageHandler implements InductionBuffer.MessageHandler
- {
- private final List<IoSession> _sessions = new ArrayList<IoSession>();
- private final List<Object> _msgs = new ArrayList<Object>();
-
- public synchronized void deliver(IoSession session, Object msg) throws Exception
- {
- _sessions.add(session);
- _msgs.add(msg);
- }
-
- void check(IoSession actualSession, Object actualMsg)
- {
- assertFalse(_sessions.isEmpty());
- assertFalse(_msgs.isEmpty());
- IoSession expectedSession = _sessions.remove(0);
- Object expectedMsg = _msgs.remove(0);
- assertEquals(expectedSession, actualSession);
- assertEquals(expectedMsg, actualMsg);
- }
-
- void checkEmpty()
- {
- assertTrue(_sessions.isEmpty());
- assertTrue(_msgs.isEmpty());
- }
- }
-}
-
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java
deleted file mode 100644
index 1ec5154a98..0000000000
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-
-import java.util.ArrayList;
-import java.util.List;
-
-class RecordingBroker extends TestBroker
-{
- private final List<AMQDataBlock> _messages = new ArrayList<AMQDataBlock>();
-
- RecordingBroker(String host, int port)
- {
- super(host, port);
- }
-
- public void send(AMQDataBlock data) throws AMQException
- {
- _messages.add(data);
- }
-
- List<AMQDataBlock> getMessages()
- {
- return _messages;
- }
-
- void clear()
- {
- _messages.clear();
- }
-
-}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java
deleted file mode 100644
index d3e972e273..0000000000
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-class RecordingBrokerFactory implements BrokerFactory
-{
- public Broker create(MemberHandle handle)
- {
- return new RecordingBroker(handle.getHost(), handle.getPort());
- }
-}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
deleted file mode 100644
index 86cde3cee7..0000000000
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-
-import javax.jms.JMSException;
-
-import junit.framework.TestCase;
-
-public class SimpleClusterTest extends TestCase
-{
- public void testDeclareExchange() throws AMQException, JMSException, URLSyntaxException
- {
- AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test");
- AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- System.out.println("Session created");
- session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct"), true);
- System.out.println("Exchange declared");
- con.close();
- System.out.println("Connection closed");
- }
-}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java
deleted file mode 100644
index 8ff8357377..0000000000
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import junit.framework.TestCase;
-
-public class SimpleMemberHandleTest extends TestCase
-{
- public void testMatches()
- {
- assertMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost", 8888));
- assertNoMatch(new SimpleMemberHandle("localhost", 8889), new SimpleMemberHandle("localhost", 8888));
- assertNoMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost2", 8888));
- }
-
- public void testResolve()
- {
- assertEquivalent(new SimpleMemberHandle("WGLAIBD8XGR0J:9000"), new SimpleMemberHandle("localhost:9000"));
- }
-
- private void assertEquivalent(MemberHandle a, MemberHandle b)
- {
- String msg = a + " is not equivalent to " + b;
- a = SimpleMemberHandle.resolve(a);
- b = SimpleMemberHandle.resolve(b);
- msg += "(" + a + " does not match " + b + ")";
- assertTrue(msg, a.matches(b));
- }
-
- private void assertMatch(MemberHandle a, MemberHandle b)
- {
- assertTrue(a + " does not match " + b, a.matches(b));
- }
-
- private void assertNoMatch(MemberHandle a, MemberHandle b)
- {
- assertFalse(a + " matches " + b, a.matches(b));
- }
-}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java
deleted file mode 100644
index d3ccbf0ac6..0000000000
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-
-import java.io.IOException;
-
-class TestBroker extends Broker
-{
- TestBroker(String host, int port)
- {
- super(host, port);
- }
-
- boolean connect() throws IOException, InterruptedException
- {
- return true;
- }
-
- void connectAsynch(Iterable<AMQMethodBody> msgs)
- {
- replay(msgs);
- }
-
- void replay(Iterable<AMQMethodBody> msgs)
- {
- try
- {
- for (AMQMethodBody b : msgs)
- {
- send(new AMQFrame(0, b));
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- Broker connectToCluster() throws IOException, InterruptedException
- {
- return this;
- }
-
- public void send(AMQDataBlock data) throws AMQException
- {
- }
-}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java
deleted file mode 100644
index 92eaec876a..0000000000
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-class TestBrokerFactory implements BrokerFactory
-{
- public Broker create(MemberHandle handle)
- {
- return new TestBroker(handle.getHost(), handle.getPort());
- }
-}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java
deleted file mode 100644
index c529c83cc0..0000000000
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.cluster.replay.ReplayManager;
-
-import java.util.ArrayList;
-import java.util.List;
-
-class TestReplayManager implements ReplayManager
-{
- private final List<AMQMethodBody> _msgs;
-
- TestReplayManager()
- {
- this(new ArrayList<AMQMethodBody>());
- }
-
- TestReplayManager(List<AMQMethodBody> msgs)
- {
- _msgs = msgs;
- }
-
- public List<AMQMethodBody> replay(boolean isLeader)
- {
- return _msgs;
- }
-}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java
deleted file mode 100644
index 86ec808924..0000000000
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.mina.common.*;
-
-import java.net.SocketAddress;
-import java.util.Set;
-
-class TestSession implements IoSession
-{
- public IoService getService()
- {
- return null; //TODO
- }
-
- public IoServiceConfig getServiceConfig()
- {
- return null; //TODO
- }
-
- public IoHandler getHandler()
- {
- return null; //TODO
- }
-
- public IoSessionConfig getConfig()
- {
- return null; //TODO
- }
-
- public IoFilterChain getFilterChain()
- {
- return null; //TODO
- }
-
- public WriteFuture write(Object message)
- {
- return null; //TODO
- }
-
- public CloseFuture close()
- {
- return null; //TODO
- }
-
- public Object getAttachment()
- {
- return null; //TODO
- }
-
- public Object setAttachment(Object attachment)
- {
- return null; //TODO
- }
-
- public Object getAttribute(String key)
- {
- return null; //TODO
- }
-
- public Object setAttribute(String key, Object value)
- {
- return null; //TODO
- }
-
- public Object setAttribute(String key)
- {
- return null; //TODO
- }
-
- public Object removeAttribute(String key)
- {
- return null; //TODO
- }
-
- public boolean containsAttribute(String key)
- {
- return false; //TODO
- }
-
- public Set getAttributeKeys()
- {
- return null; //TODO
- }
-
- public TransportType getTransportType()
- {
- return null; //TODO
- }
-
- public boolean isConnected()
- {
- return false; //TODO
- }
-
- public boolean isClosing()
- {
- return false; //TODO
- }
-
- public CloseFuture getCloseFuture()
- {
- return null; //TODO
- }
-
- public SocketAddress getRemoteAddress()
- {
- return null; //TODO
- }
-
- public SocketAddress getLocalAddress()
- {
- return null; //TODO
- }
-
- public SocketAddress getServiceAddress()
- {
- return null; //TODO
- }
-
- public int getIdleTime(IdleStatus status)
- {
- return 0; //TODO
- }
-
- public long getIdleTimeInMillis(IdleStatus status)
- {
- return 0; //TODO
- }
-
- public void setIdleTime(IdleStatus status, int idleTime)
- {
- //TODO
- }
-
- public int getWriteTimeout()
- {
- return 0; //TODO
- }
-
- public long getWriteTimeoutInMillis()
- {
- return 0; //TODO
- }
-
- public void setWriteTimeout(int writeTimeout)
- {
- //TODO
- }
-
- public TrafficMask getTrafficMask()
- {
- return null; //TODO
- }
-
- public void setTrafficMask(TrafficMask trafficMask)
- {
- //TODO
- }
-
- public void suspendRead()
- {
- //TODO
- }
-
- public void suspendWrite()
- {
- //TODO
- }
-
- public void resumeRead()
- {
- //TODO
- }
-
- public void resumeWrite()
- {
- //TODO
- }
-
- public long getReadBytes()
- {
- return 0; //TODO
- }
-
- public long getWrittenBytes()
- {
- return 0; //TODO
- }
-
- public long getReadMessages()
- {
- return 0;
- }
-
- public long getWrittenMessages()
- {
- return 0;
- }
-
- public long getWrittenWriteRequests()
- {
- return 0; //TODO
- }
-
- public int getScheduledWriteRequests()
- {
- return 0; //TODO
- }
-
- public int getScheduledWriteBytes()
- {
- return 0; //TODO
- }
-
- public long getCreationTime()
- {
- return 0; //TODO
- }
-
- public long getLastIoTime()
- {
- return 0; //TODO
- }
-
- public long getLastReadTime()
- {
- return 0; //TODO
- }
-
- public long getLastWriteTime()
- {
- return 0; //TODO
- }
-
- public boolean isIdle(IdleStatus status)
- {
- return false; //TODO
- }
-
- public int getIdleCount(IdleStatus status)
- {
- return 0; //TODO
- }
-
- public long getLastIdleTime(IdleStatus status)
- {
- return 0; //TODO
- }
-}