# Copyright (C) 2003-2009 Robey Pointer # # This file is part of paramiko. # # Paramiko is free software; you can redistribute it and/or modify it under the # terms of the GNU Lesser General Public License as published by the Free # Software Foundation; either version 2.1 of the License, or (at your option) # any later version. # # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ A stub SFTP server for loopback SFTP testing. """ import os from paramiko import ( AUTH_SUCCESSFUL, OPEN_SUCCEEDED, SFTPAttributes, SFTPHandle, SFTPServer, SFTPServerInterface, SFTP_FAILURE, SFTP_OK, ServerInterface, ) from paramiko.common import o666 class StubServer(ServerInterface): def check_auth_password(self, username, password): # all are allowed return AUTH_SUCCESSFUL def check_channel_request(self, kind, chanid): return OPEN_SUCCEEDED class StubSFTPHandle(SFTPHandle): def stat(self): try: return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno())) except OSError as e: return SFTPServer.convert_errno(e.errno) def chattr(self, attr): # python doesn't have equivalents to fchown or fchmod, so we have to # use the stored filename try: SFTPServer.set_file_attr(self.filename, attr) return SFTP_OK except OSError as e: return SFTPServer.convert_errno(e.errno) class StubSFTPServer(SFTPServerInterface): # assume current folder is a fine root # (the tests always create and eventually delete a subfolder, so there # shouldn't be any mess) ROOT = os.getcwd() def _realpath(self, path): return self.ROOT + self.canonicalize(path) def list_folder(self, path): path = self._realpath(path) try: out = [] flist = os.listdir(path) for fname in flist: attr = SFTPAttributes.from_stat( os.stat(os.path.join(path, fname)) ) attr.filename = fname out.append(attr) return out except OSError as e: return SFTPServer.convert_errno(e.errno) def stat(self, path): path = self._realpath(path) try: return SFTPAttributes.from_stat(os.stat(path)) except OSError as e: return SFTPServer.convert_errno(e.errno) def lstat(self, path): path = self._realpath(path) try: return SFTPAttributes.from_stat(os.lstat(path)) except OSError as e: return SFTPServer.convert_errno(e.errno) def open(self, path, flags, attr): path = self._realpath(path) try: binary_flag = getattr(os, "O_BINARY", 0) flags |= binary_flag mode = getattr(attr, "st_mode", None) if mode is not None: fd = os.open(path, flags, mode) else: # os.open() defaults to 0777 which is # an odd default mode for files fd = os.open(path, flags, o666) except OSError as e: return SFTPServer.convert_errno(e.errno) if (flags & os.O_CREAT) and (attr is not None): attr._flags &= ~attr.FLAG_PERMISSIONS SFTPServer.set_file_attr(path, attr) if flags & os.O_WRONLY: if flags & os.O_APPEND: fstr = "ab" else: fstr = "wb" elif flags & os.O_RDWR: if flags & os.O_APPEND: fstr = "a+b" else: fstr = "r+b" else: # O_RDONLY (== 0) fstr = "rb" try: f = os.fdopen(fd, fstr) except OSError as e: return SFTPServer.convert_errno(e.errno) fobj = StubSFTPHandle(flags) fobj.filename = path fobj.readfile = f fobj.writefile = f return fobj def remove(self, path): path = self._realpath(path) try: os.remove(path) except OSError as e: return SFTPServer.convert_errno(e.errno) return SFTP_OK def rename(self, oldpath, newpath): oldpath = self._realpath(oldpath) newpath = self._realpath(newpath) if os.path.exists(newpath): return SFTP_FAILURE try: os.rename(oldpath, newpath) except OSError as e: return SFTPServer.convert_errno(e.errno) return SFTP_OK def posix_rename(self, oldpath, newpath): oldpath = self._realpath(oldpath) newpath = self._realpath(newpath) try: os.rename(oldpath, newpath) except OSError as e: return SFTPServer.convert_errno(e.errno) return SFTP_OK def mkdir(self, path, attr): path = self._realpath(path) try: os.mkdir(path) if attr is not None: SFTPServer.set_file_attr(path, attr) except OSError as e: return SFTPServer.convert_errno(e.errno) return SFTP_OK def rmdir(self, path): path = self._realpath(path) try: os.rmdir(path) except OSError as e: return SFTPServer.convert_errno(e.errno) return SFTP_OK def chattr(self, path, attr): path = self._realpath(path) try: SFTPServer.set_file_attr(path, attr) except OSError as e: return SFTPServer.convert_errno(e.errno) return SFTP_OK def symlink(self, target_path, path): path = self._realpath(path) if (len(target_path) > 0) and (target_path[0] == "/"): # absolute symlink target_path = os.path.join(self.ROOT, target_path[1:]) if target_path[:2] == "//": # bug in os.path.join target_path = target_path[1:] else: # compute relative to path abspath = os.path.join(os.path.dirname(path), target_path) if abspath[: len(self.ROOT)] != self.ROOT: # this symlink isn't going to work anyway -- just break it # immediately target_path = "" try: os.symlink(target_path, path) except OSError as e: return SFTPServer.convert_errno(e.errno) return SFTP_OK def readlink(self, path): path = self._realpath(path) try: symlink = os.readlink(path) except OSError as e: return SFTPServer.convert_errno(e.errno) # if it's absolute, remove the root if os.path.isabs(symlink): if symlink[: len(self.ROOT)] == self.ROOT: symlink = symlink[len(self.ROOT) :] if (len(symlink) == 0) or (symlink[0] != "/"): symlink = "/" + symlink else: symlink = "" return symlink