summaryrefslogtreecommitdiff
path: root/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java24
1 files changed, 18 insertions, 6 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
index 4efe0b8dd0..5f083780d6 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
@@ -103,7 +103,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
private void ping(Broker b) throws AMQException
{
- ClusterPingBody ping = new ClusterPingBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0);
ping.broker = _group.getLocal().getDetails();
ping.responseRequired = true;
ping.load = _loadTable.getLocalLoad();
@@ -149,7 +151,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
Broker leader = connectToLeader(member);
_logger.info(new LogMessage("Connected to {0}. joining", leader));
- ClusterJoinBody join = new ClusterJoinBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0);
join.broker = _group.getLocal().getDetails();
send(leader, new SimpleBodySendable(join));
}
@@ -168,7 +172,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
public void leave() throws AMQException
{
- ClusterLeaveBody leave = new ClusterLeaveBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0);
leave.broker = _group.getLocal().getDetails();
send(getLeader(), new SimpleBodySendable(leave));
}
@@ -189,7 +195,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
}
else
{
- ClusterSuspectBody suspect = new ClusterSuspectBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0);
suspect.broker = broker.getDetails();
send(getLeader(), new SimpleBodySendable(suspect));
}
@@ -211,7 +219,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
else
{
//pass request on to leader:
- ClusterJoinBody request = new ClusterJoinBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0);
request.broker = member.getDetails();
Broker leader = getLeader();
send(leader, new SimpleBodySendable(request));
@@ -256,7 +266,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
private ClusterMembershipBody createAnnouncement(String membership)
{
- ClusterMembershipBody announce = new ClusterMembershipBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0);
//TODO: revise this way of converting String to bytes...
announce.members = membership.getBytes();
return announce;