summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-29 11:10:09 +0000
committerRobert Greig <rgreig@apache.org>2007-01-29 11:10:09 +0000
commitedba93f0542978803911b88647ce8e950f62491e (patch)
tree60e0905b0c508aa861064dc52cd1e74bb62b3cb6
parentef4f924560116d10b57d66c311c33829492650ec (diff)
downloadqpid-python-edba93f0542978803911b88647ce8e950f62491e.tar.gz
QPID-320 : Patch supplied by Rob Godfrey - Improve performance by remembering protocol version
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@501009 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java4
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java18
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java4
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java2
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java14
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java2
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java4
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java5
-rw-r--r--qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java12
9 files changed, 44 insertions, 21 deletions
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
index 8419ec5668..ee5aa48db9 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
@@ -172,12 +172,12 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements
private boolean isMembershipAnnouncement(Object msg)
{
- return msg instanceof AMQFrame && (((AMQFrame) msg).bodyFrame instanceof ClusterMembershipBody);
+ return msg instanceof AMQFrame && (((AMQFrame) msg).getBodyFrame() instanceof ClusterMembershipBody);
}
private boolean isBufferable(Object msg)
{
- return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).bodyFrame);
+ return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).getBodyFrame());
}
private boolean isBuffereable(AMQBody body)
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
index f9ec0eb878..2f473b63fb 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
@@ -110,6 +110,8 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterPingBody ping = new ClusterPingBody((byte)8,
(byte)0,
+ ClusterPingBody.getClazz((byte)8, (byte)0),
+ ClusterPingBody.getMethod((byte)8, (byte)0),
_group.getLocal().getDetails(),
_loadTable.getLocalLoad(),
true);
@@ -159,6 +161,8 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterJoinBody join = new ClusterJoinBody((byte)8,
(byte)0,
+ ClusterJoinBody.getClazz((byte)8, (byte)0),
+ ClusterJoinBody.getMethod((byte)8, (byte)0),
_group.getLocal().getDetails());
send(leader, new SimpleBodySendable(join));
@@ -182,6 +186,8 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterLeaveBody leave = new ClusterLeaveBody((byte)8,
(byte)0,
+ ClusterLeaveBody.getClazz((byte)8, (byte)0),
+ ClusterLeaveBody.getMethod((byte)8, (byte)0),
_group.getLocal().getDetails());
send(getLeader(), new SimpleBodySendable(leave));
@@ -207,6 +213,8 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8,
(byte)0,
+ ClusterSuspectBody.getClazz((byte)8, (byte)0),
+ ClusterSuspectBody.getMethod((byte)8, (byte)0),
broker.getDetails());
send(getLeader(), new SimpleBodySendable(suspect));
@@ -231,7 +239,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
//pass request on to leader:
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, member.getDetails());
+ ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0,
+ ClusterJoinBody.getClazz((byte)8, (byte)0),
+ ClusterJoinBody.getMethod((byte)8, (byte)0),
+ member.getDetails());
Broker leader = getLeader();
send(leader, new SimpleBodySendable(request));
@@ -278,7 +289,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0, membership.getBytes());
+ ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0,
+ ClusterMembershipBody.getClazz((byte)8, (byte)0),
+ ClusterMembershipBody.getMethod((byte)8, (byte)0),
+ membership.getBytes());
return announce;
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
index f447895013..401a54444b 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
@@ -200,10 +200,10 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
private void handleFrame(AMQFrame frame) throws AMQException
{
- AMQBody body = frame.bodyFrame;
+ AMQBody body = frame.getBodyFrame();
if (body instanceof AMQMethodBody)
{
- handleMethod(frame.channel, (AMQMethodBody) body);
+ handleMethod(frame.getChannel(), (AMQMethodBody) body);
}
else
{
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
index aa16595095..5a433b869b 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
@@ -56,6 +56,8 @@ class ConsumerCounts
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
BasicConsumeBody m = new BasicConsumeBody((byte)8,
(byte)0,
+ BasicConsumeBody.getClazz((byte)8, (byte)0),
+ BasicConsumeBody.getMethod((byte)8, (byte)0),
null,
queue,
false,
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
index c0168b696b..4d3fe1dbed 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
@@ -50,13 +50,13 @@ public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory
private final byte minor = (byte)0;
private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[]
{
- new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor,null,false,false,false,false,false,null,0)),
- new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor,false,false,false,null,0)),
- new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor,null,null,false,null,null,0)),
- new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor,null,false,false,null,false,false,false,0,null)),
- new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor,null,false,false,0)),
- new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor,null,null,false,false,false,false,null,0)),
- new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor,null,false))
+ new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor, QueueDeclareBody.getClazz(major, minor), QueueDeclareBody.getMethod(major, minor),null,false,false,false,false,false,null,0)),
+ new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor, QueueDeleteBody.getClazz(major, minor), QueueDeleteBody.getMethod(major, minor),false,false,false,null,0)),
+ new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor, QueueBindBody.getClazz(major, minor), QueueBindBody.getMethod(major, minor),null,null,false,null,null,0)),
+ new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor, ExchangeDeclareBody.getClazz(major, minor), ExchangeDeclareBody.getMethod(major, minor),null,false,false,null,false,false,false,0,null)),
+ new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor, ExchangeDeleteBody.getClazz(major, minor), ExchangeDeleteBody.getMethod(major, minor),null,false,false,0)),
+ new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor, BasicConsumeBody.getClazz(major, minor), BasicConsumeBody.getMethod(major, minor),null,null,false,false,false,false,null,0)),
+ new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor, BasicCancelBody.getClazz(major, minor), BasicCancelBody.getMethod(major, minor),null,false))
});
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
index 8ac4b9b2c7..d7bbb1c36b 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
@@ -122,7 +122,7 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
_consumers.replay(methods);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- methods.add(new ClusterSynchBody((byte)8, (byte)0));
+ methods.add(new ClusterSynchBody((byte)8, (byte)0, ClusterSynchBody.getClazz((byte)8, (byte)0), ClusterSynchBody.getMethod((byte)8, (byte)0)));
return methods;
}
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index 19be638051..ecabd9320a 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -74,6 +74,8 @@ public class ClusteredQueue extends AMQQueue
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
QueueDeleteBody request = new QueueDeleteBody((byte)8,
(byte)0,
+ QueueDeleteBody.getClazz((byte)8,(byte)0),
+ QueueDeleteBody.getMethod((byte)8,(byte)0),
false,
false,
false,
@@ -94,6 +96,8 @@ public class ClusteredQueue extends AMQQueue
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
BasicCancelBody request = new BasicCancelBody((byte)8,
(byte)0,
+ BasicCancelBody.getClazz((byte)8, (byte)0),
+ BasicCancelBody.getMethod((byte)8, (byte)0),
getName(),
false);
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
index 95ab34ccf9..f8e4311a77 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
@@ -54,7 +54,10 @@ public class PrivateQueue extends AMQQueue
//send delete request to peers:
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, false,false,false,null,0);
+ QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0,
+ QueueDeleteBody.getClazz((byte)8, (byte)0),
+ QueueDeleteBody.getMethod((byte)8, (byte)0),
+ false,false,false,null,0);
request.queue = getName();
_groupMgr.broadcast(new SimpleBodySendable(request));
}
diff --git a/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
index 20e092bde7..f1da312eea 100644
--- a/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
+++ b/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
@@ -54,7 +54,7 @@ public class BrokerTest extends TestCase
for (RecordingBroker b : brokers)
{
- b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new TestMethod("response"));
+ b.handleResponse(((AMQFrame) b.getMessages().get(0)).getChannel(), new TestMethod("response"));
}
assertTrue("Handler did not receive response", handler.isCompleted());
@@ -80,7 +80,7 @@ public class BrokerTest extends TestCase
for (RecordingBroker broker : succeeded)
{
- broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).channel, new TestMethod("response"));
+ broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).getChannel(), new TestMethod("response"));
}
b.remove();
@@ -106,7 +106,7 @@ public class BrokerTest extends TestCase
for (int i = 0; i < msgs.length; i++)
{
assertTrue(sent.get(i) instanceof AMQFrame);
- assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame);
+ assertEquals(msgs[i], ((AMQFrame) sent.get(i)).getBodyFrame());
}
}
@@ -119,9 +119,9 @@ public class BrokerTest extends TestCase
List<AMQDataBlock> sent = broker.getMessages();
assertEquals(1, sent.size());
assertTrue(sent.get(0) instanceof AMQFrame);
- assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame());
- broker.handleResponse(((AMQFrame) sent.get(0)).channel, new TestMethod("B"));
+ broker.handleResponse(((AMQFrame) sent.get(0)).getChannel(), new TestMethod("B"));
assertEquals(new TestMethod("B"), handler.getResponse());
}
@@ -135,7 +135,7 @@ public class BrokerTest extends TestCase
List<AMQDataBlock> sent = broker.getMessages();
assertEquals(1, sent.size());
assertTrue(sent.get(0) instanceof AMQFrame);
- assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame());
broker.remove();
assertEquals(null, handler.getResponse());
assertTrue(handler.isCompleted());