diff options
author | Brian Nixon <nixon@fb.com> | 2019-07-15 14:15:03 +0200 |
---|---|---|
committer | Mate Szalay-Beko <mszalay@cloudera.com> | 2022-05-17 09:43:49 +0200 |
commit | f68019ab9c2e0ee362b890e1a7a0a449afda3a7c (patch) | |
tree | f108784b046477ef215ff52d57a4277dfa9f8e8e | |
parent | d1ec2f346b43ce6c00be45a68871fb7bda6e098a (diff) | |
download | zookeeper-f68019ab9c2e0ee362b890e1a7a0a449afda3a7c.tar.gz |
ZOOKEEPER-3459: Add admin command to display synced state of peer
Author: Brian Nixon <nixon@fb.com>
Reviewers: Enrico Olivelli <eolivelli@apache.org>, Norbert Kalmar <nkalmar@apache.org>
Closes #1012 from enixon/cmd-sync-state
(cherry picked from commit cc900a3b05bc31a237753680c8b00dc5866df4b2)
6 files changed, 110 insertions, 13 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java index 7f338091c..fc10f9da8 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java @@ -37,9 +37,8 @@ import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.persistence.SnapshotInfo; -import org.apache.zookeeper.server.quorum.Leader; -import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer; -import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; +import org.apache.zookeeper.server.quorum.*; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.OSMXBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,6 +129,7 @@ public class Commands { registerCommand(new WatchCommand()); registerCommand(new WatchesByPathCommand()); registerCommand(new WatchSummaryCommand()); + registerCommand(new ZabStateCommand()); } /** @@ -576,5 +576,39 @@ public class Commands { } } + + /** + * Returns the current phase of Zab protocol that peer is running. + * It can be in one of these phases: ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST + */ + public static class ZabStateCommand extends CommandBase { + public ZabStateCommand() { + super(Arrays.asList("zabstate")); + } + + @Override + public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + CommandResponse response = initializeResponse(); + if (zkServer instanceof QuorumZooKeeperServer) { + QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self; + QuorumPeer.ZabState zabState = peer.getZabState(); + QuorumVerifier qv = peer.getQuorumVerifier(); + + QuorumPeer.QuorumServer voter = qv.getVotingMembers().get(peer.getId()); + boolean voting = ( + voter != null + && voter.addr.equals(peer.getQuorumAddress()) + && voter.electionAddr.equals(peer.getElectionAddress()) + ); + response.put("voting", voting); + response.put("zabstate", zabState.name().toLowerCase()); + } else { + response.put("voting", false); + response.put("zabstate", ""); + } + return response ; + } + } + private Commands() {} } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java index b79f5702c..80b41ef5c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java @@ -40,7 +40,7 @@ public class Follower extends Learner{ private long lastQueued; // This is the same object as this.zk, but we cache the downcast op final FollowerZooKeeperServer fzk; - + Follower(QuorumPeer self,FollowerZooKeeperServer zk) { this.self = self; this.zk=zk; @@ -72,7 +72,8 @@ public class Follower extends Learner{ self.end_fle = 0; fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean); try { - QuorumServer leaderServer = findLeader(); + self.setZabState(QuorumPeer.ZabState.DISCOVERY); + QuorumServer leaderServer = findLeader(); try { connectToLeader(leaderServer.addr, leaderServer.hostname); long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); @@ -86,7 +87,9 @@ public class Follower extends Learner{ + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch())); throw new IOException("Error: Epoch of leader is lower"); } - syncWithLeader(newEpochZxid); + self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION); + syncWithLeader(newEpochZxid); + self.setZabState(QuorumPeer.ZabState.BROADCAST); QuorumPacket qp = new QuorumPacket(); while (this.isRunning()) { readPacket(qp); @@ -114,7 +117,7 @@ public class Follower extends Learner{ case Leader.PING: ping(qp); break; - case Leader.PROPOSAL: + case Leader.PROPOSAL: TxnHeader hdr = new TxnHeader(); Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr); if (hdr.getZxid() != lastQueued + 1) { @@ -146,9 +149,9 @@ public class Follower extends Learner{ // get new designated leader from (current) leader's message ByteBuffer buffer = ByteBuffer.wrap(qp.getData()); long suggestedLeaderId = buffer.getLong(); - boolean majorChange = + boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); - // commit (writes the new config to ZK tree (/zookeeper/config) + // commit (writes the new config to ZK tree (/zookeeper/config) fzk.commit(qp.getZxid()); if (majorChange) { throw new Exception("changes proposed in reconfig"); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index b3c4b6c5d..11039bfd9 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -469,6 +469,7 @@ public class Leader { zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean); try { + self.setZabState(QuorumPeer.ZabState.DISCOVERY); self.tick.set(0); zk.loadData(); @@ -539,6 +540,7 @@ public class Leader { waitForEpochAck(self.getId(), leaderStateSummary); self.setCurrentEpoch(epoch); + self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION); try { waitForNewLeaderAck(self.getId(), zk.getZxid()); @@ -590,6 +592,7 @@ public class Leader { self.setZooKeeperServer(zk); } + self.setZabState(QuorumPeer.ZabState.BROADCAST); self.adminServer.setZooKeeperServer(zk); // Everything is a go, simply start counting the ticks diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index a8d89b28d..74f9f4ff1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -235,7 +235,7 @@ public class Learner { /** * Establish a connection with the Leader found by findLeader. Retries - * until either initLimit time has elapsed or 5 tries have happened. + * until either initLimit time has elapsed or 5 tries have happened. * @param addr - the address of the Leader to connect to. * @throws IOException - if the socket connection fails on the 5th attempt * <li>if there is an authentication failure while connecting to leader</li> @@ -309,7 +309,7 @@ public class Learner { /** * Once connected to the leader, perform the handshake protocol to - * establish a following / observing connection. + * establish a following / observing connection. * @param pktType * @return the zxid the Leader sends for synchronization purposes. * @throws IOException @@ -368,7 +368,7 @@ public class Learner { } /** - * Finally, synchronize our history with the Leader. + * Finally, synchronize our history with the Leader. * @param newLeaderZxid * @throws IOException * @throws InterruptedException @@ -390,6 +390,7 @@ public class Learner { synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); + self.setSyncMode(QuorumPeer.SyncMode.DIFF); if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) { LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading."); snapshotNeeded = true; @@ -399,6 +400,7 @@ public class Learner { } } else if (qp.getType() == Leader.SNAP) { + self.setSyncMode(QuorumPeer.SyncMode.SNAP); LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid())); // The leader is going to dump the database // db is clear as part of deserializeSnapshot() @@ -421,6 +423,7 @@ public class Learner { syncSnapshot = true; } else if (qp.getType() == Leader.TRUNC) { //we need to truncate the log to the lastzxid of the leader + self.setSyncMode(QuorumPeer.SyncMode.TRUNC); LOG.warn("Truncating log to get in sync with the leader 0x" + Long.toHexString(qp.getZxid())); boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid()); @@ -591,6 +594,7 @@ public class Learner { } ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); + self.setSyncMode(QuorumPeer.SyncMode.NONE); zk.startServing(); /* * Update the election vote here to ensure that all members of the diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java index 050582d62..6e1d9c150 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java @@ -63,6 +63,7 @@ public class Observer extends Learner{ zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean); try { + self.setZabState(QuorumPeer.ZabState.DISCOVERY); QuorumServer leaderServer = findLeader(); LOG.info("Observing " + leaderServer.addr); try { @@ -70,8 +71,10 @@ public class Observer extends Learner{ long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO); if (self.isReconfigStateChange()) throw new Exception("learned about role change"); - + + self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION); syncWithLeader(newLeaderZxid); + self.setZabState(QuorumPeer.ZabState.BROADCAST); QuorumPacket qp = new QuorumPacket(); while (this.isRunning()) { readPacket(qp); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 474f70c44..daf605cab 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -400,6 +400,22 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider LOOKING, FOLLOWING, LEADING, OBSERVING; } + /** + * (Used for monitoring) shows the current phase of + * Zab protocol that peer is running. + */ + public enum ZabState { + ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST; + } + + /** + * (Used for monitoring) When peer is in synchronization phase, this shows + * which synchronization mechanism is being used + */ + public enum SyncMode { + NONE, DIFF, SNAP, TRUNC; + } + /* * A peer can either be participating, which implies that it is willing to * both vote in instances of consensus and to elect or become a Leader, or @@ -715,11 +731,45 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider private ServerState state = ServerState.LOOKING; + private AtomicReference<ZabState> zabState = new AtomicReference<>(ZabState.ELECTION); + private AtomicReference<SyncMode> syncMode = new AtomicReference<>(SyncMode.NONE); + private boolean reconfigFlag = false; // indicates that a reconfig just committed public synchronized void setPeerState(ServerState newState){ state=newState; } + + public void setZabState(ZabState zabState) { + this.zabState.set(zabState); + LOG.info("Peer state changed: {}", getDetailedPeerState()); + } + + public void setSyncMode(SyncMode syncMode) { + this.syncMode.set(syncMode); + LOG.info("Peer state changed: {}", getDetailedPeerState()); + } + + public ZabState getZabState() { + return zabState.get(); + } + + public SyncMode getSyncMode() { + return syncMode.get(); + } + public String getDetailedPeerState() { + final StringBuilder sb = new StringBuilder(getPeerState().toString().toLowerCase()); + final ZabState zabState = getZabState(); + if (!ZabState.ELECTION.equals(zabState)) { + sb.append(" - ").append(zabState.toString().toLowerCase()); + } + final SyncMode syncMode = getSyncMode(); + if (!SyncMode.NONE.equals(syncMode)) { + sb.append(" - ").append(syncMode.toString().toLowerCase()); + } + return sb.toString(); + } + public synchronized void reconfigFlagSet(){ reconfigFlag = true; } |