diff options
author | rfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f> | 2010-09-13 09:40:11 +0000 |
---|---|---|
committer | rfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f> | 2010-09-13 09:40:11 +0000 |
commit | cde2b6de0d739ab7e1447c622eeb953d2197e060 (patch) | |
tree | d02d60476569fae21958f0640e581c24991ee2f5 /fs/osfs | |
parent | dbd5a0fc19c915cd76947b6bda94859dfb50fba8 (diff) | |
download | pyfilesystem-cde2b6de0d739ab7e1447c622eeb953d2197e060.tar.gz |
osfs.watch_inotify: fix handling of multiple watchers on a single path.
Turns out the pyinotify "add_watch" method actually updates an existing watch
if there is already one for the given path. Instead, we create a new manager
(and hence, a new inotify fil descriptor) for each watcher added.
git-svn-id: http://pyfilesystem.googlecode.com/svn/trunk@424 67cdc799-7952-0410-af00-57a81ceafa0f
Diffstat (limited to 'fs/osfs')
-rw-r--r-- | fs/osfs/watch_inotify.py | 148 |
1 files changed, 113 insertions, 35 deletions
diff --git a/fs/osfs/watch_inotify.py b/fs/osfs/watch_inotify.py index 4be20e7..e8f20f2 100644 --- a/fs/osfs/watch_inotify.py +++ b/fs/osfs/watch_inotify.py @@ -9,6 +9,7 @@ Change watcher support for OSFS, backed by pyinotify. import os import sys import errno +import select import threading from fs.errors import * @@ -29,20 +30,38 @@ class OSFSWatchMixin(WatchableFSMixin): """Mixin providing change-watcher support via pyinotify.""" __watch_lock = threading.Lock() - __watch_manager = None - __watch_notifier = None + __watch_thread = None def close(self): super(OSFSWatchMixin,self).close() - self.__shutdown_watch_manager(force=True) self.notify_watchers(CLOSED) + for watcher_list in self._watchers.values(): + for watcher in watcher_list: + self.del_watcher(watcher) + self.__watch_lock.acquire() + try: + wt = self.__watch_thread + if wt is not None and not wt.watchers: + wt.stop() + OSFSWatchMixin.__watch_thread = None + finally: + self.__watch_lock.release() + def add_watcher(self,callback,path="/",events=None,recursive=True): - w = super(OSFSWatchMixin,self).add_watcher(callback,path,events,recursive) + super_add_watcher = super(OSFSWatchMixin,self).add_watcher + w = super_add_watcher(callback,path,events,recursive) + w._pyinotify_id = None syspath = self.getsyspath(path) if isinstance(syspath,unicode): syspath = syspath.encode(sys.getfilesystemencoding()) - wm = self.__get_watch_manager() + # Each watch gets its own WatchManager, since it's tricky to make + # a single WatchManager handle multiple callbacks with different + # events for a single path. This means we pay one file descriptor + # for each watcher added to the filesystem. That's not too bad. + w._pyinotify_WatchManager = wm = pyinotify.WatchManager() + # Each individual notifier gets multiplexed by a single shared thread. + w._pyinotify_Notifier = pyinotify.Notifier(wm) evtmask = self.__get_event_mask(events) def process_events(event): self.__route_event(w,event) @@ -52,19 +71,30 @@ class OSFSWatchMixin(WatchableFSMixin): except pyinotify.WatchManagerError, e: raise OperationFailedError("add_watcher",details=e) w._pyinotify_id = wids[syspath] + self.__watch_lock.acquire() + try: + wt = self.__get_watch_thread() + wt.add_watcher(w) + finally: + self.__watch_lock.release() return w def del_watcher(self,watcher_or_callback): - wm = self.__get_watch_manager() if isinstance(watcher_or_callback,Watcher): watchers = [watcher_or_callback] else: watchers = self._find_watchers(watcher_or_callback) for watcher in watchers: + wm = watcher._pyinotify_WatchManager wm.rm_watch(watcher._pyinotify_id,rec=watcher.recursive) super(OSFSWatchMixin,self).del_watcher(watcher) - if not wm._wmd: - self.__shutdown_watch_manager() + self.__watch_lock.acquire() + try: + wt = self.__get_watch_thread() + for watcher in watchers: + wt.del_watcher(watcher) + finally: + self.__watch_lock.release() def __get_event_mask(self,events): """Convert the given set of events into a pyinotify event mask.""" @@ -149,33 +179,81 @@ class OSFSWatchMixin(WatchableFSMixin): if inevt.mask & pyinotify.IN_UNMOUNT: watcher.handle_event(CLOSE(self)) - def __get_watch_manager(self): - """Get the shared watch manager, initializing if necessary.""" - if OSFSWatchMixin.__watch_notifier is None: - self.__watch_lock.acquire() - try: - if self.__watch_notifier is None: - wm = pyinotify.WatchManager() - n = pyinotify.ThreadedNotifier(wm) - n.start() - OSFSWatchMixin.__watch_manager = wm - OSFSWatchMixin.__watch_notifier = n - finally: - self.__watch_lock.release() - return OSFSWatchMixin.__watch_manager - - def __shutdown_watch_manager(self,force=False): - """Stop the shared watch manager, if there are no watches left.""" - self.__watch_lock.acquire() + def __get_watch_thread(self): + """Get the shared watch thread, initializing if necessary. + + This method must only be called while holding self.__watch_lock, or + multiple notifiers could be created. + """ + if OSFSWatchMixin.__watch_thread is None: + OSFSWatchMixin.__watch_thread = SharedThreadedNotifier() + OSFSWatchMixin.__watch_thread.start() + return OSFSWatchMixin.__watch_thread + + +class SharedThreadedNotifier(threading.Thread): + """pyinotifer Notifier that can manage multiple WatchManagers. + + Each watcher added to an OSFS corresponds to a new pyinotify.WatchManager + instance. Rather than run a notifier thread for each manager, we run a + single thread that multiplexes between them all. + """ + + def __init__(self): + super(SharedThreadedNotifier,self).__init__() + self.daemon = True + self.running = False + self._pipe_r, self._pipe_w = os.pipe() + self._poller = select.poll() + self._poller.register(self._pipe_r,select.POLLIN) + self.watchers = {} + + def add_watcher(self,watcher): + fd = watcher._pyinotify_WatchManager.get_fd() + self.watchers[fd] = watcher + self._poller.register(fd,select.POLLIN) + # Bump the poll object so it recognises the new fd. + os.write(self._pipe_w,"H") + + def del_watcher(self,watcher): + fd = watcher._pyinotify_WatchManager.get_fd() try: - if OSFSWatchMixin.__watch_manager is None: - return - if not force and OSFSWatchMixin.__watch_manager._wmd: - return - OSFSWatchMixin.__watch_notifier.stop() - OSFSWatchMixin.__watch_notifier = None - OSFSWatchMixin.__watch_manager = None - finally: - self.__watch_lock.release() + del self.watchers[fd] + except KeyError: + pass + else: + self._poller.unregister(fd) + + def run(self): + self.running = True + while self.running: + try: + ready_fds = self._poller.poll() + except select.error, e: + if e[0] != errno.EINTR: + raise + else: + for (fd,event) in ready_fds: + # Ignore all events other than "input ready". + if not event & select.POLLIN: + continue + # For signals on our internal pipe, just read and discard. + if fd == self._pipe_r: + os.read(self._pipe_r,1) + # For notifier fds, dispath to the notifier methods. + else: + try: + notifier = self.watchers[fd]._pyinotify_Notifier + except KeyError: + pass + else: + notifier.read_events() + notifier.process_events() + + def stop(self): + if self.running: + self.running = False + os.write(self._pipe_w,"S") + os.close(self._pipe_w) |