diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-03-22 13:14:42 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-03-22 13:14:42 +0000 |
commit | 3fb9be28593263e12623ce09084a230b59b81f4f (patch) | |
tree | 8de74dd781802819df0ff1ca56aaa94fa1b9b38e /java/cluster/src/main | |
parent | b9f9c16645933e0e2f4c6c9b58e8cd1716434467 (diff) | |
download | qpid-python-3fb9be28593263e12623ce09084a230b59b81f4f.tar.gz |
made a copy
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/java.multi_version@521253 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src/main')
33 files changed, 206 insertions, 147 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java new file mode 100644 index 0000000000..4d2737edce --- /dev/null +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java @@ -0,0 +1,32 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+
+public class AMQConnectionWaitException extends AMQException
+{
+ public AMQConnectionWaitException(String s, Throwable e)
+ {
+ super(s, e);
+
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java new file mode 100644 index 0000000000..3681fac750 --- /dev/null +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java @@ -0,0 +1,33 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBodyImpl;
+
+public class AMQUnexpectedBodyTypeException extends AMQException
+{
+
+ public AMQUnexpectedBodyTypeException(Class<? extends AMQBodyImpl> expectedClass, AMQBodyImpl body)
+ {
+ super("Unexpected body type. Expected: " + expectedClass.getName() + "; got: " + body.getClass().getName());
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java new file mode 100644 index 0000000000..721da24d53 --- /dev/null +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java @@ -0,0 +1,31 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+
+public class AMQUnexpectedFrameTypeException extends AMQException
+{
+ public AMQUnexpectedFrameTypeException(String s)
+ {
+ super(s);
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java index 39508df566..aeded15eb8 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java @@ -20,26 +20,26 @@ */ package org.apache.qpid.server.cluster; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; public class BlockingHandler implements ResponseHandler { private final Class _expected; private boolean _completed; - private AMQMethodBody _response; + private AMQMethodBodyImpl _response; public BlockingHandler() { - this(AMQMethodBody.class); + this(AMQMethodBodyImpl.class); } - public BlockingHandler(Class<? extends AMQMethodBody> expected) + public BlockingHandler(Class<? extends AMQMethodBodyImpl> expected) { _expected = expected; } - public void responded(AMQMethodBody response) + public void responded(AMQMethodBodyImpl response) { if (_expected.isInstance(response)) { @@ -74,7 +74,7 @@ public class BlockingHandler implements ResponseHandler } } - AMQMethodBody getResponse() + AMQMethodBodyImpl getResponse() { return _response; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java index 7e2cf6da83..c560a1e20c 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.cluster; import org.apache.qpid.AMQException; import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.log4j.Logger; import java.io.IOException; @@ -88,7 +88,7 @@ abstract class Broker extends SimpleMemberHandle implements Member * @param response the response received * @return true if the response matched an outstanding request */ - protected synchronized boolean handleResponse(int channel, AMQMethodBody response) + protected synchronized boolean handleResponse(int channel, AMQMethodBodyImpl response) { ResponseHandler request = _requests.get(channel); if (request == null) @@ -174,7 +174,7 @@ abstract class Broker extends SimpleMemberHandle implements Member /** * Start connection process, including replay */ - abstract void connectAsynch(Iterable<AMQMethodBody> msgs); + abstract void connectAsynch(Iterable<AMQMethodBodyImpl> msgs); /** * Replay messages to the remote peer this instance represents. These messages @@ -182,7 +182,7 @@ abstract class Broker extends SimpleMemberHandle implements Member * * @param msgs */ - abstract void replay(Iterable<AMQMethodBody> msgs); + abstract void replay(Iterable<AMQMethodBodyImpl> msgs); /** * establish connection, handling redirect if required... @@ -200,7 +200,7 @@ abstract class Broker extends SimpleMemberHandle implements Member this.channel = channel; } - public void responded(AMQMethodBody response) + public void responded(AMQMethodBodyImpl response) { request.responseReceived(Broker.this, response); _requests.remove(channel); @@ -228,7 +228,7 @@ abstract class Broker extends SimpleMemberHandle implements Member this.channel = channel; } - public void responded(AMQMethodBody response) + public void responded(AMQMethodBodyImpl response) { handler.responded(response); _requests.remove(channel); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java index 755a341607..03de4fbbb7 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.cluster.replay.ReplayManager; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.cluster.util.InvokeMultiple; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import java.io.IOException; import java.util.ArrayList; @@ -225,7 +225,7 @@ class BrokerGroup if (create) { Broker b = _factory.create(handle); - List<AMQMethodBody> msgs = _replayMgr.replay(isLeader(_local)); + List<AMQMethodBodyImpl> msgs = _replayMgr.replay(isLeader(_local)); _logger.info(new LogMessage("Replaying {0} from {1} to {2}", msgs, _local, b)); b.connectAsynch(msgs); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java index 1b4a3e8327..e3377d3ed1 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java @@ -26,7 +26,7 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; /** * Hack to assist with reuse of the client handlers for connection setup in @@ -49,7 +49,7 @@ class ClientAdapter implements MethodHandler _stateMgr = stateMgr; } - public void handle(int channel, AMQMethodBody method) throws AMQException + public void handle(int channel, AMQMethodBodyImpl method) throws AMQException { AMQMethodEvent evt = new AMQMethodEvent(channel, method); _stateMgr.methodReceived(evt); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java index 5300912716..fb9e1fa70c 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.server.cluster; -import org.apache.qpid.client.handler.ConnectionCloseMethodHandler; -import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler; -import org.apache.qpid.client.handler.ConnectionSecureMethodHandler; -import org.apache.qpid.client.handler.ConnectionStartMethodHandler; -import org.apache.qpid.client.handler.ConnectionTuneMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionCloseMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionOpenOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionSecureMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionStartMethodHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.IllegalStateTransitionException; @@ -78,14 +78,14 @@ public class ClientHandlerRegistry extends AMQStateManager return registry; } - protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBody frame) throws IllegalStateTransitionException + protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBodyImpl frame) throws IllegalStateTransitionException { ClientRegistry registry = _handlers.get(state); return registry == null ? null : registry.getHandler(frame); } - <A extends Class<AMQMethodBody>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states) + <A extends Class<AMQMethodBodyImpl>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states) { for (AMQState state : states) { @@ -93,7 +93,7 @@ public class ClientHandlerRegistry extends AMQStateManager } } - <A extends Class<AMQMethodBody>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state) + <A extends Class<AMQMethodBodyImpl>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state) { ClientRegistry registry = _handlers.get(state); if (registry == null) @@ -106,15 +106,15 @@ public class ClientHandlerRegistry extends AMQStateManager static class ClientRegistry { - private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener> registry - = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener>(); + private final Map<Class<? extends AMQMethodBodyImpl>, StateAwareMethodListener> registry + = new HashMap<Class<? extends AMQMethodBodyImpl>, StateAwareMethodListener>(); - <A extends Class<AMQMethodBody>> void add(A type, StateAwareMethodListener handler) + <A extends Class<AMQMethodBodyImpl>> void add(A type, StateAwareMethodListener handler) { registry.put(type, handler); } - StateAwareMethodListener getHandler(AMQMethodBody frame) + StateAwareMethodListener getHandler(AMQMethodBodyImpl frame) { return registry.get(frame.getClass()); } @@ -122,9 +122,9 @@ public class ClientHandlerRegistry extends AMQStateManager class ConnectionTuneHandler extends ConnectionTuneMethodHandler { - protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor) + protected AMQFrame createConnectionOpenBody(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor) { - return super.createConnectionOpenFrame(channel, path, new AMQShortString(ClusterCapability.add(capabilities, _identity)), insist, major, minor); + return super.createConnectionOpenBody(channel, path, new AMQShortString(ClusterCapability.add(capabilities, _identity)), insist, major, minor); } } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java index ee5aa48db9..4e0a367f40 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java @@ -24,17 +24,14 @@ import org.apache.log4j.Logger; import org.apache.mina.common.IoSession; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ConnectionOpenBody; import org.apache.qpid.framing.ConnectionSecureOkBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.framing.ClusterMembershipBody; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.cluster.util.LogMessage; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index 2f473b63fb..149378a626 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -230,7 +230,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, //connect to the host and port specified: Broker prospect = connectToProspect(member); announceMembership(); - List<AMQMethodBody> msgs = _replayMgr.replay(true); + List<AMQMethodBodyImpl> msgs = _replayMgr.replay(true); _logger.info(new LogMessage("Replaying {0} from leader to {1}", msgs, prospect)); prospect.replay(msgs); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java index 8ab7856e87..55b51bf736 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.cluster; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import java.util.ArrayList; import java.util.HashMap; @@ -35,7 +35,7 @@ import java.util.Map; */ class GroupRequest { - private final Map<Member, AMQMethodBody> _responses = new HashMap<Member, AMQMethodBody>(); + private final Map<Member, AMQMethodBodyImpl> _responses = new HashMap<Member, AMQMethodBodyImpl>(); private final List<Member> _brokers = new ArrayList<Member>(); private boolean _sent; @@ -62,7 +62,7 @@ class GroupRequest return checkCompletion(); } - public boolean responseReceived(Member broker, AMQMethodBody response) + public boolean responseReceived(Member broker, AMQMethodBodyImpl response) { _responses.put(broker, response); return checkCompletion(); @@ -90,9 +90,9 @@ class GroupRequest return true; } - List<AMQMethodBody> getResults() + List<AMQMethodBodyImpl> getResults() { - List<AMQMethodBody> results = new ArrayList<AMQMethodBody>(_brokers.size()); + List<AMQMethodBodyImpl> results = new ArrayList<AMQMethodBodyImpl>(_brokers.size()); for (Member b : _brokers) { results.add(_responses.get(b)); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java index d2e9de2f39..d394651c26 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.server.cluster; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import java.util.List; public interface GroupResponseHandler { //Note: this implies that the response to a group request will always be a method body... - public void response(List<AMQMethodBody> responses, List<Member> members); + public void response(List<AMQMethodBodyImpl> responses, List<Member> members); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java index a83f034021..d3f2b9201c 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java @@ -21,9 +21,9 @@ package org.apache.qpid.server.cluster; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; interface MethodHandler { - public void handle(int channel, AMQMethodBody method) throws AMQException; + public void handle(int channel, AMQMethodBodyImpl method) throws AMQException; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java index 748a660bb8..33ec771d6a 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.cluster; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.server.state.StateAwareMethodListener; import java.util.HashMap; @@ -28,16 +28,16 @@ import java.util.Map; public class MethodHandlerRegistry { - private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> registry = - new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>(); + private final Map<Class<? extends AMQMethodBodyImpl>, StateAwareMethodListener<? extends AMQMethodBodyImpl>> registry = + new HashMap<Class<? extends AMQMethodBodyImpl>, StateAwareMethodListener<? extends AMQMethodBodyImpl>>(); - public <A extends AMQMethodBody, B extends Class<A>> MethodHandlerRegistry addHandler(B type, StateAwareMethodListener<A> handler) + public <A extends AMQMethodBodyImpl, B extends Class<A>> MethodHandlerRegistry addHandler(B type, StateAwareMethodListener<A> handler) { registry.put(type, handler); return this; } - public <B extends AMQMethodBody> StateAwareMethodListener<B> getHandler(B frame) + public <B extends AMQMethodBodyImpl> StateAwareMethodListener<B> getHandler(B frame) { return (StateAwareMethodListener<B>) registry.get(frame.getClass()); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java index 401a54444b..1f736ad94c 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java @@ -33,13 +33,13 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQBodyImpl; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.ConnectionRedirectBody; import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.ProtocolVersion; import java.io.IOException; import java.net.InetSocketAddress; @@ -57,7 +57,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler private final MemberHandle _local; private IoSession _session; private MethodHandler _handler; - private Iterable<AMQMethodBody> _replay; + private Iterable<AMQMethodBodyImpl> _replay; MinaBrokerProxy(String host, int port, MemberHandle local) { @@ -106,13 +106,13 @@ public class MinaBrokerProxy extends Broker implements MethodHandler return _connectionMonitor.waitUntilOpen(); } - void connectAsynch(Iterable<AMQMethodBody> msgs) + void connectAsynch(Iterable<AMQMethodBodyImpl> msgs) { _replay = msgs; connectImpl(); } - void replay(Iterable<AMQMethodBody> msgs) + void replay(Iterable<AMQMethodBodyImpl> msgs) { _replay = msgs; if(_connectionMonitor.isOpened()) @@ -138,7 +138,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler } } - public void send(AMQDataBlock data) throws AMQException + public void send(AMQDataBlock data) throws AMQConnectionWaitException { if (_session == null) { @@ -146,9 +146,9 @@ public class MinaBrokerProxy extends Broker implements MethodHandler { _connectionMonitor.waitUntilOpen(); } - catch (Exception e) + catch (InterruptedException e) { - throw new AMQException("Failed to send " + data + ": " + e, e); + throw new AMQConnectionWaitException("Failed to send " + data + ": " + e, e); } } _session.write(data); @@ -158,14 +158,14 @@ public class MinaBrokerProxy extends Broker implements MethodHandler { if(_replay != null) { - for(AMQMethodBody b : _replay) + for(AMQMethodBodyImpl b : _replay) { _session.write(new AMQFrame(0, b)); } } } - public void handle(int channel, AMQMethodBody method) throws AMQException + public void handle(int channel, AMQMethodBodyImpl method) throws AMQException { _logger.info(new LogMessage("Handling method: {0} for channel {1}", method, channel)); if (!handleResponse(channel, method)) @@ -174,7 +174,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler } } - private void handleMethod(int channel, AMQMethodBody method) throws AMQException + private void handleMethod(int channel, AMQMethodBodyImpl method) throws AMQException { if (method instanceof ConnectionRedirectBody) { @@ -200,14 +200,14 @@ public class MinaBrokerProxy extends Broker implements MethodHandler private void handleFrame(AMQFrame frame) throws AMQException { - AMQBody body = frame.getBodyFrame(); - if (body instanceof AMQMethodBody) + AMQBodyImpl body = frame.getBodyFrame(); + if (body instanceof AMQMethodBodyImpl) { - handleMethod(frame.getChannel(), (AMQMethodBody) body); + handleMethod(frame.getChannel(), (AMQMethodBodyImpl) body); } else { - throw new AMQException("Client only expects method body, got: " + body); + throw new AMQUnexpectedBodyTypeException(AMQMethodBodyImpl.class, body); } } @@ -216,7 +216,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler return "MinaBrokerProxy[" + (_session == null ? super.toString() : _session.getRemoteAddress()) + "]"; } - private class MinaBinding extends IoHandlerAdapter implements ProtocolVersionList + private class MinaBinding extends IoHandlerAdapter { public void sessionCreated(IoSession session) throws Exception { @@ -228,8 +228,8 @@ public class MinaBrokerProxy extends Broker implements MethodHandler /* Find last protocol version in protocol version list. Make sure last protocol version listed in the build file (build-module.xml) is the latest version which will be used here. */ - int i = pv.length - 1; - session.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); + + session.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); } public void sessionOpened(IoSession session) throws Exception @@ -260,7 +260,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler } else { - throw new AMQException("Received message of unrecognised type: " + object); + throw new AMQUnexpectedFrameTypeException("Received message of unrecognised type: " + object); } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java index fe76ca6505..81341eb445 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.server.cluster; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; public interface ResponseHandler { - public void responded(AMQMethodBody response); + public void responded(AMQMethodBodyImpl response); public void removed(); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java index 03b0dc7f2e..ac0373cc0d 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java @@ -21,14 +21,12 @@ package org.apache.qpid.server.cluster; import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.IllegalStateTransitionException; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -74,7 +72,7 @@ class ServerHandlerRegistry extends AMQStateManager } } - protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState state, B frame) throws IllegalStateTransitionException + protected <B extends AMQMethodBodyImpl> StateAwareMethodListener<B> findStateTransitionHandler(AMQState state, B frame) throws IllegalStateTransitionException { MethodHandlerRegistry registry = _handlers.get(state); StateAwareMethodListener<B> handler = (registry == null) ? null : registry.getHandler(frame); @@ -85,7 +83,7 @@ class ServerHandlerRegistry extends AMQStateManager return handler; } - <A extends AMQMethodBody, B extends Class<A>> void addHandler(AMQState state, B type, StateAwareMethodListener<A> handler) + <A extends AMQMethodBodyImpl, B extends Class<A>> void addHandler(AMQState state, B type, StateAwareMethodListener<A> handler) { MethodHandlerRegistry registry = _handlers.get(state); if (registry == null) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java index f7c40c60b3..bae930b341 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java @@ -18,16 +18,16 @@ package org.apache.qpid.server.cluster; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQBodyImpl; import org.apache.qpid.framing.AMQFrame; /** */ public class SimpleBodySendable implements Sendable { - private final AMQBody _body; + private final AMQBodyImpl _body; - public SimpleBodySendable(AMQBody body) + public SimpleBodySendable(AMQBodyImpl body) { _body = body; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java index 86710e8a31..85955ab775 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java @@ -21,18 +21,14 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import java.util.List; import java.util.ArrayList; -public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends ClusterMethodHandler<A> +public class ChainedClusterMethodHandler <A extends AMQMethodBodyImpl> extends ClusterMethodHandler<A> { private final List<ClusterMethodHandler<A>> _handlers; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java index faab99b0f6..5c7e1d7ff7 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java @@ -20,17 +20,15 @@ */ package org.apache.qpid.server.cluster.handler; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.AMQException; -public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A> +public abstract class ClusterMethodHandler<A extends AMQMethodBodyImpl> implements StateAwareMethodListener<A> { public final void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java index e7509da32a..313ba6d304 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java @@ -222,17 +222,17 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory } } - private <B extends AMQMethodBody> ReplicatingHandler<B> replicated(StateAwareMethodListener<B> handler) + private <B extends AMQMethodBodyImpl> ReplicatingHandler<B> replicated(StateAwareMethodListener<B> handler) { return new ReplicatingHandler<B>(_groupMgr, handler); } - private <B extends AMQMethodBody> StateAwareMethodListener<B> alternate(StateAwareMethodListener<B> peer, StateAwareMethodListener<B> client) + private <B extends AMQMethodBodyImpl> StateAwareMethodListener<B> alternate(StateAwareMethodListener<B> peer, StateAwareMethodListener<B> client) { return new PeerHandler<B>(peer, client); } - private <B extends AMQMethodBody> StateAwareMethodListener<B> chain(ClusterMethodHandler<B>... h) + private <B extends AMQMethodBodyImpl> StateAwareMethodListener<B> chain(ClusterMethodHandler<B>... h) { return new ChainedClusterMethodHandler<B>(h); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java index a2f62f714b..a9cb096d33 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java @@ -21,15 +21,12 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; -class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A> +class ExtendedHandler<A extends AMQMethodBodyImpl> implements StateAwareMethodListener<A> { private final StateAwareMethodListener<A> _base; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java index 8b0bb4b127..a0b95344ee 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java @@ -21,15 +21,12 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; -public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T> +public class NullListener<T extends AMQMethodBodyImpl> implements StateAwareMethodListener<T> { public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<T> evt) throws AMQException { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java index 447e51ccd9..50f25bd7fe 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java @@ -21,12 +21,8 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.cluster.ClusteredProtocolSession; -import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; @@ -36,7 +32,7 @@ import org.apache.qpid.server.state.StateAwareMethodListener; * application). * */ -public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A> +public class PeerHandler<A extends AMQMethodBodyImpl> extends ClusterMethodHandler<A> { private final StateAwareMethodListener<A> _peer; private final StateAwareMethodListener<A> _client; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java index 888fa4e426..f7b9eb2b21 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.cluster.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.cluster.*; import org.apache.qpid.server.cluster.policy.StandardPolicies; @@ -41,7 +41,7 @@ import java.util.List; * processed locally after 'completion' of this broadcast. * */ -class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A> implements StandardPolicies +class ReplicatingHandler<A extends AMQMethodBodyImpl> extends ClusterMethodHandler<A> implements StandardPolicies { protected static final Logger _logger = Logger.getLogger(ReplicatingHandler.class); @@ -109,7 +109,7 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A _evt = evt; } - public void response(List<AMQMethodBody> responses, List<Member> members) + public void response(List<AMQMethodBodyImpl> responses, List<Member> members) { try { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java index 8b0c638d63..9ad8b52c83 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java @@ -21,15 +21,12 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; -public class WrappedListener<T extends AMQMethodBody> implements StateAwareMethodListener<T> +public class WrappedListener<T extends AMQMethodBodyImpl> implements StateAwareMethodListener<T> { private final StateAwareMethodListener<T> _primary; private final StateAwareMethodListener _post; @@ -49,7 +46,7 @@ public class WrappedListener<T extends AMQMethodBody> implements StateAwareMetho _post.methodReceived(stateMgr, evt); } - private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in) + private static <T extends AMQMethodBodyImpl> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in) { return in == null ? new NullListener<T>() : in; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java index 5ec3c9660a..47cec07546 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.cluster.handler; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.server.cluster.MethodHandlerFactory; import org.apache.qpid.server.cluster.MethodHandlerRegistry; import org.apache.qpid.server.state.AMQState; @@ -66,12 +66,12 @@ public abstract class WrappingMethodHandlerFactory implements MethodHandlerFacto return registry; } - private <A extends AMQMethodBody, B extends Class<A>> void wrap(MethodHandlerRegistry r, B type, A frame) + private <A extends AMQMethodBodyImpl, B extends Class<A>> void wrap(MethodHandlerRegistry r, B type, A frame) { r.addHandler(type, new WrappedListener<A>(r.getHandler(frame), _pre, _post)); } - protected static class FrameDescriptor<A extends AMQMethodBody, B extends Class<A>> + protected static class FrameDescriptor<A extends AMQMethodBodyImpl, B extends Class<A>> { protected final A instance; protected final B type; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java index 3664be58bc..22e95308d7 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.server.cluster.replay; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; -abstract class ChainedMethodRecorder <T extends AMQMethodBody> implements MethodRecorder<T> +abstract class ChainedMethodRecorder <T extends AMQMethodBodyImpl> implements MethodRecorder<T> { private final MethodRecorder<T> _recorder; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java index 5a433b869b..d3bf72b4fd 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.cluster.replay; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.AMQShortString; @@ -48,7 +48,7 @@ class ConsumerCounts return count == null ? 0 : count; } - synchronized void replay(List<AMQMethodBody> messages) + synchronized void replay(List<AMQMethodBodyImpl> messages) { for(AMQShortString queue : _counts.keySet()) { @@ -72,7 +72,7 @@ class ConsumerCounts } } - private void replay(BasicConsumeBody msg, List<AMQMethodBody> messages) + private void replay(BasicConsumeBody msg, List<AMQMethodBodyImpl> messages) { int count = _counts.get(msg.queue); for(int i = 0; i < count; i++) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java index e45810438e..421da7e9ea 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.cluster.replay; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; /** * Abstraction through which a method can be recorded for replay * */ -interface MethodRecorder<T extends AMQMethodBody> +interface MethodRecorder<T extends AMQMethodBodyImpl> { public void record(T method); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java index 4d3fe1dbed..4467be4052 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.cluster.replay; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.BasicCancelBody; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.ExchangeDeclareBody; @@ -30,15 +28,8 @@ import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.server.cluster.MethodHandlerFactory; -import org.apache.qpid.server.cluster.MethodHandlerRegistry; import org.apache.qpid.server.cluster.handler.WrappingMethodHandlerFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQState; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.state.StateAwareMethodListener; import java.util.Arrays; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java index 898cb80cb3..799a88b265 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java @@ -20,9 +20,7 @@ */ package org.apache.qpid.server.cluster.replay; -import org.apache.qpid.server.cluster.Sendable; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyImpl; import java.util.List; @@ -33,5 +31,5 @@ import java.util.List; */ public interface ReplayManager { - public List<AMQMethodBody> replay(boolean isLeader); + public List<AMQMethodBodyImpl> replay(boolean isLeader); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java index d7bbb1c36b..5dfb02c7c5 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -26,10 +26,8 @@ import org.apache.qpid.framing.*; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.cluster.util.Bindings; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -48,8 +46,8 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener { private static final Logger _logger = Logger.getLogger(ReplayStore.class); - private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _globalRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>(); - private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _localRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>(); + private final Map<Class<? extends AMQMethodBodyImpl>, MethodRecorder> _globalRecorders = new HashMap<Class<? extends AMQMethodBodyImpl>, MethodRecorder>(); + private final Map<Class<? extends AMQMethodBodyImpl>, MethodRecorder> _localRecorders = new HashMap<Class<? extends AMQMethodBodyImpl>, MethodRecorder>(); private final Map<AMQShortString, QueueDeclareBody> _sharedQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>(); private final Map<AMQShortString, QueueDeclareBody> _privateQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>(); private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _sharedBindings = new Bindings<AMQShortString, AMQShortString, QueueBindBody>(); @@ -80,7 +78,7 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener VirtualHost virtualHost = session.getVirtualHost(); _logger.debug(new LogMessage("Replay store received {0}", evt.getMethod())); - AMQMethodBody request = evt.getMethod(); + AMQMethodBodyImpl request = evt.getMethod(); //allow any (relevant) recorder registered for this type of request to record it: MethodRecorder recorder = getRecorders(session).get(request.getClass()); @@ -90,7 +88,7 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener } } - private Map<Class<? extends AMQMethodBody>, MethodRecorder> getRecorders(AMQProtocolSession session) + private Map<Class<? extends AMQMethodBodyImpl>, MethodRecorder> getRecorders(AMQProtocolSession session) { if (ClusteredProtocolSession.isPeerSession(session)) { @@ -102,9 +100,9 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener } } - public List<AMQMethodBody> replay(boolean isLeader) + public List<AMQMethodBodyImpl> replay(boolean isLeader) { - List<AMQMethodBody> methods = new ArrayList<AMQMethodBody>(); + List<AMQMethodBodyImpl> methods = new ArrayList<AMQMethodBodyImpl>(); methods.addAll(_exchanges.values()); methods.addAll(_privateQueues.values()); synchronized(_privateBindings) |