"""Small HTTP file server with optional HTTP Basic authentication.

``SimpleHttpServer`` is a ``multiprocessing.Process`` that serves files on
``127.0.0.1`` from a directory chosen per request: each registered user maps
to a directory, and an optional directory can be served to requests that
carry no ``Authorization`` header.
"""

import multiprocessing
import os
import posixpath
import html
import threading
import base64
from http.server import SimpleHTTPRequestHandler, HTTPServer, HTTPStatus


class Unauthorized(Exception):
    """Raised when a request has no valid credentials for any directory."""


class RequestHandler(SimpleHTTPRequestHandler):
    """File handler whose root directory depends on the request credentials.

    The handler reads ``users``, ``anonymous_dir`` and ``realm`` from
    ``self.server`` (see ``AuthHTTPServer``).
    """

    def get_root_dir(self):
        """Return the directory to serve for the current request.

        Returns:
            The server's ``anonymous_dir`` when the request has no
            ``Authorization`` header, otherwise the directory registered for
            the authenticated user.

        Raises:
            Unauthorized: if anonymous access is not configured, the header is
                not a well-formed ``Basic`` credential, the user is unknown,
                or the password does not match.
        """
        header = self.headers.get('authorization')
        if not header:
            if not self.server.anonymous_dir:
                raise Unauthorized('unauthorized')
            return self.server.anonymous_dir

        parts = header.split()
        if len(parts) != 2 or parts[0].lower() != 'basic':
            raise Unauthorized('unauthorized')

        try:
            decoded = base64.decodebytes(parts[1].encode('ascii'))
            # Split on the first colon only: the password itself may contain
            # colons, the user name may not.
            user, password = decoded.decode('ascii').split(':', 1)
            expected_password, directory = self.server.users[user]
        except (ValueError, KeyError):
            # ValueError covers invalid base64, non-ASCII data and a payload
            # without a colon; KeyError is an unknown user.
            raise Unauthorized('unauthorized') from None

        if password != expected_password:
            # Previously this fell through and returned None, which crashed
            # translate_path() instead of producing a 401 response.
            raise Unauthorized('unauthorized')
        return directory

    def unauthorized(self):
        """Send a 401 response carrying a ``WWW-Authenticate: Basic`` challenge.

        The HTML body is built from ``error_message_format`` and is omitted
        for ``HEAD`` requests; ``Content-Length`` is sent either way.
        """
        shortmsg, longmsg = self.responses[HTTPStatus.UNAUTHORIZED]
        self.send_response(HTTPStatus.UNAUTHORIZED, shortmsg)
        self.send_header('Connection', 'close')
        content = (self.error_message_format % {
            'code': HTTPStatus.UNAUTHORIZED,
            'message': html.escape(longmsg, quote=False),
            'explain': html.escape(longmsg, quote=False)
        })
        body = content.encode('UTF-8', 'replace')
        self.send_header('Content-Type', self.error_content_type)
        self.send_header('Content-Length', str(len(body)))
        self.send_header('WWW-Authenticate',
                         'Basic realm="{}"'.format(self.server.realm))
        # Exactly one end_headers(): a second call writes an extra CRLF that
        # the client reads as the start of the body, so the body no longer
        # matches Content-Length.
        self.end_headers()

        if self.command != 'HEAD' and body:
            self.wfile.write(body)

    def do_GET(self):
        """Handle GET, answering 401 when the request is not authorized."""
        try:
            super().do_GET()
        except Unauthorized:
            self.unauthorized()

    def do_HEAD(self):
        """Handle HEAD, answering 401 when the request is not authorized."""
        try:
            super().do_HEAD()
        except Unauthorized:
            self.unauthorized()

    def translate_path(self, path):
        """Map a request path to a filesystem path under the request's root.

        The query string and fragment are dropped and the path is normalised,
        so ``..`` segments cannot climb above the root directory. The path is
        not percent-decoded.

        Raises:
            Unauthorized: propagated from ``get_root_dir``.
        """
        path = path.split('?', 1)[0]
        path = path.split('#', 1)[0]
        # Anchor at '/' before normalising. Absolute request paths are
        # unaffected; a relative request target is confined to the root
        # instead of relying on an ``assert`` (which is stripped under -O).
        path = posixpath.normpath(posixpath.join('/', path))
        path = posixpath.relpath(path, '/')
        return os.path.join(self.get_root_dir(), path)


class AuthHTTPServer(HTTPServer):
    """``HTTPServer`` carrying the authentication settings read by handlers."""

    def __init__(self, *args, **kwargs):
        # user name -> (password, directory)
        self.users = {}
        # Directory served to requests without credentials; None disables it.
        self.anonymous_dir = None
        # Realm advertised in the WWW-Authenticate challenge.
        self.realm = 'Realm'
        super().__init__(*args, **kwargs)


class SimpleHttpServer(multiprocessing.Process):
    """Process that serves files over HTTP on ``127.0.0.1``.

    The listening socket is bound in ``__init__`` (port 0, so the OS picks a
    free port), which makes ``base_url()`` usable before ``start()``.

    NOTE(review): ``run()`` executes in the child process, so directories and
    users presumably have to be registered before ``start()`` to be visible
    there -- confirm against the start method in use.
    """

    def __init__(self):
        self.__stop = multiprocessing.Queue()
        super().__init__()
        self.server = AuthHTTPServer(('127.0.0.1', 0), RequestHandler)
        self.started = False

    def start(self):
        """Start the server process and remember that it was started."""
        self.started = True
        super().start()

    def run(self):
        """Serve requests in a thread until a stop message arrives."""
        t = threading.Thread(target=self.server.serve_forever)
        t.start()
        self.__stop.get()
        self.server.shutdown()
        t.join()

    def stop(self):
        """Stop the server process; a no-op if it was never started.

        A stop message is queued for ``run()``, but ``terminate()`` is sent
        right away as well, so the child may be killed before it completes
        the orderly shutdown.
        """
        if not self.started:
            return
        self.__stop.put(None)
        self.terminate()
        self.join()
        # Release this process's copy of the listening socket so the port
        # does not stay bound with nobody accepting connections.
        self.server.server_close()

    def allow_anonymous(self, cwd):
        """Serve ``cwd`` to requests that carry no credentials."""
        self.server.anonymous_dir = cwd

    def add_user(self, user, password, cwd):
        """Register ``user`` with ``password``; their requests serve ``cwd``."""
        self.server.users[user] = (password, cwd)

    def base_url(self):
        """Return the server's base URL, e.g. ``http://127.0.0.1:<port>``."""
        return 'http://127.0.0.1:{}'.format(self.server.server_port)