summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Brian Nixon <nixon@fb.com> 2019-07-15 14:15:03 +0200
committer Mate Szalay-Beko <mszalay@cloudera.com> 2022-05-17 09:43:49 +0200
commit f68019ab9c2e0ee362b890e1a7a0a449afda3a7c (patch)
tree f108784b046477ef215ff52d57a4277dfa9f8e8e
parent d1ec2f346b43ce6c00be45a68871fb7bda6e098a (diff)
download zookeeper-f68019ab9c2e0ee362b890e1a7a0a449afda3a7c.tar.gz
ZOOKEEPER-3459: Add admin command to display synced state of peer
Author: Brian Nixon <nixon@fb.com>
Reviewers: Enrico Olivelli <eolivelli@apache.org>, Norbert Kalmar <nkalmar@apache.org>
Closes #1012 from enixon/cmd-sync-state
(cherry picked from commit cc900a3b05bc31a237753680c8b00dc5866df4b2)
-rw-r--r-- zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java 40
-rw-r--r-- zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java 15
-rw-r--r-- zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java 3
-rw-r--r-- zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java 10
-rw-r--r-- zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java 5
-rw-r--r-- zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java 50
6 files changed, 110 insertions, 13 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index 7f338091c..fc10f9da8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -37,9 +37,8 @@ import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.persistence.SnapshotInfo;
-import org.apache.zookeeper.server.quorum.Leader;
-import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
-import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
+import org.apache.zookeeper.server.quorum.*;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.OSMXBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,6 +129,7 @@ public class Commands {
registerCommand(new WatchCommand());
registerCommand(new WatchesByPathCommand());
registerCommand(new WatchSummaryCommand());
+ registerCommand(new ZabStateCommand());
}
/**
@@ -576,5 +576,39 @@ public class Commands {
}
}
+
+    /**
+     * Reports the phase of the Zab protocol that this peer is currently running,
+     * together with whether the peer is a voting member of the ensemble.
+     *
+     * <p>The response carries two entries:
+     * <ul>
+     *   <li>{@code "voting"} - {@code true} only when the peer's quorum verifier lists
+     *       the peer's id among its voting members with the same quorum address and
+     *       election address that the peer itself reports; {@code false} otherwise,
+     *       including when the server is not a {@link QuorumZooKeeperServer}.</li>
+     *   <li>{@code "zabstate"} - the lower-cased {@link QuorumPeer.ZabState} name
+     *       ({@code election}, {@code discovery}, {@code synchronization} or
+     *       {@code broadcast}), or the empty string when the server is not a
+     *       {@link QuorumZooKeeperServer}.</li>
+     * </ul>
+     */
+    public static class ZabStateCommand extends CommandBase {
+        public ZabStateCommand() {
+            super(Arrays.asList("zabstate"));
+        }
+
+        /**
+         * Builds the {@code zabstate} response for the given server.
+         *
+         * @param zkServer the server the admin command is executed against
+         * @param kwargs   command arguments; not used by this command
+         * @return a response holding the {@code "voting"} and {@code "zabstate"} entries
+         */
+        @Override
+        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+            CommandResponse response = initializeResponse();
+            if (zkServer instanceof QuorumZooKeeperServer) {
+                QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
+                QuorumPeer.ZabState zabState = peer.getZabState();
+                QuorumVerifier qv = peer.getQuorumVerifier();
+
+                // The peer only counts as voting when the verifier's entry for its id
+                // also matches the addresses the peer is actually using.
+                QuorumPeer.QuorumServer voter = qv.getVotingMembers().get(peer.getId());
+                boolean voting = (
+                        voter != null
+                        && voter.addr.equals(peer.getQuorumAddress())
+                        && voter.electionAddr.equals(peer.getElectionAddress())
+                );
+                response.put("voting", voting);
+                // Locale.ROOT keeps the reported name independent of the JVM's default
+                // locale (a Turkish default locale would otherwise lower-case 'I' to a
+                // dotless 'ı', e.g. in DISCOVERY), so monitoring tools see stable values.
+                response.put("zabstate", zabState.name().toLowerCase(java.util.Locale.ROOT));
+            } else {
+                // Not part of a quorum: no Zab phase to report.
+                response.put("voting", false);
+                response.put("zabstate", "");
+            }
+            return response;
+        }
+    }
+
private Commands() {}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
index b79f5702c..80b41ef5c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
@@ -40,7 +40,7 @@ public class Follower extends Learner{
private long lastQueued;
// This is the same object as this.zk, but we cache the downcast op
final FollowerZooKeeperServer fzk;
-
+
Follower(QuorumPeer self,FollowerZooKeeperServer zk) {
this.self = self;
this.zk=zk;
@@ -72,7 +72,8 @@ public class Follower extends Learner{
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
- QuorumServer leaderServer = findLeader();
+ self.setZabState(QuorumPeer.ZabState.DISCOVERY);
+ QuorumServer leaderServer = findLeader();
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
@@ -86,7 +87,9 @@ public class Follower extends Learner{
+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
- syncWithLeader(newEpochZxid);
+ self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
+ syncWithLeader(newEpochZxid);
+ self.setZabState(QuorumPeer.ZabState.BROADCAST);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
@@ -114,7 +117,7 @@ public class Follower extends Learner{
case Leader.PING:
ping(qp);
break;
- case Leader.PROPOSAL:
+ case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
@@ -146,9 +149,9 @@ public class Follower extends Learner{
// get new designated leader from (current) leader's message
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
- boolean majorChange =
+ boolean majorChange =
self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
- // commit (writes the new config to ZK tree (/zookeeper/config)
+ // commit (writes the new config to ZK tree (/zookeeper/config)
fzk.commit(qp.getZxid());
if (majorChange) {
throw new Exception("changes proposed in reconfig");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index b3c4b6c5d..11039bfd9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -469,6 +469,7 @@ public class Leader {
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
+ self.setZabState(QuorumPeer.ZabState.DISCOVERY);
self.tick.set(0);
zk.loadData();
@@ -539,6 +540,7 @@ public class Leader {
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
+ self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
try {
waitForNewLeaderAck(self.getId(), zk.getZxid());
@@ -590,6 +592,7 @@ public class Leader {
self.setZooKeeperServer(zk);
}
+ self.setZabState(QuorumPeer.ZabState.BROADCAST);
self.adminServer.setZooKeeperServer(zk);
// Everything is a go, simply start counting the ticks
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index a8d89b28d..74f9f4ff1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -235,7 +235,7 @@ public class Learner {
/**
* Establish a connection with the Leader found by findLeader. Retries
- * until either initLimit time has elapsed or 5 tries have happened.
+ * until either initLimit time has elapsed or 5 tries have happened.
* @param addr - the address of the Leader to connect to.
* @throws IOException - if the socket connection fails on the 5th attempt
* <li>if there is an authentication failure while connecting to leader</li>
@@ -309,7 +309,7 @@ public class Learner {
/**
* Once connected to the leader, perform the handshake protocol to
- * establish a following / observing connection.
+ * establish a following / observing connection.
* @param pktType
* @return the zxid the Leader sends for synchronization purposes.
* @throws IOException
@@ -368,7 +368,7 @@ public class Learner {
}
/**
- * Finally, synchronize our history with the Leader.
+ * Finally, synchronize our history with the Leader.
* @param newLeaderZxid
* @throws IOException
* @throws InterruptedException
@@ -390,6 +390,7 @@ public class Learner {
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
+ self.setSyncMode(QuorumPeer.SyncMode.DIFF);
if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
snapshotNeeded = true;
@@ -399,6 +400,7 @@ public class Learner {
}
}
else if (qp.getType() == Leader.SNAP) {
+ self.setSyncMode(QuorumPeer.SyncMode.SNAP);
LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
// The leader is going to dump the database
// db is clear as part of deserializeSnapshot()
@@ -421,6 +423,7 @@ public class Learner {
syncSnapshot = true;
} else if (qp.getType() == Leader.TRUNC) {
//we need to truncate the log to the lastzxid of the leader
+ self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
LOG.warn("Truncating log to get in sync with the leader 0x"
+ Long.toHexString(qp.getZxid()));
boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
@@ -591,6 +594,7 @@ public class Learner {
}
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
+ self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startServing();
/*
* Update the election vote here to ensure that all members of the
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
index 050582d62..6e1d9c150 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
@@ -63,6 +63,7 @@ public class Observer extends Learner{
zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
try {
+ self.setZabState(QuorumPeer.ZabState.DISCOVERY);
QuorumServer leaderServer = findLeader();
LOG.info("Observing " + leaderServer.addr);
try {
@@ -70,8 +71,10 @@ public class Observer extends Learner{
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
if (self.isReconfigStateChange())
throw new Exception("learned about role change");
-
+
+ self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
syncWithLeader(newLeaderZxid);
+ self.setZabState(QuorumPeer.ZabState.BROADCAST);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 474f70c44..daf605cab 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -400,6 +400,22 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
LOOKING, FOLLOWING, LEADING, OBSERVING;
}
+ /**
+ * (Used for monitoring) shows the current phase of
+ * Zab protocol that peer is running.
+ */
+ public enum ZabState {
+ ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST;
+ }
+
+ /**
+ * (Used for monitoring) When peer is in synchronization phase, this shows
+ * which synchronization mechanism is being used
+ */
+ public enum SyncMode {
+ NONE, DIFF, SNAP, TRUNC;
+ }
+
/*
* A peer can either be participating, which implies that it is willing to
* both vote in instances of consensus and to elect or become a Leader, or
@@ -715,11 +731,45 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
private ServerState state = ServerState.LOOKING;
+ private AtomicReference<ZabState> zabState = new AtomicReference<>(ZabState.ELECTION);
+ private AtomicReference<SyncMode> syncMode = new AtomicReference<>(SyncMode.NONE);
+
private boolean reconfigFlag = false; // indicates that a reconfig just committed
public synchronized void setPeerState(ServerState newState){
state=newState;
}
+
+    /**
+     * Records the Zab protocol phase this peer has entered and logs the resulting
+     * detailed peer state at INFO level.
+     *
+     * @param zabState the phase to record
+     */
+    public void setZabState(ZabState zabState) {
+        this.zabState.set(zabState);
+        final String detailedState = getDetailedPeerState();
+        LOG.info("Peer state changed: {}", detailedState);
+    }
+
+    /**
+     * Records the synchronization mechanism this peer is using and logs the
+     * resulting detailed peer state at INFO level.
+     *
+     * @param syncMode the synchronization mode to record
+     */
+    public void setSyncMode(SyncMode syncMode) {
+        this.syncMode.set(syncMode);
+        final String detailedState = getDetailedPeerState();
+        LOG.info("Peer state changed: {}", detailedState);
+    }
+
+    /**
+     * @return the Zab protocol phase currently recorded for this peer
+     */
+    public ZabState getZabState() {
+        return this.zabState.get();
+    }
+
+    /**
+     * @return the synchronization mode currently recorded for this peer
+     */
+    public SyncMode getSyncMode() {
+        return this.syncMode.get();
+    }
+
+    /**
+     * Builds a human-readable description of this peer's state for logging and
+     * monitoring.
+     *
+     * <p>The result always starts with the lower-cased peer state. The lower-cased
+     * Zab phase is appended, separated by {@code " - "}, unless it is
+     * {@link ZabState#ELECTION}; the lower-cased sync mode is appended the same way
+     * unless it is {@link SyncMode#NONE}.
+     *
+     * @return the combined description, e.g. peer state, Zab phase and sync mode
+     *         joined by {@code " - "}
+     */
+    public String getDetailedPeerState() {
+        // Locale.ROOT keeps the text independent of the JVM's default locale; a
+        // Turkish default locale would otherwise turn 'I' into a dotless 'ı'.
+        final java.util.Locale locale = java.util.Locale.ROOT;
+        final StringBuilder sb = new StringBuilder(getPeerState().toString().toLowerCase(locale));
+        final ZabState currentZabState = getZabState();
+        if (currentZabState != ZabState.ELECTION) {
+            sb.append(" - ").append(currentZabState.toString().toLowerCase(locale));
+        }
+        final SyncMode currentSyncMode = getSyncMode();
+        if (currentSyncMode != SyncMode.NONE) {
+            sb.append(" - ").append(currentSyncMode.toString().toLowerCase(locale));
+        }
+        return sb.toString();
+    }
+
public synchronized void reconfigFlagSet(){
reconfigFlag = true;
}