summaryrefslogtreecommitdiff
path: root/fs/osfs
diff options
context:
space:
mode:
authorrfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>2010-09-13 09:40:11 +0000
committerrfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>2010-09-13 09:40:11 +0000
commitcde2b6de0d739ab7e1447c622eeb953d2197e060 (patch)
treed02d60476569fae21958f0640e581c24991ee2f5 /fs/osfs
parentdbd5a0fc19c915cd76947b6bda94859dfb50fba8 (diff)
downloadpyfilesystem-cde2b6de0d739ab7e1447c622eeb953d2197e060.tar.gz
osfs.watch_inotify: fix handling of multiple watchers on a single path.
Turns out the pyinotify "add_watch" method actually updates an existing watch if there is already one for the given path. Instead, we create a new manager (and hence, a new inotify fil descriptor) for each watcher added. git-svn-id: http://pyfilesystem.googlecode.com/svn/trunk@424 67cdc799-7952-0410-af00-57a81ceafa0f
Diffstat (limited to 'fs/osfs')
-rw-r--r--fs/osfs/watch_inotify.py148
1 files changed, 113 insertions, 35 deletions
diff --git a/fs/osfs/watch_inotify.py b/fs/osfs/watch_inotify.py
index 4be20e7..e8f20f2 100644
--- a/fs/osfs/watch_inotify.py
+++ b/fs/osfs/watch_inotify.py
@@ -9,6 +9,7 @@ Change watcher support for OSFS, backed by pyinotify.
import os
import sys
import errno
+import select
import threading
from fs.errors import *
@@ -29,20 +30,38 @@ class OSFSWatchMixin(WatchableFSMixin):
"""Mixin providing change-watcher support via pyinotify."""
__watch_lock = threading.Lock()
- __watch_manager = None
- __watch_notifier = None
+ __watch_thread = None
def close(self):
super(OSFSWatchMixin,self).close()
- self.__shutdown_watch_manager(force=True)
self.notify_watchers(CLOSED)
+ for watcher_list in self._watchers.values():
+ for watcher in watcher_list:
+ self.del_watcher(watcher)
+ self.__watch_lock.acquire()
+ try:
+ wt = self.__watch_thread
+ if wt is not None and not wt.watchers:
+ wt.stop()
+ OSFSWatchMixin.__watch_thread = None
+ finally:
+ self.__watch_lock.release()
+
def add_watcher(self,callback,path="/",events=None,recursive=True):
- w = super(OSFSWatchMixin,self).add_watcher(callback,path,events,recursive)
+ super_add_watcher = super(OSFSWatchMixin,self).add_watcher
+ w = super_add_watcher(callback,path,events,recursive)
+ w._pyinotify_id = None
syspath = self.getsyspath(path)
if isinstance(syspath,unicode):
syspath = syspath.encode(sys.getfilesystemencoding())
- wm = self.__get_watch_manager()
+ # Each watch gets its own WatchManager, since it's tricky to make
+ # a single WatchManager handle multiple callbacks with different
+ # events for a single path. This means we pay one file descriptor
+ # for each watcher added to the filesystem. That's not too bad.
+ w._pyinotify_WatchManager = wm = pyinotify.WatchManager()
+ # Each individual notifier gets multiplexed by a single shared thread.
+ w._pyinotify_Notifier = pyinotify.Notifier(wm)
evtmask = self.__get_event_mask(events)
def process_events(event):
self.__route_event(w,event)
@@ -52,19 +71,30 @@ class OSFSWatchMixin(WatchableFSMixin):
except pyinotify.WatchManagerError, e:
raise OperationFailedError("add_watcher",details=e)
w._pyinotify_id = wids[syspath]
+ self.__watch_lock.acquire()
+ try:
+ wt = self.__get_watch_thread()
+ wt.add_watcher(w)
+ finally:
+ self.__watch_lock.release()
return w
def del_watcher(self,watcher_or_callback):
- wm = self.__get_watch_manager()
if isinstance(watcher_or_callback,Watcher):
watchers = [watcher_or_callback]
else:
watchers = self._find_watchers(watcher_or_callback)
for watcher in watchers:
+ wm = watcher._pyinotify_WatchManager
wm.rm_watch(watcher._pyinotify_id,rec=watcher.recursive)
super(OSFSWatchMixin,self).del_watcher(watcher)
- if not wm._wmd:
- self.__shutdown_watch_manager()
+ self.__watch_lock.acquire()
+ try:
+ wt = self.__get_watch_thread()
+ for watcher in watchers:
+ wt.del_watcher(watcher)
+ finally:
+ self.__watch_lock.release()
def __get_event_mask(self,events):
"""Convert the given set of events into a pyinotify event mask."""
@@ -149,33 +179,81 @@ class OSFSWatchMixin(WatchableFSMixin):
if inevt.mask & pyinotify.IN_UNMOUNT:
watcher.handle_event(CLOSE(self))
- def __get_watch_manager(self):
- """Get the shared watch manager, initializing if necessary."""
- if OSFSWatchMixin.__watch_notifier is None:
- self.__watch_lock.acquire()
- try:
- if self.__watch_notifier is None:
- wm = pyinotify.WatchManager()
- n = pyinotify.ThreadedNotifier(wm)
- n.start()
- OSFSWatchMixin.__watch_manager = wm
- OSFSWatchMixin.__watch_notifier = n
- finally:
- self.__watch_lock.release()
- return OSFSWatchMixin.__watch_manager
-
- def __shutdown_watch_manager(self,force=False):
- """Stop the shared watch manager, if there are no watches left."""
- self.__watch_lock.acquire()
+ def __get_watch_thread(self):
+ """Get the shared watch thread, initializing if necessary.
+
+ This method must only be called while holding self.__watch_lock, or
+ multiple notifiers could be created.
+ """
+ if OSFSWatchMixin.__watch_thread is None:
+ OSFSWatchMixin.__watch_thread = SharedThreadedNotifier()
+ OSFSWatchMixin.__watch_thread.start()
+ return OSFSWatchMixin.__watch_thread
+
+
+class SharedThreadedNotifier(threading.Thread):
+ """pyinotifer Notifier that can manage multiple WatchManagers.
+
+ Each watcher added to an OSFS corresponds to a new pyinotify.WatchManager
+ instance. Rather than run a notifier thread for each manager, we run a
+ single thread that multiplexes between them all.
+ """
+
+ def __init__(self):
+ super(SharedThreadedNotifier,self).__init__()
+ self.daemon = True
+ self.running = False
+ self._pipe_r, self._pipe_w = os.pipe()
+ self._poller = select.poll()
+ self._poller.register(self._pipe_r,select.POLLIN)
+ self.watchers = {}
+
+ def add_watcher(self,watcher):
+ fd = watcher._pyinotify_WatchManager.get_fd()
+ self.watchers[fd] = watcher
+ self._poller.register(fd,select.POLLIN)
+ # Bump the poll object so it recognises the new fd.
+ os.write(self._pipe_w,"H")
+
+ def del_watcher(self,watcher):
+ fd = watcher._pyinotify_WatchManager.get_fd()
try:
- if OSFSWatchMixin.__watch_manager is None:
- return
- if not force and OSFSWatchMixin.__watch_manager._wmd:
- return
- OSFSWatchMixin.__watch_notifier.stop()
- OSFSWatchMixin.__watch_notifier = None
- OSFSWatchMixin.__watch_manager = None
- finally:
- self.__watch_lock.release()
+ del self.watchers[fd]
+ except KeyError:
+ pass
+ else:
+ self._poller.unregister(fd)
+
+ def run(self):
+ self.running = True
+ while self.running:
+ try:
+ ready_fds = self._poller.poll()
+ except select.error, e:
+ if e[0] != errno.EINTR:
+ raise
+ else:
+ for (fd,event) in ready_fds:
+ # Ignore all events other than "input ready".
+ if not event & select.POLLIN:
+ continue
+ # For signals on our internal pipe, just read and discard.
+ if fd == self._pipe_r:
+ os.read(self._pipe_r,1)
+ # For notifier fds, dispath to the notifier methods.
+ else:
+ try:
+ notifier = self.watchers[fd]._pyinotify_Notifier
+ except KeyError:
+ pass
+ else:
+ notifier.read_events()
+ notifier.process_events()
+
+ def stop(self):
+ if self.running:
+ self.running = False
+ os.write(self._pipe_w,"S")
+ os.close(self._pipe_w)