1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
#!/usr/bin/env python
from fs.commands.runner import Command
import sys
import platform
import os
import os.path
platform = platform.system()
class FSMount(Command):
if platform == "Windows":
usage = """fsmount [OPTIONS]... [FS] [DRIVE LETTER]
or fsmount -u [DRIVER LETTER]
Mounts a filesystem on a drive letter"""
else:
usage = """fsmount [OPTIONS]... [FS] [SYSTEM PATH]
or fsmount -u [SYSTEM PATH]
Mounts a file system on a system path"""
version = "1.0"
def get_optparse(self):
optparse = super(FSMount, self).get_optparse()
optparse.add_option('-f', '--foreground', dest='foreground', action="store_true", default=False,
help="run the mount process in the foreground", metavar="FOREGROUND")
optparse.add_option('-u', '--unmount', dest='unmount', action="store_true", default=False,
help="unmount path", metavar="UNMOUNT")
optparse.add_option('-n', '--nocache', dest='nocache', action="store_true", default=False,
help="do not cache network filesystems", metavar="NOCACHE")
return optparse
def do_run(self, options, args):
windows = platform == "Windows"
if options.unmount:
if windows:
try:
mount_path = args[0][:1]
except IndexError:
self.error('Driver letter required\n')
return 1
from fs.expose import dokan
mount_path = mount_path[:1].upper()
self.output('unmounting %s:...\n' % mount_path, True)
dokan.unmount(mount_path)
return
else:
try:
mount_path = args[0]
except IndexError:
self.error(self.usage + '\n')
return 1
from fs.expose import fuse
self.output('unmounting %s...\n' % mount_path, True)
fuse.unmount(mount_path)
return
try:
fs_url = args[0]
except IndexError:
self.error(self.usage + '\n')
return 1
try:
mount_path = args[1]
except IndexError:
if windows:
mount_path = mount_path[:1].upper()
self.error(self.usage + '\n')
else:
self.error(self.usage + '\n')
return 1
fs, path = self.open_fs(fs_url, create_dir=True)
if path:
if not fs.isdir(path):
self.error('%s is not a directory on %s' % (fs_url, fs))
return 1
fs = fs.opendir(path)
path = '/'
if not options.nocache:
fs.cache_hint(True)
if windows:
from fs.expose import dokan
if len(mount_path) > 1:
self.error('Driver letter should be one character')
return 1
self.output("Mounting %s on %s:\n" % (fs, mount_path), True)
flags = dokan.DOKAN_OPTION_REMOVABLE
if options.debug:
flags |= dokan.DOKAN_OPTION_DEBUG | dokan.DOKAN_OPTION_STDERR
mp = dokan.mount(fs,
mount_path,
numthreads=5,
foreground=options.foreground,
flags=flags,
volname=str(fs))
else:
if not os.path.exists(mount_path):
try:
os.makedirs(mount_path)
except:
pass
from fs.expose import fuse
self.output("Mounting %s on %s\n" % (fs, mount_path), True)
if options.foreground:
fuse_process = fuse.mount(fs,
mount_path,
foreground=True)
else:
if not os.fork():
mp = fuse.mount(fs,
mount_path,
foreground=True)
else:
fs.close = lambda: None
def run():
return FSMount().run()
if __name__ == "__main__":
sys.exit(run())
|