1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
"""
fs.xattrs
=========
Extended attribute support for FS
This module defines a standard interface for FS subclasses that want to
support extended file attributes, and a WrapFS subclass that can simulate
extended attributes on top of an ordinary FS.
FS instances offering extended attribute support must provide the following
methods:
* ``getxattr(path,name)`` Get the named attribute for the given path, or None if it does not exist
* ``setxattr(path,name,value)`` Set the named attribute for the given path to the given value
* ``delxattr(path,name)`` Delete the named attribute for the given path, raising KeyError if it does not exist
* ``listxattrs(path)`` Iterate over all stored attribute names for the given path
If extended attributes are required by FS-consuming code, it should use the
function 'ensure_xattrs'. This will interrogate an FS object to determine
if it has native xattr support, and return a wrapped version if it does not.
"""
import sys
try:
import cPickle as pickle
except ImportError:
import pickle
from fs.path import *
from fs.errors import *
from fs.wrapfs import WrapFS
from fs.base import synchronize
def ensure_xattrs(fs):
"""Ensure that the given FS supports xattrs, simulating them if required.
Given an FS object, this function returns an equivalent FS that has support
for extended attributes. This may be the original object if they are
supported natively, or a wrapper class is they must be simulated.
:param fs: An FS object that must have xattrs
"""
try:
# This attr doesn't have to exist, None should be returned by default
fs.getxattr("/","testing-xattr")
return fs
except (AttributeError,UnsupportedError):
return SimulateXAttr(fs)
class SimulateXAttr(WrapFS):
"""FS wrapper class that simulates xattr support.
The following methods are supplied for manipulating extended attributes:
* listxattrs: list all extended attribute names for a path
* getxattr: get an xattr of a path by name
* setxattr: set an xattr of a path by name
* delxattr: delete an xattr of a path by name
For each file in the underlying FS, this class maintains a corresponding
'.xattrs.FILENAME' file containing its extended attributes. Extended
attributes of a directory are stored in the file '.xattrs' within the
directory itself.
"""
def _get_attr_path(self, path, isdir=None):
"""Get the path of the file containing xattrs for the given path."""
if isdir is None:
isdir = self.wrapped_fs.isdir(path)
if isdir:
attr_path = pathjoin(path, '.xattrs')
else:
dir_path, file_name = pathsplit(path)
attr_path = pathjoin(dir_path, '.xattrs.'+file_name)
return attr_path
def _is_attr_path(self, path):
"""Check whether the given path references an xattrs file."""
_,name = pathsplit(path)
if name.startswith(".xattrs"):
return True
return False
def _get_attr_dict(self, path):
"""Retrieve the xattr dictionary for the given path."""
attr_path = self._get_attr_path(path)
if self.wrapped_fs.exists(attr_path):
try:
return pickle.loads(self.wrapped_fs.getcontents(attr_path))
except EOFError:
return {}
else:
return {}
def _set_attr_dict(self, path, attrs):
"""Store the xattr dictionary for the given path."""
attr_path = self._get_attr_path(path)
self.wrapped_fs.setcontents(attr_path, pickle.dumps(attrs))
@synchronize
def setxattr(self, path, key, value):
"""Set an extended attribute on the given path."""
if not self.exists(path):
raise ResourceNotFoundError(path)
key = unicode(key)
attrs = self._get_attr_dict(path)
attrs[key] = str(value)
self._set_attr_dict(path, attrs)
@synchronize
def getxattr(self, path, key, default=None):
"""Retrieve an extended attribute for the given path."""
if not self.exists(path):
raise ResourceNotFoundError(path)
attrs = self._get_attr_dict(path)
return attrs.get(key, default)
@synchronize
def delxattr(self, path, key):
if not self.exists(path):
raise ResourceNotFoundError(path)
attrs = self._get_attr_dict(path)
try:
del attrs[key]
except KeyError:
pass
self._set_attr_dict(path, attrs)
@synchronize
def listxattrs(self,path):
"""List all the extended attribute keys set on the given path."""
if not self.exists(path):
raise ResourceNotFoundError(path)
return self._get_attr_dict(path).keys()
def _encode(self,path):
"""Prevent requests for operations on .xattr files."""
if self._is_attr_path(path):
raise PathError(path,msg="Paths cannot contain '.xattrs': %(path)s")
return path
def _decode(self,path):
return path
def listdir(self,path="",*args,**kwds):
"""Prevent .xattr from appearing in listings."""
entries = self.wrapped_fs.listdir(path,*args,**kwds)
return [e for e in entries if not self._is_attr_path(e)]
def ilistdir(self,path="",*args,**kwds):
"""Prevent .xattr from appearing in listings."""
for e in self.wrapped_fs.ilistdir(path,*args,**kwds):
if not self._is_attr_path(e):
yield e
def remove(self,path):
"""Remove .xattr when removing a file."""
attr_file = self._get_attr_path(path,isdir=False)
self.wrapped_fs.remove(path)
try:
self.wrapped_fs.remove(attr_file)
except ResourceNotFoundError:
pass
def removedir(self,path,recursive=False,force=False):
"""Remove .xattr when removing a directory."""
try:
self.wrapped_fs.removedir(path,recursive=recursive,force=force)
except DirectoryNotEmptyError:
# The xattr file could block the underlying removedir().
# Remove it, but be prepared to restore it on error.
if self.listdir(path) != []:
raise
attr_file = self._get_attr_path(path,isdir=True)
attr_file_contents = self.wrapped_fs.getcontents(attr_file)
self.wrapped_fs.remove(attr_file)
try:
self.wrapped_fs.removedir(path,recursive=recursive)
except FSError:
self.wrapped_fs.setcontents(attr_file,attr_file_contents)
raise
def copy(self,src,dst,**kwds):
"""Ensure xattrs are copied when copying a file."""
self.wrapped_fs.copy(self._encode(src),self._encode(dst),**kwds)
s_attr_file = self._get_attr_path(src)
d_attr_file = self._get_attr_path(dst)
try:
self.wrapped_fs.copy(s_attr_file,d_attr_file,overwrite=True)
except ResourceNotFoundError,e:
pass
def move(self,src,dst,**kwds):
"""Ensure xattrs are preserved when moving a file."""
self.wrapped_fs.move(self._encode(src),self._encode(dst),**kwds)
s_attr_file = self._get_attr_path(src)
d_attr_file = self._get_attr_path(dst)
try:
self.wrapped_fs.move(s_attr_file,d_attr_file,overwrite=True)
except ResourceNotFoundError:
pass
|