diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-29 11:10:09 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-29 11:10:09 +0000 |
commit | edba93f0542978803911b88647ce8e950f62491e (patch) | |
tree | 60e0905b0c508aa861064dc52cd1e74bb62b3cb6 | |
parent | ef4f924560116d10b57d66c311c33829492650ec (diff) | |
download | qpid-python-edba93f0542978803911b88647ce8e950f62491e.tar.gz |
QPID-320 : Patch supplied by Rob Godfrey - Improve performance by remembering protocol version
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@501009 13f79535-47bb-0310-9956-ffa450edef68
9 files changed, 44 insertions, 21 deletions
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java index 8419ec5668..ee5aa48db9 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java @@ -172,12 +172,12 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements private boolean isMembershipAnnouncement(Object msg) { - return msg instanceof AMQFrame && (((AMQFrame) msg).bodyFrame instanceof ClusterMembershipBody); + return msg instanceof AMQFrame && (((AMQFrame) msg).getBodyFrame() instanceof ClusterMembershipBody); } private boolean isBufferable(Object msg) { - return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).bodyFrame); + return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).getBodyFrame()); } private boolean isBuffereable(AMQBody body) diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index f9ec0eb878..2f473b63fb 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -110,6 +110,8 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0, + ClusterPingBody.getClazz((byte)8, (byte)0), + ClusterPingBody.getMethod((byte)8, (byte)0), _group.getLocal().getDetails(), _loadTable.getLocalLoad(), true); @@ -159,6 +161,8 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0, + ClusterJoinBody.getClazz((byte)8, (byte)0), + ClusterJoinBody.getMethod((byte)8, (byte)0), _group.getLocal().getDetails()); send(leader, new SimpleBodySendable(join)); @@ -182,6 +186,8 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0, + ClusterLeaveBody.getClazz((byte)8, (byte)0), + ClusterLeaveBody.getMethod((byte)8, (byte)0), _group.getLocal().getDetails()); send(getLeader(), new SimpleBodySendable(leave)); @@ -207,6 +213,8 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0, + ClusterSuspectBody.getClazz((byte)8, (byte)0), + ClusterSuspectBody.getMethod((byte)8, (byte)0), broker.getDetails()); send(getLeader(), new SimpleBodySendable(suspect)); @@ -231,7 +239,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, //pass request on to leader: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, member.getDetails()); + ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, + ClusterJoinBody.getClazz((byte)8, (byte)0), + ClusterJoinBody.getMethod((byte)8, (byte)0), + member.getDetails()); Broker leader = getLeader(); send(leader, new SimpleBodySendable(request)); @@ -278,7 +289,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0, membership.getBytes()); + ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0, + ClusterMembershipBody.getClazz((byte)8, (byte)0), + ClusterMembershipBody.getMethod((byte)8, (byte)0), + membership.getBytes()); return announce; diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java index f447895013..401a54444b 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java @@ -200,10 +200,10 @@ public class MinaBrokerProxy extends Broker implements MethodHandler private void handleFrame(AMQFrame frame) throws AMQException { - AMQBody body = frame.bodyFrame; + AMQBody body = frame.getBodyFrame(); if (body instanceof AMQMethodBody) { - handleMethod(frame.channel, (AMQMethodBody) body); + handleMethod(frame.getChannel(), (AMQMethodBody) body); } else { diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java index aa16595095..5a433b869b 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java @@ -56,6 +56,8 @@ class ConsumerCounts // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. BasicConsumeBody m = new BasicConsumeBody((byte)8, (byte)0, + BasicConsumeBody.getClazz((byte)8, (byte)0), + BasicConsumeBody.getMethod((byte)8, (byte)0), null, queue, false, diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java index c0168b696b..4d3fe1dbed 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java @@ -50,13 +50,13 @@ public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory private final byte minor = (byte)0; private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[] { - new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor,null,false,false,false,false,false,null,0)), - new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor,false,false,false,null,0)), - new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor,null,null,false,null,null,0)), - new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor,null,false,false,null,false,false,false,0,null)), - new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor,null,false,false,0)), - new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor,null,null,false,false,false,false,null,0)), - new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor,null,false)) + new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor, QueueDeclareBody.getClazz(major, minor), QueueDeclareBody.getMethod(major, minor),null,false,false,false,false,false,null,0)), + new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor, QueueDeleteBody.getClazz(major, minor), QueueDeleteBody.getMethod(major, minor),false,false,false,null,0)), + new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor, QueueBindBody.getClazz(major, minor), QueueBindBody.getMethod(major, minor),null,null,false,null,null,0)), + new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor, ExchangeDeclareBody.getClazz(major, minor), ExchangeDeclareBody.getMethod(major, minor),null,false,false,null,false,false,false,0,null)), + new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor, ExchangeDeleteBody.getClazz(major, minor), ExchangeDeleteBody.getMethod(major, minor),null,false,false,0)), + new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor, BasicConsumeBody.getClazz(major, minor), BasicConsumeBody.getMethod(major, minor),null,null,false,false,false,false,null,0)), + new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor, BasicCancelBody.getClazz(major, minor), BasicCancelBody.getMethod(major, minor),null,false)) }); diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java index 8ac4b9b2c7..d7bbb1c36b 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -122,7 +122,7 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener _consumers.replay(methods); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - methods.add(new ClusterSynchBody((byte)8, (byte)0)); + methods.add(new ClusterSynchBody((byte)8, (byte)0, ClusterSynchBody.getClazz((byte)8, (byte)0), ClusterSynchBody.getMethod((byte)8, (byte)0))); return methods; } diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index 19be638051..ecabd9320a 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -74,6 +74,8 @@ public class ClusteredQueue extends AMQQueue // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, + QueueDeleteBody.getClazz((byte)8,(byte)0), + QueueDeleteBody.getMethod((byte)8,(byte)0), false, false, false, @@ -94,6 +96,8 @@ public class ClusteredQueue extends AMQQueue // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. BasicCancelBody request = new BasicCancelBody((byte)8, (byte)0, + BasicCancelBody.getClazz((byte)8, (byte)0), + BasicCancelBody.getMethod((byte)8, (byte)0), getName(), false); diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java index 95ab34ccf9..f8e4311a77 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java @@ -54,7 +54,10 @@ public class PrivateQueue extends AMQQueue //send delete request to peers: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, false,false,false,null,0); + QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, + QueueDeleteBody.getClazz((byte)8, (byte)0), + QueueDeleteBody.getMethod((byte)8, (byte)0), + false,false,false,null,0); request.queue = getName(); _groupMgr.broadcast(new SimpleBodySendable(request)); } diff --git a/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java index 20e092bde7..f1da312eea 100644 --- a/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java +++ b/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java @@ -54,7 +54,7 @@ public class BrokerTest extends TestCase for (RecordingBroker b : brokers) { - b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new TestMethod("response")); + b.handleResponse(((AMQFrame) b.getMessages().get(0)).getChannel(), new TestMethod("response")); } assertTrue("Handler did not receive response", handler.isCompleted()); @@ -80,7 +80,7 @@ public class BrokerTest extends TestCase for (RecordingBroker broker : succeeded) { - broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).channel, new TestMethod("response")); + broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).getChannel(), new TestMethod("response")); } b.remove(); @@ -106,7 +106,7 @@ public class BrokerTest extends TestCase for (int i = 0; i < msgs.length; i++) { assertTrue(sent.get(i) instanceof AMQFrame); - assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame); + assertEquals(msgs[i], ((AMQFrame) sent.get(i)).getBodyFrame()); } } @@ -119,9 +119,9 @@ public class BrokerTest extends TestCase List<AMQDataBlock> sent = broker.getMessages(); assertEquals(1, sent.size()); assertTrue(sent.get(0) instanceof AMQFrame); - assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame); + assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame()); - broker.handleResponse(((AMQFrame) sent.get(0)).channel, new TestMethod("B")); + broker.handleResponse(((AMQFrame) sent.get(0)).getChannel(), new TestMethod("B")); assertEquals(new TestMethod("B"), handler.getResponse()); } @@ -135,7 +135,7 @@ public class BrokerTest extends TestCase List<AMQDataBlock> sent = broker.getMessages(); assertEquals(1, sent.size()); assertTrue(sent.get(0) instanceof AMQFrame); - assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame); + assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame()); broker.remove(); assertEquals(null, handler.getResponse()); assertTrue(handler.isCompleted()); |