summaryrefslogtreecommitdiff
path: root/fs/tests/test_ftpfs.py
blob: d37e5dfe44ba47d0b9f5b4b61ce71414cb19b10a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/env python
from fs.tests import FSTestCases, ThreadingTestCases

import unittest

import os
import sys
import shutil
import tempfile
import subprocess
import time
from os.path import abspath
import urllib

from six import PY3


try:
    from pyftpdlib.authorizers import DummyAuthorizer
    from pyftpdlib.handlers import FTPHandler
    from pyftpdlib.servers import FTPServer
except ImportError:
    if not PY3:
        raise ImportError("Requires pyftpdlib <http://code.google.com/p/pyftpdlib/>")

from fs.path import *

from fs import ftpfs

ftp_port = 30000
class TestFTPFS(unittest.TestCase, FSTestCases, ThreadingTestCases):

    __test__ = not PY3

    def setUp(self):
        global ftp_port
        ftp_port += 1
        use_port = str(ftp_port)
        #ftp_port = 10000
        self.temp_dir = tempfile.mkdtemp(u"ftpfstests")

        file_path = __file__
        if ':' not in file_path:
            file_path = abspath(file_path)
        # Apparently Windows requires values from default environment, so copy the exisiting os.environ
        env = os.environ.copy()
        env['PYTHONPATH'] = os.getcwd() + os.pathsep + env.get('PYTHONPATH', '')
        self.ftp_server = subprocess.Popen([sys.executable,
                                            file_path,
                                            self.temp_dir,
                                            use_port],
                                           stdout=subprocess.PIPE,
                                           env=env)
        # Block until the server writes a line to stdout
        self.ftp_server.stdout.readline()

        # Poll until a connection can be made
        start_time = time.time()
        while time.time() - start_time < 5:
            try:
                ftpurl = urllib.urlopen('ftp://127.0.0.1:%s' % use_port)
            except IOError:
                time.sleep(0)
            else:
                ftpurl.read()
                ftpurl.close()
                break
        else:
            # Avoid a possible infinite loop
            raise Exception("Unable to connect to ftp server")

        self.fs = ftpfs.FTPFS('127.0.0.1', 'user', '12345', dircache=True, port=use_port, timeout=5.0)
        self.fs.cache_hint(True)


    def tearDown(self):
        #self.ftp_server.terminate()
        if sys.platform == 'win32':
            os.popen('TASKKILL /PID '+str(self.ftp_server.pid)+' /F')
        else:
            os.system('kill '+str(self.ftp_server.pid))
        shutil.rmtree(self.temp_dir)
        self.fs.close()

    def check(self, p):
        check_path = self.temp_dir.rstrip(os.sep) + os.sep + p
        return os.path.exists(check_path.encode('utf-8'))


if __name__ == "__main__":

    # Run an ftp server that exposes a given directory
    import sys
    authorizer = DummyAuthorizer()
    authorizer.add_user("user", "12345", sys.argv[1], perm="elradfmw")
    authorizer.add_anonymous(sys.argv[1])

    #def nolog(*args):
    #    pass
    #ftpserver.log = nolog
    #ftpserver.logline = nolog

    handler = FTPHandler
    handler.authorizer = authorizer
    address = ("127.0.0.1", int(sys.argv[2]))
    #print address

    ftpd = FTPServer(address, handler)

    sys.stdout.write('serving\n')
    sys.stdout.flush()
    ftpd.serve_forever()