summaryrefslogtreecommitdiff
path: root/fs/wrapfs/__init__.py
blob: d50cee8f57bd512169535077d2e46d39e494ab30 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
"""
fs.wrapfs
=========

A class for wrapping an existing FS object with additional functionality.

This module provides the class WrapFS, a base class for objects that wrap
another FS object and provide some transformation of its contents.  It could
be very useful for implementing e.g. transparent encryption or compression
services.

For a simple example of how this class could be used, see the 'HideDotFilesFS'
class in the module fs.wrapfs.hidedotfilesfs.  This wrapper implements the
standard unix shell functionality of hiding dot-files in directory listings.

"""

import re
import sys
import fnmatch
import threading

from fs.base import FS, threading, synchronize, NoDefaultMeta
from fs.errors import *
from fs.path import *
from fs.local_functools import wraps


def rewrite_errors(func):
    """Re-write paths in errors raised by wrapped FS objects."""
    @wraps(func)
    def wrapper(self,*args,**kwds):
        try:
            return func(self,*args,**kwds)
        except ResourceError, e:
            (exc_type,exc_inst,tb) = sys.exc_info()
            try:
                e.path = self._decode(e.path)
            except (AttributeError, ValueError, TypeError):
                raise e, None, tb
            raise
    return wrapper


