diff options
Diffstat (limited to 'java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java')
-rw-r--r-- | java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java | 24 |
1 files changed, 18 insertions, 6 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index 4efe0b8dd0..5f083780d6 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -103,7 +103,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, private void ping(Broker b) throws AMQException { - ClusterPingBody ping = new ClusterPingBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0); ping.broker = _group.getLocal().getDetails(); ping.responseRequired = true; ping.load = _loadTable.getLocalLoad(); @@ -149,7 +151,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, Broker leader = connectToLeader(member); _logger.info(new LogMessage("Connected to {0}. joining", leader)); - ClusterJoinBody join = new ClusterJoinBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0); join.broker = _group.getLocal().getDetails(); send(leader, new SimpleBodySendable(join)); } @@ -168,7 +172,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, public void leave() throws AMQException { - ClusterLeaveBody leave = new ClusterLeaveBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0); leave.broker = _group.getLocal().getDetails(); send(getLeader(), new SimpleBodySendable(leave)); } @@ -189,7 +195,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, } else { - ClusterSuspectBody suspect = new ClusterSuspectBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0); suspect.broker = broker.getDetails(); send(getLeader(), new SimpleBodySendable(suspect)); } @@ -211,7 +219,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, else { //pass request on to leader: - ClusterJoinBody request = new ClusterJoinBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0); request.broker = member.getDetails(); Broker leader = getLeader(); send(leader, new SimpleBodySendable(request)); @@ -256,7 +266,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, private ClusterMembershipBody createAnnouncement(String membership) { - ClusterMembershipBody announce = new ClusterMembershipBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0); //TODO: revise this way of converting String to bytes... announce.members = membership.getBytes(); return announce; |