summaryrefslogtreecommitdiff
path: root/java/cluster/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-03-22 13:14:42 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-03-22 13:14:42 +0000
commit3fb9be28593263e12623ce09084a230b59b81f4f (patch)
tree8de74dd781802819df0ff1ca56aaa94fa1b9b38e /java/cluster/src/main
parentb9f9c16645933e0e2f4c6c9b58e8cd1716434467 (diff)
downloadqpid-python-3fb9be28593263e12623ce09084a230b59b81f4f.tar.gz
made a copy
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/java.multi_version@521253 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src/main')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java32
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java33
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java31
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java12
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java12
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java28
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java3
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java10
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java10
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java40
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java7
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java7
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java9
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java9
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java14
33 files changed, 206 insertions, 147 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java
new file mode 100644
index 0000000000..4d2737edce
--- /dev/null
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+
+public class AMQConnectionWaitException extends AMQException
+{
+ public AMQConnectionWaitException(String s, Throwable e)
+ {
+ super(s, e);
+
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java
new file mode 100644
index 0000000000..3681fac750
--- /dev/null
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBodyImpl;
+
+public class AMQUnexpectedBodyTypeException extends AMQException
+{
+
+ public AMQUnexpectedBodyTypeException(Class<? extends AMQBodyImpl> expectedClass, AMQBodyImpl body)
+ {
+ super("Unexpected body type. Expected: " + expectedClass.getName() + "; got: " + body.getClass().getName());
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java
new file mode 100644
index 0000000000..721da24d53
--- /dev/null
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+
+public class AMQUnexpectedFrameTypeException extends AMQException
+{
+ public AMQUnexpectedFrameTypeException(String s)
+ {
+ super(s);
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java
index 39508df566..aeded15eb8 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java
@@ -20,26 +20,26 @@
*/
package org.apache.qpid.server.cluster;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
public class BlockingHandler implements ResponseHandler
{
private final Class _expected;
private boolean _completed;
- private AMQMethodBody _response;
+ private AMQMethodBodyImpl _response;
public BlockingHandler()
{
- this(AMQMethodBody.class);
+ this(AMQMethodBodyImpl.class);
}
- public BlockingHandler(Class<? extends AMQMethodBody> expected)
+ public BlockingHandler(Class<? extends AMQMethodBodyImpl> expected)
{
_expected = expected;
}
- public void responded(AMQMethodBody response)
+ public void responded(AMQMethodBodyImpl response)
{
if (_expected.isInstance(response))
{
@@ -74,7 +74,7 @@ public class BlockingHandler implements ResponseHandler
}
}
- AMQMethodBody getResponse()
+ AMQMethodBodyImpl getResponse()
{
return _response;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java
index 7e2cf6da83..c560a1e20c 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.cluster;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.log4j.Logger;
import java.io.IOException;
@@ -88,7 +88,7 @@ abstract class Broker extends SimpleMemberHandle implements Member
* @param response the response received
* @return true if the response matched an outstanding request
*/
- protected synchronized boolean handleResponse(int channel, AMQMethodBody response)
+ protected synchronized boolean handleResponse(int channel, AMQMethodBodyImpl response)
{
ResponseHandler request = _requests.get(channel);
if (request == null)
@@ -174,7 +174,7 @@ abstract class Broker extends SimpleMemberHandle implements Member
/**
* Start connection process, including replay
*/
- abstract void connectAsynch(Iterable<AMQMethodBody> msgs);
+ abstract void connectAsynch(Iterable<AMQMethodBodyImpl> msgs);
/**
* Replay messages to the remote peer this instance represents. These messages
@@ -182,7 +182,7 @@ abstract class Broker extends SimpleMemberHandle implements Member
*
* @param msgs
*/
- abstract void replay(Iterable<AMQMethodBody> msgs);
+ abstract void replay(Iterable<AMQMethodBodyImpl> msgs);
/**
* establish connection, handling redirect if required...
@@ -200,7 +200,7 @@ abstract class Broker extends SimpleMemberHandle implements Member
this.channel = channel;
}
- public void responded(AMQMethodBody response)
+ public void responded(AMQMethodBodyImpl response)
{
request.responseReceived(Broker.this, response);
_requests.remove(channel);
@@ -228,7 +228,7 @@ abstract class Broker extends SimpleMemberHandle implements Member
this.channel = channel;
}
- public void responded(AMQMethodBody response)
+ public void responded(AMQMethodBodyImpl response)
{
handler.responded(response);
_requests.remove(channel);
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java
index 755a341607..03de4fbbb7 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.cluster.replay.ReplayManager;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.cluster.util.InvokeMultiple;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import java.io.IOException;
import java.util.ArrayList;
@@ -225,7 +225,7 @@ class BrokerGroup
if (create)
{
Broker b = _factory.create(handle);
- List<AMQMethodBody> msgs = _replayMgr.replay(isLeader(_local));
+ List<AMQMethodBodyImpl> msgs = _replayMgr.replay(isLeader(_local));
_logger.info(new LogMessage("Replaying {0} from {1} to {2}", msgs, _local, b));
b.connectAsynch(msgs);
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
index 1b4a3e8327..e3377d3ed1 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
@@ -26,7 +26,7 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
/**
* Hack to assist with reuse of the client handlers for connection setup in
@@ -49,7 +49,7 @@ class ClientAdapter implements MethodHandler
_stateMgr = stateMgr;
}
- public void handle(int channel, AMQMethodBody method) throws AMQException
+ public void handle(int channel, AMQMethodBodyImpl method) throws AMQException
{
AMQMethodEvent evt = new AMQMethodEvent(channel, method);
_stateMgr.methodReceived(evt);
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
index 5300912716..fb9e1fa70c 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.server.cluster;
-import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
-import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
-import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
-import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionCloseMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionOpenOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionSecureMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionStartMethodHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.IllegalStateTransitionException;
@@ -78,14 +78,14 @@ public class ClientHandlerRegistry extends AMQStateManager
return registry;
}
- protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBody frame) throws IllegalStateTransitionException
+ protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBodyImpl frame) throws IllegalStateTransitionException
{
ClientRegistry registry = _handlers.get(state);
return registry == null ? null : registry.getHandler(frame);
}
- <A extends Class<AMQMethodBody>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states)
+ <A extends Class<AMQMethodBodyImpl>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states)
{
for (AMQState state : states)
{
@@ -93,7 +93,7 @@ public class ClientHandlerRegistry extends AMQStateManager
}
}
- <A extends Class<AMQMethodBody>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state)
+ <A extends Class<AMQMethodBodyImpl>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state)
{
ClientRegistry registry = _handlers.get(state);
if (registry == null)
@@ -106,15 +106,15 @@ public class ClientHandlerRegistry extends AMQStateManager
static class ClientRegistry
{
- private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener> registry
- = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener>();
+ private final Map<Class<? extends AMQMethodBodyImpl>, StateAwareMethodListener> registry
+ = new HashMap<Class<? extends AMQMethodBodyImpl>, StateAwareMethodListener>();
- <A extends Class<AMQMethodBody>> void add(A type, StateAwareMethodListener handler)
+ <A extends Class<AMQMethodBodyImpl>> void add(A type, StateAwareMethodListener handler)
{
registry.put(type, handler);
}
- StateAwareMethodListener getHandler(AMQMethodBody frame)
+ StateAwareMethodListener getHandler(AMQMethodBodyImpl frame)
{
return registry.get(frame.getClass());
}
@@ -122,9 +122,9 @@ public class ClientHandlerRegistry extends AMQStateManager
class ConnectionTuneHandler extends ConnectionTuneMethodHandler
{
- protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor)
+ protected AMQFrame createConnectionOpenBody(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor)
{
- return super.createConnectionOpenFrame(channel, path, new AMQShortString(ClusterCapability.add(capabilities, _identity)), insist, major, minor);
+ return super.createConnectionOpenBody(channel, path, new AMQShortString(ClusterCapability.add(capabilities, _identity)), insist, major, minor);
}
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
index ee5aa48db9..4e0a367f40 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
@@ -24,17 +24,14 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.IoSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.framing.ClusterMembershipBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.cluster.util.LogMessage;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
index 2f473b63fb..149378a626 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
@@ -230,7 +230,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
//connect to the host and port specified:
Broker prospect = connectToProspect(member);
announceMembership();
- List<AMQMethodBody> msgs = _replayMgr.replay(true);
+ List<AMQMethodBodyImpl> msgs = _replayMgr.replay(true);
_logger.info(new LogMessage("Replaying {0} from leader to {1}", msgs, prospect));
prospect.replay(msgs);
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java
index 8ab7856e87..55b51bf736 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java
@@ -21,7 +21,7 @@
package org.apache.qpid.server.cluster;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,7 +35,7 @@ import java.util.Map;
*/
class GroupRequest
{
- private final Map<Member, AMQMethodBody> _responses = new HashMap<Member, AMQMethodBody>();
+ private final Map<Member, AMQMethodBodyImpl> _responses = new HashMap<Member, AMQMethodBodyImpl>();
private final List<Member> _brokers = new ArrayList<Member>();
private boolean _sent;
@@ -62,7 +62,7 @@ class GroupRequest
return checkCompletion();
}
- public boolean responseReceived(Member broker, AMQMethodBody response)
+ public boolean responseReceived(Member broker, AMQMethodBodyImpl response)
{
_responses.put(broker, response);
return checkCompletion();
@@ -90,9 +90,9 @@ class GroupRequest
return true;
}
- List<AMQMethodBody> getResults()
+ List<AMQMethodBodyImpl> getResults()
{
- List<AMQMethodBody> results = new ArrayList<AMQMethodBody>(_brokers.size());
+ List<AMQMethodBodyImpl> results = new ArrayList<AMQMethodBodyImpl>(_brokers.size());
for (Member b : _brokers)
{
results.add(_responses.get(b));
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java
index d2e9de2f39..d394651c26 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.server.cluster;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import java.util.List;
public interface GroupResponseHandler
{
//Note: this implies that the response to a group request will always be a method body...
- public void response(List<AMQMethodBody> responses, List<Member> members);
+ public void response(List<AMQMethodBodyImpl> responses, List<Member> members);
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
index a83f034021..d3f2b9201c 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
@@ -21,9 +21,9 @@
package org.apache.qpid.server.cluster;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
interface MethodHandler
{
- public void handle(int channel, AMQMethodBody method) throws AMQException;
+ public void handle(int channel, AMQMethodBodyImpl method) throws AMQException;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
index 748a660bb8..33ec771d6a 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.cluster;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.server.state.StateAwareMethodListener;
import java.util.HashMap;
@@ -28,16 +28,16 @@ import java.util.Map;
public class MethodHandlerRegistry
{
- private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> registry =
- new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
+ private final Map<Class<? extends AMQMethodBodyImpl>, StateAwareMethodListener<? extends AMQMethodBodyImpl>> registry =
+ new HashMap<Class<? extends AMQMethodBodyImpl>, StateAwareMethodListener<? extends AMQMethodBodyImpl>>();
- public <A extends AMQMethodBody, B extends Class<A>> MethodHandlerRegistry addHandler(B type, StateAwareMethodListener<A> handler)
+ public <A extends AMQMethodBodyImpl, B extends Class<A>> MethodHandlerRegistry addHandler(B type, StateAwareMethodListener<A> handler)
{
registry.put(type, handler);
return this;
}
- public <B extends AMQMethodBody> StateAwareMethodListener<B> getHandler(B frame)
+ public <B extends AMQMethodBodyImpl> StateAwareMethodListener<B> getHandler(B frame)
{
return (StateAwareMethodListener<B>) registry.get(frame.getClass());
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
index 401a54444b..1f736ad94c 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
@@ -33,13 +33,13 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQBodyImpl;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.framing.ConnectionRedirectBody;
import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ProtocolVersion;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -57,7 +57,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
private final MemberHandle _local;
private IoSession _session;
private MethodHandler _handler;
- private Iterable<AMQMethodBody> _replay;
+ private Iterable<AMQMethodBodyImpl> _replay;
MinaBrokerProxy(String host, int port, MemberHandle local)
{
@@ -106,13 +106,13 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
return _connectionMonitor.waitUntilOpen();
}
- void connectAsynch(Iterable<AMQMethodBody> msgs)
+ void connectAsynch(Iterable<AMQMethodBodyImpl> msgs)
{
_replay = msgs;
connectImpl();
}
- void replay(Iterable<AMQMethodBody> msgs)
+ void replay(Iterable<AMQMethodBodyImpl> msgs)
{
_replay = msgs;
if(_connectionMonitor.isOpened())
@@ -138,7 +138,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
}
}
- public void send(AMQDataBlock data) throws AMQException
+ public void send(AMQDataBlock data) throws AMQConnectionWaitException
{
if (_session == null)
{
@@ -146,9 +146,9 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
{
_connectionMonitor.waitUntilOpen();
}
- catch (Exception e)
+ catch (InterruptedException e)
{
- throw new AMQException("Failed to send " + data + ": " + e, e);
+ throw new AMQConnectionWaitException("Failed to send " + data + ": " + e, e);
}
}
_session.write(data);
@@ -158,14 +158,14 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
{
if(_replay != null)
{
- for(AMQMethodBody b : _replay)
+ for(AMQMethodBodyImpl b : _replay)
{
_session.write(new AMQFrame(0, b));
}
}
}
- public void handle(int channel, AMQMethodBody method) throws AMQException
+ public void handle(int channel, AMQMethodBodyImpl method) throws AMQException
{
_logger.info(new LogMessage("Handling method: {0} for channel {1}", method, channel));
if (!handleResponse(channel, method))
@@ -174,7 +174,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
}
}
- private void handleMethod(int channel, AMQMethodBody method) throws AMQException
+ private void handleMethod(int channel, AMQMethodBodyImpl method) throws AMQException
{
if (method instanceof ConnectionRedirectBody)
{
@@ -200,14 +200,14 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
private void handleFrame(AMQFrame frame) throws AMQException
{
- AMQBody body = frame.getBodyFrame();
- if (body instanceof AMQMethodBody)
+ AMQBodyImpl body = frame.getBodyFrame();
+ if (body instanceof AMQMethodBodyImpl)
{
- handleMethod(frame.getChannel(), (AMQMethodBody) body);
+ handleMethod(frame.getChannel(), (AMQMethodBodyImpl) body);
}
else
{
- throw new AMQException("Client only expects method body, got: " + body);
+ throw new AMQUnexpectedBodyTypeException(AMQMethodBodyImpl.class, body);
}
}
@@ -216,7 +216,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
return "MinaBrokerProxy[" + (_session == null ? super.toString() : _session.getRemoteAddress()) + "]";
}
- private class MinaBinding extends IoHandlerAdapter implements ProtocolVersionList
+ private class MinaBinding extends IoHandlerAdapter
{
public void sessionCreated(IoSession session) throws Exception
{
@@ -228,8 +228,8 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
/* Find last protocol version in protocol version list. Make sure last protocol version
listed in the build file (build-module.xml) is the latest version which will be used
here. */
- int i = pv.length - 1;
- session.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+
+ session.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
}
public void sessionOpened(IoSession session) throws Exception
@@ -260,7 +260,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
}
else
{
- throw new AMQException("Received message of unrecognised type: " + object);
+ throw new AMQUnexpectedFrameTypeException("Received message of unrecognised type: " + object);
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java
index fe76ca6505..81341eb445 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.server.cluster;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
public interface ResponseHandler
{
- public void responded(AMQMethodBody response);
+ public void responded(AMQMethodBodyImpl response);
public void removed();
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
index 03b0dc7f2e..ac0373cc0d 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
@@ -21,14 +21,12 @@
package org.apache.qpid.server.cluster;
import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.IllegalStateTransitionException;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -74,7 +72,7 @@ class ServerHandlerRegistry extends AMQStateManager
}
}
- protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState state, B frame) throws IllegalStateTransitionException
+ protected <B extends AMQMethodBodyImpl> StateAwareMethodListener<B> findStateTransitionHandler(AMQState state, B frame) throws IllegalStateTransitionException
{
MethodHandlerRegistry registry = _handlers.get(state);
StateAwareMethodListener<B> handler = (registry == null) ? null : registry.getHandler(frame);
@@ -85,7 +83,7 @@ class ServerHandlerRegistry extends AMQStateManager
return handler;
}
- <A extends AMQMethodBody, B extends Class<A>> void addHandler(AMQState state, B type, StateAwareMethodListener<A> handler)
+ <A extends AMQMethodBodyImpl, B extends Class<A>> void addHandler(AMQState state, B type, StateAwareMethodListener<A> handler)
{
MethodHandlerRegistry registry = _handlers.get(state);
if (registry == null)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java
index f7c40c60b3..bae930b341 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java
@@ -18,16 +18,16 @@
package org.apache.qpid.server.cluster;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQBodyImpl;
import org.apache.qpid.framing.AMQFrame;
/**
*/
public class SimpleBodySendable implements Sendable
{
- private final AMQBody _body;
+ private final AMQBodyImpl _body;
- public SimpleBodySendable(AMQBody body)
+ public SimpleBodySendable(AMQBodyImpl body)
{
_body = body;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
index 86710e8a31..85955ab775 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
@@ -21,18 +21,14 @@
package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import java.util.List;
import java.util.ArrayList;
-public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends ClusterMethodHandler<A>
+public class ChainedClusterMethodHandler <A extends AMQMethodBodyImpl> extends ClusterMethodHandler<A>
{
private final List<ClusterMethodHandler<A>> _handlers;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
index faab99b0f6..5c7e1d7ff7 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
@@ -20,17 +20,15 @@
*/
package org.apache.qpid.server.cluster.handler;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.AMQException;
-public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
+public abstract class ClusterMethodHandler<A extends AMQMethodBodyImpl> implements StateAwareMethodListener<A>
{
public final void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
index e7509da32a..313ba6d304 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
@@ -222,17 +222,17 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
}
}
- private <B extends AMQMethodBody> ReplicatingHandler<B> replicated(StateAwareMethodListener<B> handler)
+ private <B extends AMQMethodBodyImpl> ReplicatingHandler<B> replicated(StateAwareMethodListener<B> handler)
{
return new ReplicatingHandler<B>(_groupMgr, handler);
}
- private <B extends AMQMethodBody> StateAwareMethodListener<B> alternate(StateAwareMethodListener<B> peer, StateAwareMethodListener<B> client)
+ private <B extends AMQMethodBodyImpl> StateAwareMethodListener<B> alternate(StateAwareMethodListener<B> peer, StateAwareMethodListener<B> client)
{
return new PeerHandler<B>(peer, client);
}
- private <B extends AMQMethodBody> StateAwareMethodListener<B> chain(ClusterMethodHandler<B>... h)
+ private <B extends AMQMethodBodyImpl> StateAwareMethodListener<B> chain(ClusterMethodHandler<B>... h)
{
return new ChainedClusterMethodHandler<B>(h);
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
index a2f62f714b..a9cb096d33 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
@@ -21,15 +21,12 @@
package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
+class ExtendedHandler<A extends AMQMethodBodyImpl> implements StateAwareMethodListener<A>
{
private final StateAwareMethodListener<A> _base;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
index 8b0bb4b127..a0b95344ee 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
@@ -21,15 +21,12 @@
package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
+public class NullListener<T extends AMQMethodBodyImpl> implements StateAwareMethodListener<T>
{
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<T> evt) throws AMQException
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
index 447e51ccd9..50f25bd7fe 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
@@ -21,12 +21,8 @@
package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -36,7 +32,7 @@ import org.apache.qpid.server.state.StateAwareMethodListener;
* application).
*
*/
-public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A>
+public class PeerHandler<A extends AMQMethodBodyImpl> extends ClusterMethodHandler<A>
{
private final StateAwareMethodListener<A> _peer;
private final StateAwareMethodListener<A> _client;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
index 888fa4e426..f7b9eb2b21 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.cluster.*;
import org.apache.qpid.server.cluster.policy.StandardPolicies;
@@ -41,7 +41,7 @@ import java.util.List;
* processed locally after 'completion' of this broadcast.
*
*/
-class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A> implements StandardPolicies
+class ReplicatingHandler<A extends AMQMethodBodyImpl> extends ClusterMethodHandler<A> implements StandardPolicies
{
protected static final Logger _logger = Logger.getLogger(ReplicatingHandler.class);
@@ -109,7 +109,7 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A
_evt = evt;
}
- public void response(List<AMQMethodBody> responses, List<Member> members)
+ public void response(List<AMQMethodBodyImpl> responses, List<Member> members)
{
try
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
index 8b0c638d63..9ad8b52c83 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
@@ -21,15 +21,12 @@
package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-public class WrappedListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
+public class WrappedListener<T extends AMQMethodBodyImpl> implements StateAwareMethodListener<T>
{
private final StateAwareMethodListener<T> _primary;
private final StateAwareMethodListener _post;
@@ -49,7 +46,7 @@ public class WrappedListener<T extends AMQMethodBody> implements StateAwareMetho
_post.methodReceived(stateMgr, evt);
}
- private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in)
+ private static <T extends AMQMethodBodyImpl> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in)
{
return in == null ? new NullListener<T>() : in;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java
index 5ec3c9660a..47cec07546 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.cluster.handler;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.server.cluster.MethodHandlerFactory;
import org.apache.qpid.server.cluster.MethodHandlerRegistry;
import org.apache.qpid.server.state.AMQState;
@@ -66,12 +66,12 @@ public abstract class WrappingMethodHandlerFactory implements MethodHandlerFacto
return registry;
}
- private <A extends AMQMethodBody, B extends Class<A>> void wrap(MethodHandlerRegistry r, B type, A frame)
+ private <A extends AMQMethodBodyImpl, B extends Class<A>> void wrap(MethodHandlerRegistry r, B type, A frame)
{
r.addHandler(type, new WrappedListener<A>(r.getHandler(frame), _pre, _post));
}
- protected static class FrameDescriptor<A extends AMQMethodBody, B extends Class<A>>
+ protected static class FrameDescriptor<A extends AMQMethodBodyImpl, B extends Class<A>>
{
protected final A instance;
protected final B type;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java
index 3664be58bc..22e95308d7 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.server.cluster.replay;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
-abstract class ChainedMethodRecorder <T extends AMQMethodBody> implements MethodRecorder<T>
+abstract class ChainedMethodRecorder <T extends AMQMethodBodyImpl> implements MethodRecorder<T>
{
private final MethodRecorder<T> _recorder;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
index 5a433b869b..d3bf72b4fd 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.cluster.replay;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.AMQShortString;
@@ -48,7 +48,7 @@ class ConsumerCounts
return count == null ? 0 : count;
}
- synchronized void replay(List<AMQMethodBody> messages)
+ synchronized void replay(List<AMQMethodBodyImpl> messages)
{
for(AMQShortString queue : _counts.keySet())
{
@@ -72,7 +72,7 @@ class ConsumerCounts
}
}
- private void replay(BasicConsumeBody msg, List<AMQMethodBody> messages)
+ private void replay(BasicConsumeBody msg, List<AMQMethodBodyImpl> messages)
{
int count = _counts.get(msg.queue);
for(int i = 0; i < count; i++)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java
index e45810438e..421da7e9ea 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.server.cluster.replay;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
/**
* Abstraction through which a method can be recorded for replay
*
*/
-interface MethodRecorder<T extends AMQMethodBody>
+interface MethodRecorder<T extends AMQMethodBodyImpl>
{
public void record(T method);
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
index 4d3fe1dbed..4467be4052 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.cluster.replay;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.BasicCancelBody;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
@@ -30,15 +28,8 @@ import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.server.cluster.MethodHandlerFactory;
-import org.apache.qpid.server.cluster.MethodHandlerRegistry;
import org.apache.qpid.server.cluster.handler.WrappingMethodHandlerFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQState;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
import java.util.Arrays;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java
index 898cb80cb3..799a88b265 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.server.cluster.replay;
-import org.apache.qpid.server.cluster.Sendable;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import java.util.List;
@@ -33,5 +31,5 @@ import java.util.List;
*/
public interface ReplayManager
{
- public List<AMQMethodBody> replay(boolean isLeader);
+ public List<AMQMethodBodyImpl> replay(boolean isLeader);
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
index d7bbb1c36b..5dfb02c7c5 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
@@ -26,10 +26,8 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.cluster.util.Bindings;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -48,8 +46,8 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
{
private static final Logger _logger = Logger.getLogger(ReplayStore.class);
- private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _globalRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>();
- private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _localRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>();
+ private final Map<Class<? extends AMQMethodBodyImpl>, MethodRecorder> _globalRecorders = new HashMap<Class<? extends AMQMethodBodyImpl>, MethodRecorder>();
+ private final Map<Class<? extends AMQMethodBodyImpl>, MethodRecorder> _localRecorders = new HashMap<Class<? extends AMQMethodBodyImpl>, MethodRecorder>();
private final Map<AMQShortString, QueueDeclareBody> _sharedQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>();
private final Map<AMQShortString, QueueDeclareBody> _privateQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>();
private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _sharedBindings = new Bindings<AMQShortString, AMQShortString, QueueBindBody>();
@@ -80,7 +78,7 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
VirtualHost virtualHost = session.getVirtualHost();
_logger.debug(new LogMessage("Replay store received {0}", evt.getMethod()));
- AMQMethodBody request = evt.getMethod();
+ AMQMethodBodyImpl request = evt.getMethod();
//allow any (relevant) recorder registered for this type of request to record it:
MethodRecorder recorder = getRecorders(session).get(request.getClass());
@@ -90,7 +88,7 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
}
}
- private Map<Class<? extends AMQMethodBody>, MethodRecorder> getRecorders(AMQProtocolSession session)
+ private Map<Class<? extends AMQMethodBodyImpl>, MethodRecorder> getRecorders(AMQProtocolSession session)
{
if (ClusteredProtocolSession.isPeerSession(session))
{
@@ -102,9 +100,9 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
}
}
- public List<AMQMethodBody> replay(boolean isLeader)
+ public List<AMQMethodBodyImpl> replay(boolean isLeader)
{
- List<AMQMethodBody> methods = new ArrayList<AMQMethodBody>();
+ List<AMQMethodBodyImpl> methods = new ArrayList<AMQMethodBodyImpl>();
methods.addAll(_exchanges.values());
methods.addAll(_privateQueues.values());
synchronized(_privateBindings)