diff options
Diffstat (limited to 'zookeeper-server/src/main/java/org/apache')
6 files changed, 159 insertions, 157 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index a6f605390..603cb0b38 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -675,7 +675,9 @@ public class DataTree { public void addWatch(String basePath, Watcher watcher, int mode) { WatcherMode watcherMode = WatcherMode.fromZooDef(mode); dataWatches.addWatch(basePath, watcher, watcherMode); - childWatches.addWatch(basePath, watcher, watcherMode); + if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) { + childWatches.addWatch(basePath, watcher, watcherMode); + } } public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException { @@ -1511,7 +1513,6 @@ public class DataTree { this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT); } for (String path : persistentRecursiveWatches) { - this.childWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE); this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java index 1bc44c805..4eea5eca0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java @@ -144,13 +144,4 @@ public interface IWatchManager { * */ void dumpWatches(PrintWriter pwriter, boolean byPath); - - /** - * Return the current number of recursive watchers - * - * @return qty - */ - default int getRecursiveWatchQty() { - return 0; - } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java index c5b133059..c85c3d846 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java @@ -19,6 +19,7 @@ package org.apache.zookeeper.server.watch; import java.io.PrintWriter; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -45,9 +46,9 @@ public class WatchManager implements IWatchManager { private final Map<String, Set<Watcher>> watchTable = new HashMap<>(); - private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>(); + private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>(); - private final WatcherModeManager watcherModeManager = new WatcherModeManager(); + private int recursiveWatchQty = 0; @Override public synchronized int size() { @@ -84,25 +85,34 @@ public class WatchManager implements IWatchManager { } list.add(watcher); - Set<String> paths = watch2Paths.get(watcher); + Map<String, WatchStats> paths = watch2Paths.get(watcher); if (paths == null) { // cnxns typically have many watches, so use default cap here - paths = new HashSet<>(); + paths = new HashMap<>(); watch2Paths.put(watcher, paths); } - watcherModeManager.setWatcherMode(watcher, path, watcherMode); + WatchStats stats = paths.getOrDefault(path, WatchStats.NONE); + WatchStats newStats = stats.addMode(watcherMode); - return paths.add(path); + if (newStats != stats) { + paths.put(path, newStats); + if (watcherMode.isRecursive()) { + ++recursiveWatchQty; + } + return true; + } + + return false; } @Override public synchronized void removeWatcher(Watcher watcher) { - Set<String> paths = watch2Paths.remove(watcher); + Map<String, WatchStats> paths = watch2Paths.remove(watcher); if (paths == null) { return; } - for (String p : paths) { + for (String p : paths.keySet()) { Set<Watcher> list = watchTable.get(p); if (list != null) { list.remove(watcher); @@ -110,7 +120,11 @@ public class WatchManager implements IWatchManager { watchTable.remove(p); } } - watcherModeManager.removeWatcher(watcher, p); + } + for (WatchStats stats : paths.values()) { + if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) { + --recursiveWatchQty; + } } } @@ -123,8 +137,8 @@ public class WatchManager implements IWatchManager { public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); Set<Watcher> watchers = new HashSet<>(); - PathParentIterator pathParentIterator = getPathParentIterator(path); synchronized (this) { + PathParentIterator pathParentIterator = getPathParentIterator(path); for (String localPath : pathParentIterator.asIterable()) { Set<Watcher> thisWatchers = watchTable.get(localPath); if (thisWatchers == null || thisWatchers.isEmpty()) { @@ -133,20 +147,23 @@ public class WatchManager implements IWatchManager { Iterator<Watcher> iterator = thisWatchers.iterator(); while (iterator.hasNext()) { Watcher watcher = iterator.next(); - WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath); - if (watcherMode.isRecursive()) { - if (type != EventType.NodeChildrenChanged) { - watchers.add(watcher); - } - } else if (!pathParentIterator.atParentPath()) { + Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap()); + WatchStats stats = paths.get(localPath); + if (stats == null) { + LOG.warn("inconsistent watch table for watcher {}, {} not in path list", watcher, localPath); + continue; + } + if (!pathParentIterator.atParentPath()) { watchers.add(watcher); - if (!watcherMode.isPersistent()) { + WatchStats newStats = stats.removeMode(WatcherMode.STANDARD); + if (newStats == WatchStats.NONE) { iterator.remove(); - Set<String> paths = watch2Paths.get(watcher); - if (paths != null) { - paths.remove(localPath); - } + paths.remove(localPath); + } else if (newStats != stats) { + paths.put(localPath, newStats); } + } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) { + watchers.add(watcher); } } if (thisWatchers.isEmpty()) { @@ -199,7 +216,7 @@ public class WatchManager implements IWatchManager { sb.append(watch2Paths.size()).append(" connections watching ").append(watchTable.size()).append(" paths\n"); int total = 0; - for (Set<String> paths : watch2Paths.values()) { + for (Map<String, WatchStats> paths : watch2Paths.values()) { total += paths.size(); } sb.append("Total watches:").append(total); @@ -219,10 +236,10 @@ public class WatchManager implements IWatchManager { } } } else { - for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) { + for (Entry<Watcher, Map<String, WatchStats>> e : watch2Paths.entrySet()) { pwriter.print("0x"); pwriter.println(Long.toHexString(((ServerCnxn) e.getKey()).getSessionId())); - for (String path : e.getValue()) { + for (String path : e.getValue().keySet()) { pwriter.print("\t"); pwriter.println(path); } @@ -232,31 +249,28 @@ public class WatchManager implements IWatchManager { @Override public synchronized boolean containsWatcher(String path, Watcher watcher) { - WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, path); - PathParentIterator pathParentIterator = getPathParentIterator(path); - for (String localPath : pathParentIterator.asIterable()) { - Set<Watcher> watchers = watchTable.get(localPath); - if (!pathParentIterator.atParentPath()) { - if (watchers != null) { - return true; // at the leaf node, all watcher types match - } - } - if (watcherMode.isRecursive()) { - return true; - } - } - return false; + Set<Watcher> list = watchTable.get(path); + return list != null && list.contains(watcher); } @Override public synchronized boolean removeWatcher(String path, Watcher watcher) { - Set<String> paths = watch2Paths.get(watcher); - if (paths == null || !paths.remove(path)) { + Map<String, WatchStats> paths = watch2Paths.get(watcher); + if (paths == null) { return false; } + WatchStats stats = paths.remove(path); + if (stats == null) { + return false; + } + if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) { + --recursiveWatchQty; + } + Set<Watcher> list = watchTable.get(path); if (list == null || !list.remove(watcher)) { + LOG.warn("inconsistent watch table for path {}, {} not in watcher list", path, watcher); return false; } @@ -264,17 +278,20 @@ public class WatchManager implements IWatchManager { watchTable.remove(path); } - watcherModeManager.removeWatcher(watcher, path); - return true; } + // VisibleForTesting + Map<Watcher, Map<String, WatchStats>> getWatch2Paths() { + return watch2Paths; + } + @Override public synchronized WatchesReport getWatches() { Map<Long, Set<String>> id2paths = new HashMap<>(); - for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) { + for (Entry<Watcher, Map<String, WatchStats>> e : watch2Paths.entrySet()) { Long id = ((ServerCnxn) e.getKey()).getSessionId(); - Set<String> paths = new HashSet<>(e.getValue()); + Set<String> paths = new HashSet<>(e.getValue().keySet()); id2paths.put(id, paths); } return new WatchesReport(id2paths); @@ -296,7 +313,7 @@ public class WatchManager implements IWatchManager { @Override public synchronized WatchesSummary getWatchesSummary() { int totalWatches = 0; - for (Set<String> paths : watch2Paths.values()) { + for (Map<String, WatchStats> paths : watch2Paths.values()) { totalWatches += paths.size(); } return new WatchesSummary(watch2Paths.size(), watchTable.size(), totalWatches); @@ -305,13 +322,13 @@ public class WatchManager implements IWatchManager { @Override public void shutdown() { /* do nothing */ } - @Override - public int getRecursiveWatchQty() { - return watcherModeManager.getRecursiveQty(); + // VisibleForTesting + synchronized int getRecursiveWatchQty() { + return recursiveWatchQty; } private PathParentIterator getPathParentIterator(String path) { - if (watcherModeManager.getRecursiveQty() == 0) { + if (getRecursiveWatchQty() == 0) { return PathParentIterator.forPathOnly(path); } return PathParentIterator.forAll(path); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java new file mode 100644 index 000000000..fd0c0259e --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +/** + * Statistics for multiple different watches on one node. + */ +public final class WatchStats { + private static final WatchStats[] WATCH_STATS = new WatchStats[] { + new WatchStats(0), // NONE + new WatchStats(1), // STANDARD + new WatchStats(2), // PERSISTENT + new WatchStats(3), // STANDARD + PERSISTENT + new WatchStats(4), // PERSISTENT_RECURSIVE + new WatchStats(5), // STANDARD + PERSISTENT_RECURSIVE + new WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVE + new WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE + }; + + /** + * Stats that have no watchers attached. + * + * <p>This could be used as start point to compute new stats using {@link #addMode(WatcherMode)}. + */ + public static final WatchStats NONE = WATCH_STATS[0]; + + private final int flags; + + private WatchStats(int flags) { + this.flags = flags; + } + + private static int modeToFlag(WatcherMode mode) { + return 1 << mode.ordinal(); + } + + /** + * Compute stats after given mode attached to node. + * + * @param mode watcher mode + * @return a new stats if given mode is not attached to this node before, otherwise old stats + */ + public WatchStats addMode(WatcherMode mode) { + int flags = this.flags | modeToFlag(mode); + return WATCH_STATS[flags]; + } + + /** + * Compute stats after given mode removed from node. + * + * @param mode watcher mode + * @return null if given mode is the last attached mode, otherwise a new stats + */ + public WatchStats removeMode(WatcherMode mode) { + int mask = ~modeToFlag(mode); + int flags = this.flags & mask; + if (flags == 0) { + return NONE; + } + return WATCH_STATS[flags]; + } + + /** + * Check whether given mode is attached to this node. + * + * @param mode watcher mode + * @return true if given mode is attached to this node. + */ + public boolean hasMode(WatcherMode mode) { + int flags = modeToFlag(mode); + return (this.flags & flags) != 0; + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java index b8a1dda74..e05ba900e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java @@ -23,7 +23,7 @@ import org.apache.zookeeper.ZooDefs; public enum WatcherMode { STANDARD(false, false), PERSISTENT(true, false), - PERSISTENT_RECURSIVE(true, true) + PERSISTENT_RECURSIVE(true, true), ; public static final WatcherMode DEFAULT_WATCHER_MODE = WatcherMode.STANDARD; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java deleted file mode 100644 index c1a8225f8..000000000 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server.watch; - -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.zookeeper.Watcher; - -class WatcherModeManager { - private final Map<Key, WatcherMode> watcherModes = new ConcurrentHashMap<>(); - private final AtomicInteger recursiveQty = new AtomicInteger(0); - - private static class Key { - private final Watcher watcher; - private final String path; - - Key(Watcher watcher, String path) { - this.watcher = watcher; - this.path = path; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Key key = (Key) o; - return watcher.equals(key.watcher) && path.equals(key.path); - } - - @Override - public int hashCode() { - return Objects.hash(watcher, path); - } - } - - // VisibleForTesting - Map<Key, WatcherMode> getWatcherModes() { - return watcherModes; - } - - void setWatcherMode(Watcher watcher, String path, WatcherMode mode) { - if (mode == WatcherMode.DEFAULT_WATCHER_MODE) { - removeWatcher(watcher, path); - } else { - adjustRecursiveQty(watcherModes.put(new Key(watcher, path), mode), mode); - } - } - - WatcherMode getWatcherMode(Watcher watcher, String path) { - return watcherModes.getOrDefault(new Key(watcher, path), WatcherMode.DEFAULT_WATCHER_MODE); - } - - void removeWatcher(Watcher watcher, String path) { - adjustRecursiveQty(watcherModes.remove(new Key(watcher, path)), WatcherMode.DEFAULT_WATCHER_MODE); - } - - int getRecursiveQty() { - return recursiveQty.get(); - } - - // recursiveQty is an optimization to avoid having to walk the map every time this value is needed - private void adjustRecursiveQty(WatcherMode oldMode, WatcherMode newMode) { - if (oldMode == null) { - oldMode = WatcherMode.DEFAULT_WATCHER_MODE; - } - if (oldMode.isRecursive() != newMode.isRecursive()) { - if (newMode.isRecursive()) { - recursiveQty.incrementAndGet(); - } else { - recursiveQty.decrementAndGet(); - } - } - } -} |