class WrapFS(FS):
    """FS that wraps another FS, providing translation etc.

    This class allows simple transforms to be applied to the names
    and/or contents of files in an FS.  It could be used to implement
    e.g. compression or encryption in a relatively painless manner.

    The following methods can be overridden to control how files are
    accessed in the underlying FS object:

     * _file_wrap(file, mode):  called for each file that is opened from
                                the underlying FS; may return a modified
                                file-like object.

     *  _encode(path):  encode a path for access in the underlying FS

     *  _decode(path):  decode a path from the underlying FS

    If the required path translation proceeds one component at a time,
    it may be simpler to override the _encode_name() and _decode_name()
    methods.
    """

    def __init__(self, fs):
        super(WrapFS, self).__init__()
        try:
            self._lock = fs._lock
        except (AttributeError,FSError):
            self._lock = self._lock = threading.RLock()
        self.wrapped_fs = fs

    def _file_wrap(self, f, mode):
        """Apply wrapping to an opened file."""
        return f

    def _encode_name(self, name):
        """Encode path component for the underlying FS."""
        return name

    def _decode_name(self, name):
        """Decode path component from the underlying FS."""
        return name

    def _encode(self, path):
        """Encode path for the underlying FS."""
        e_names = []
        for name in iteratepath(path):
            if name == "":
                e_names.append("")
            else:
                e_names.append(self._encode_name(name))
        return "/".join(e_names)

    def _decode(self, path):
        """Decode path from the underlying FS."""
        d_names = []
        for name in iteratepath(path):
            if name == "":
                d_names.append("")
            else:
                d_names.append(self._decode_name(name))
        return "/".join(d_names)

    def _adjust_mode(self, mode):
        """Adjust the mode used to open a file in the underlying FS.

        This method takes the mode given when opening a file, and should
        return a two-tuple giving the mode to be used in this FS as first
        item, and the mode to be used in the underlying FS as the second.

        An example of why this is needed is a WrapFS subclass that does
        transparent file compression - in this case files from the wrapped
        FS cannot be opened in append mode.
        """
        return (mode, mode)

    def __unicode__(self):
        return u"<%s: %s>" % (self.__class__.__name__,self.wrapped_fs,)

    #def __str__(self):
    #    return unicode(self).encode(sys.getdefaultencoding(),"replace")


    @rewrite_errors
    def getmeta(self, meta_name, default=NoDefaultMeta):
        return self.wrapped_fs.getmeta(meta_name, default)

    @rewrite_errors
    def hasmeta(self, meta_name):
        return self.wrapped_fs.hasmeta(meta_name)

    @rewrite_errors
    def validatepath(self, path):
        return self.wrapped_fs.validatepath(self._encode(path))

    @rewrite_errors
    def getsyspath(self, path, allow_none=False):
        return self.wrapped_fs.getsyspath(self._encode(path), allow_none)

    @rewrite_errors
    def getpathurl(self, path, allow_none=False):
        return self.wrapped_fs.getpathurl(self._encode(path), allow_none)

    @rewrite_errors
    def hassyspath(self, path):
        return self.wrapped_fs.hassyspath(self._encode(path))

    @rewrite_errors
    def open(self, path, mode='r', **kwargs):
        (mode, wmode) = self._adjust_mode(mode)
        f = self.wrapped_fs.open(self._encode(path), wmode, **kwargs)
        return self._file_wrap(f, mode)

    @rewrite_errors
    def setcontents(self, path, data, encoding=None, errors=None, chunk_size=64*1024):
        #  We can't pass setcontents() through to the wrapped FS if the
        #  wrapper has defined a _file_wrap method, as it would bypass
        #  the file contents wrapping.
        #if self._file_wrap.im_func is WrapFS._file_wrap.im_func:
        if getattr(self.__class__, '_file_wrap', None) is getattr(WrapFS, '_file_wrap', None):
            return self.wrapped_fs.setcontents(self._encode(path), data, encoding=encoding, errors=errors, chunk_size=chunk_size)
        else:
            return super(WrapFS, self).setcontents(path, data, encoding=encoding, errors=errors, chunk_size=chunk_size)

    @rewrite_errors
    def createfile(self, path, wipe=False):
        return self.wrapped_fs.createfile(self._encode(path), wipe=wipe)

    @rewrite_errors
    def exists(self, path):
        return self.wrapped_fs.exists(self._encode(path))

    @rewrite_errors
    def isdir(self, path):
        return self.wrapped_fs.isdir(self._encode(path))

    @rewrite_errors
    def isfile(self, path):
        return self.wrapped_fs.isfile(self._encode(path))

    @rewrite_errors
    def listdir(self, path="", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False):
        kwds = dict(wildcard=wildcard,
                    full=full,
                    absolute=absolute,
                    dirs_only=dirs_only,
                    files_only=files_only)
        full = kwds.pop("full",False)
        absolute = kwds.pop("absolute",False)
        wildcard = kwds.pop("wildcard",None)
        if wildcard is None:
            wildcard = lambda fn:True
        elif not callable(wildcard):
            wildcard_re = re.compile(fnmatch.translate(wildcard))
            wildcard = lambda fn:bool (wildcard_re.match(fn))
        entries = []
        enc_path = self._encode(path)
        for e in self.wrapped_fs.listdir(enc_path,**kwds):
            e = basename(self._decode(pathcombine(enc_path,e)))
            if not wildcard(e):
                continue
            if full:
                e = pathcombine(path,e)
            elif absolute:
                e = abspath(pathcombine(path,e))
            entries.append(e)
        return entries

    @rewrite_errors
    def ilistdir(self, path="", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False):
        kwds = dict(wildcard=wildcard,
                    full=full,
                    absolute=absolute,
                    dirs_only=dirs_only,
                    files_only=files_only)
        full = kwds.pop("full",False)
        absolute = kwds.pop("absolute",False)
        wildcard = kwds.pop("wildcard",None)
        if wildcard is None:
            wildcard = lambda fn:True
        elif not callable(wildcard):
            wildcard_re = re.compile(fnmatch.translate(wildcard))
            wildcard = lambda fn:bool (wildcard_re.match(fn))
        enc_path = self._encode(path)
        for e in self.wrapped_fs.ilistdir(enc_path,**kwds):
            e = basename(self._decode(pathcombine(enc_path,e)))
            if not wildcard(e):
                continue
            if full:
                e = pathcombine(path,e)
            elif absolute:
                e = abspath(pathcombine(path,e))
            yield e

    @rewrite_errors
    def listdirinfo(self, path="", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False):
        kwds = dict(wildcard=wildcard,
                    full=full,
                    absolute=absolute,
                    dirs_only=dirs_only,
                    files_only=files_only)
        full = kwds.pop("full",False)
        absolute = kwds.pop("absolute",False)
        wildcard = kwds.pop("wildcard",None)
        if wildcard is None:
            wildcard = lambda fn:True
        elif not callable(wildcard):
            wildcard_re = re.compile(fnmatch.translate(wildcard))
            wildcard = lambda fn:bool (wildcard_re.match(fn))
        entries = []
        enc_path = self._encode(path)
        for (nm,info) in self.wrapped_fs.listdirinfo(enc_path,**kwds):
            nm = basename(self._decode(pathcombine(enc_path,nm)))
            if not wildcard(nm):
                continue
            if full:
                nm = pathcombine(path,nm)
            elif absolute:
                nm = abspath(pathcombine(path,nm))
            entries.append((nm,info))
        return entries

    @rewrite_errors
    def ilistdirinfo(self, path="", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False):
        kwds = dict(wildcard=wildcard,
                    full=full,
                    absolute=absolute,
                    dirs_only=dirs_only,
                    files_only=files_only)
        full = kwds.pop("full",False)
        absolute = kwds.pop("absolute",False)
        wildcard = kwds.pop("wildcard",None)
        if wildcard is None:
            wildcard = lambda fn:True
        elif not callable(wildcard):
            wildcard_re = re.compile(fnmatch.translate(wildcard))
            wildcard = lambda fn:bool (wildcard_re.match(fn))
        enc_path = self._encode(path)
        for (nm,info) in self.wrapped_fs.ilistdirinfo(enc_path,**kwds):
            nm = basename(self._decode(pathcombine(enc_path,nm)))
            if not wildcard(nm):
                continue
            if full:
                nm = pathcombine(path,nm)
            elif absolute:
                nm = abspath(pathcombine(path,nm))
            yield (nm,info)

    @rewrite_errors
    def walk(self,path="/",wildcard=None,dir_wildcard=None,search="breadth",ignore_errors=False):
        if dir_wildcard is not None:
            #  If there is a dir_wildcard, fall back to the default impl
            #  that uses listdir().  Otherwise we run the risk of enumerating
            #  lots of directories that will just be thrown away.
            for item in super(WrapFS,self).walk(path,wildcard,dir_wildcard,search,ignore_errors):
                yield item
        #  Otherwise, the wrapped FS may provide a more efficient impl
        #  which we can use directly.
        else:
            if wildcard is not None and not callable(wildcard):
                wildcard_re = re.compile(fnmatch.translate(wildcard))
                wildcard = lambda fn:bool (wildcard_re.match(fn))
            for (dirpath,filepaths) in self.wrapped_fs.walk(self._encode(path),search=search,ignore_errors=ignore_errors):
                filepaths = [basename(self._decode(pathcombine(dirpath,p)))
                                 for p in filepaths]
                dirpath = abspath(self._decode(dirpath))
                if wildcard is not None:
                    filepaths = [p for p in filepaths if wildcard(p)]
                yield (dirpath,filepaths)

    @rewrite_errors
    def walkfiles(self,path="/",wildcard=None,dir_wildcard=None,search="breadth",ignore_errors=False):
        if dir_wildcard is not None:
            #  If there is a dir_wildcard, fall back to the default impl
            #  that uses listdir().  Otherwise we run the risk of enumerating
            #  lots of directories that will just be thrown away.
            for item in super(WrapFS,self).walkfiles(path,wildcard,dir_wildcard,search,ignore_errors):
                yield item
        #  Otherwise, the wrapped FS may provide a more efficient impl
        #  which we can use directly.
        else:
            if wildcard is not None and not callable(wildcard):
                wildcard_re = re.compile(fnmatch.translate(wildcard))
                wildcard = lambda fn:bool (wildcard_re.match(fn))
            for filepath in self.wrapped_fs.walkfiles(self._encode(path),search=search,ignore_errors=ignore_errors):
                filepath = abspath(self._decode(filepath))
                if wildcard is not None:
                    if not wildcard(basename(filepath)):
                        continue
                yield filepath

    @rewrite_errors
    def walkdirs(self,path="/",wildcard=None,search="breadth",ignore_errors=False):
        if wildcard is not None:
            #  If there is a wildcard, fall back to the default impl
            #  that uses listdir().  Otherwise we run the risk of enumerating
            #  lots of directories that will just be thrown away.
            for item in super(WrapFS,self).walkdirs(path,wildcard,search,ignore_errors):
                yield item
        #  Otherwise, the wrapped FS may provide a more efficient impl
        #  which we can use directly.
        else:
            for dirpath in self.wrapped_fs.walkdirs(self._encode(path),search=search,ignore_errors=ignore_errors):
                yield abspath(self._decode(dirpath))


    @rewrite_errors
    def makedir(self, path, *args, **kwds):
        return self.wrapped_fs.makedir(self._encode(path),*args,**kwds)

    @rewrite_errors
    def remove(self, path):
        return self.wrapped_fs.remove(self._encode(path))

    @rewrite_errors
    def removedir(self, path, *args, **kwds):
        return self.wrapped_fs.removedir(self._encode(path),*args,**kwds)

    @rewrite_errors
    def rename(self, src, dst):
        return self.wrapped_fs.rename(self._encode(src),self._encode(dst))

    @rewrite_errors
    def getinfo(self, path):
        return self.wrapped_fs.getinfo(self._encode(path))

    @rewrite_errors
    def settimes(self, path, *args, **kwds):
        return self.wrapped_fs.settimes(self._encode(path), *args,**kwds)

    @rewrite_errors
    def desc(self, path):
        return self.wrapped_fs.desc(self._encode(path))

    @rewrite_errors
    def copy(self, src, dst, **kwds):
        return self.wrapped_fs.copy(self._encode(src),self._encode(dst),**kwds)

    @rewrite_errors
    def move(self, src, dst, **kwds):
        return self.wrapped_fs.move(self._encode(src),self._encode(dst),**kwds)

    @rewrite_errors
    def movedir(self, src, dst, **kwds):
        return self.wrapped_fs.movedir(self._encode(src),self._encode(dst),**kwds)

    @rewrite_errors
    def copydir(self, src, dst, **kwds):
        return self.wrapped_fs.copydir(self._encode(src),self._encode(dst),**kwds)

    @rewrite_errors
    def getxattr(self, path, name, default=None):
        try:
            return self.wrapped_fs.getxattr(self._encode(path),name,default)
        except AttributeError:
            raise UnsupportedError("getxattr")

    @rewrite_errors
    def setxattr(self, path, name, value):
        try:
            return self.wrapped_fs.setxattr(self._encode(path),name,value)
        except AttributeError:
            raise UnsupportedError("setxattr")

    @rewrite_errors
    def delxattr(self, path, name):
        try:
            return self.wrapped_fs.delxattr(self._encode(path),name)
        except AttributeError:
            raise UnsupportedError("delxattr")

    @rewrite_errors
    def listxattrs(self, path):
        try:
            return self.wrapped_fs.listxattrs(self._encode(path))
        except AttributeError:
            raise UnsupportedError("listxattrs")

    def __getattr__(self, attr):
        #  These attributes can be used by the destructor, but may not be
        #  defined if there are errors in the constructor.
        if attr == "closed":
            return False
        if attr == "wrapped_fs":
            return None
        if attr.startswith("_"):
            raise AttributeError(attr)
        return getattr(self.wrapped_fs,attr)

    @rewrite_errors
    def close(self):
        if not self.closed:
            self.wrapped_fs.close()
            super(WrapFS,self).close()
            self.wrapped_fs = None


def wrap_fs_methods(decorator, cls=None, exclude=[]):
    """Apply the given decorator to all FS methods on the given class.

    This function can be used in two ways.  When called with two arguments it
    applies the given function 'decorator' to each FS method of the given
    class.  When called with just a single argument, it creates and returns
    a class decorator which will do the same thing when applied.  So you can
    use it like this::

        wrap_fs_methods(mydecorator,MyFSClass)

    Or on more recent Python versions, like this::

        @wrap_fs_methods(mydecorator)
        class MyFSClass(FS):
            ...

    """
    def apply_decorator(cls):
        for method_name in wrap_fs_methods.method_names:
            if method_name in exclude:
                continue
            method = getattr(cls,method_name,None)
            if method is not None:
                setattr(cls,method_name,decorator(method))
        return cls
    if cls is not None:
        return apply_decorator(cls)
    else:
        return apply_decorator

wrap_fs_methods.method_names = ["open","exists","isdir","isfile","listdir",
    "makedir","remove","setcontents","removedir","rename","getinfo","copy",
    "move","copydir","movedir","close","getxattr","setxattr","delxattr",
    "listxattrs","validatepath","getsyspath","createfile", "hasmeta", "getmeta","listdirinfo",
    "ilistdir","ilistdirinfo"]