From 8c37a5bdfdd46d5cfad6e9d67925ddef9ca382bf Mon Sep 17 00:00:00 2001 From: ben Date: Thu, 21 Mar 2002 07:22:43 +0000 Subject: First checkin git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup/trunk@2 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109 --- rdiff-backup/rdiff_backup/connection.py | 467 ++++++++++++++++++++++++++++++++ 1 file changed, 467 insertions(+) create mode 100644 rdiff-backup/rdiff_backup/connection.py (limited to 'rdiff-backup/rdiff_backup/connection.py') diff --git a/rdiff-backup/rdiff_backup/connection.py b/rdiff-backup/rdiff_backup/connection.py new file mode 100644 index 0000000..83fc874 --- /dev/null +++ b/rdiff-backup/rdiff_backup/connection.py @@ -0,0 +1,467 @@ +execfile("rdiff.py") +import types, os, tempfile, cPickle, shutil, traceback + +####################################################################### +# +# connection - Code that deals with remote execution +# + +class ConnectionError(Exception): + pass + +class ConnectionQuit(Exception): + pass + + +class Connection: + """Connection class - represent remote execution + + The idea is that, if c is an instance of this class, c.foo will + return the object on the remote side. For functions, c.foo will + return a function that, when called, executes foo on the remote + side, sending over the arguments and sending back the result. + + """ + def __repr__(self): return self.__str__() + + +class LocalConnection(Connection): + """Local connection + + This is a dummy connection class, so that LC.foo just evaluates to + foo using global scope. + + """ + def __init__(self): + """This prevents two instances of LocalConnection""" + assert not Globals.local_connection + self.conn_number = 0 # changed by SetConnections for server + + def __getattr__(self, name): + try: return globals()[name] + except KeyError: + builtins = globals()["__builtins__"] + try: + if type(builtins) is types.ModuleType: + return builtins.__dict__[name] + else: return builtins[name] + except KeyError: raise NameError, name + + def __setattr__(self, name, value): + globals()[name] = value + + def __delattr__(self, name): + del globals()[name] + + def __str__(self): return "LocalConnection" + + def reval(self, function_string, *args): + return apply(eval(function_string), args) + + def quit(self): pass + +Globals.local_connection = LocalConnection() +Globals.connections.append(Globals.local_connection) +# Following changed by server in SetConnections +Globals.connection_dict[0] = Globals.local_connection + + +class ConnectionRequest: + """Simple wrapper around a PipeConnection request""" + def __init__(self, function_string, num_args): + self.function_string = function_string + self.num_args = num_args + + def __str__(self): + return "ConnectionRequest: %s with %d arguments" % \ + (self.function_string, self.num_args) + + +class LowLevelPipeConnection(Connection): + """Routines for just sending objects from one side of pipe to another + + Each thing sent down the pipe is paired with a request number, + currently limited to be between 0 and 255. The size of each thing + should be less than 2^56. + + Each thing also has a type, indicated by one of the following + characters: + + o - generic object + i - iterator/generator of RORPs + f - file object + b - string + q - quit signal + t - TempFile + R - RPath + r - RORPath only + c - PipeConnection object + + """ + def __init__(self, inpipe, outpipe): + """inpipe is a file-type open for reading, outpipe for writing""" + self.inpipe = inpipe + self.outpipe = outpipe + + def __str__(self): + """Return string version + + This is actually an important function, because otherwise + requests to represent this object would result in "__str__" + being executed on the other side of the connection. + + """ + return "LowLevelPipeConnection" + + def _put(self, obj, req_num): + """Put an object into the pipe (will send raw if string)""" + Log.conn("sending", obj, req_num) + if type(obj) is types.StringType: self._putbuf(obj, req_num) + elif isinstance(obj, Connection): self._putconn(obj, req_num) + elif isinstance(obj, TempFile): self._puttempfile(obj, req_num) + elif isinstance(obj, RPath): self._putrpath(obj, req_num) + elif isinstance(obj, RORPath): self._putrorpath(obj, req_num) + elif ((hasattr(obj, "read") or hasattr(obj, "write")) + and hasattr(obj, "close")): self._putfile(obj, req_num) + elif hasattr(obj, "next"): self._putiter(obj, req_num) + else: self._putobj(obj, req_num) + + def _putobj(self, obj, req_num): + """Send a generic python obj down the outpipe""" + self._write("o", cPickle.dumps(obj, 1), req_num) + + def _putbuf(self, buf, req_num): + """Send buffer buf down the outpipe""" + self._write("b", buf, req_num) + + def _putfile(self, fp, req_num): + """Send a file to the client using virtual files""" + self._write("f", str(VirtualFile.new(fp)), req_num) + + def _putiter(self, iterator, req_num): + """Put an iterator through the pipe""" + self._write("i", str(VirtualFile.new(RORPIter.ToFile(iterator))), + req_num) + + def _puttempfile(self, tempfile, req_num): + """Put a tempfile into pipe. See _putrpath""" + tf_repr = (tempfile.conn.conn_number, tempfile.base, + tempfile.index, tempfile.data) + self._write("t", cPickle.dumps(tf_repr, 1), req_num) + + def _putrpath(self, rpath, req_num): + """Put an rpath into the pipe + + The rpath's connection will be encoded as its conn_number. It + and the other information is put in a tuple. + + """ + rpath_repr = (rpath.conn.conn_number, rpath.base, + rpath.index, rpath.data) + self._write("R", cPickle.dumps(rpath_repr, 1), req_num) + + def _putrorpath(self, rorpath, req_num): + """Put an rorpath into the pipe + + This is only necessary because if there is a .file attached, + it must be excluded from the pickling + + """ + rorpath_repr = (rorpath.index, rorpath.data) + self._write("r", cPickle.dumps(rorpath_repr, 1), req_num) + + def _putconn(self, pipeconn, req_num): + """Put a connection into the pipe + + A pipe connection is represented just as the integer (in + string form) of its connection number it is *connected to*. + + """ + self._write("c", str(pipeconn.conn_number), req_num) + + def _putquit(self): + """Send a string that takes down server""" + self._write("q", "", 255) + + def _write(self, headerchar, data, req_num): + """Write header and then data to the pipe""" + self.outpipe.write(headerchar + chr(req_num) + self._l2s(len(data))) + self.outpipe.write(data) + self.outpipe.flush() + + def _read(self, length): + """Read length bytes from inpipe, returning result""" + return self.inpipe.read(length) + + def _s2l(self, s): + """Convert string to long int""" + assert len(s) == 7 + l = 0L + for i in range(7): l = l*256 + ord(s[i]) + return l + + def _l2s(self, l): + """Convert long int to string""" + s = "" + for i in range(7): + l, remainder = divmod(l, 256) + s = chr(remainder) + s + assert remainder == 0 + return s + + def _get(self): + """Read an object from the pipe and return (req_num, value)""" + header_string = self.inpipe.read(9) + assert len(header_string) == 9, \ + "Error reading from pipe (problem probably originated remotely)" + try: + format_string, req_num, length = (header_string[0], + ord(header_string[1]), + self._s2l(header_string[2:])) + except IndexError: raise ConnectionError() + if format_string == "o": result = cPickle.loads(self._read(length)) + elif format_string == "b": result = self._read(length) + elif format_string == "f": + result = VirtualFile(self, int(self._read(length))) + elif format_string == "i": + result = RORPIter.FromFile(BufferedRead( + VirtualFile(self, int(self._read(length))))) + elif format_string == "t": + result = self._gettempfile(self._read(length)) + elif format_string == "r": + result = self._getrorpath(self._read(length)) + elif format_string == "R": result = self._getrpath(self._read(length)) + elif format_string == "c": + result = Globals.connection_dict[int(self._read(length))] + else: + assert format_string == "q", header_string + raise ConnectionQuit("Received quit signal") + Log.conn("received", result, req_num) + return (req_num, result) + + def _getrorpath(self, raw_rorpath_buf): + """Reconstruct RORPath object from raw data""" + index, data = cPickle.loads(raw_rorpath_buf) + return RORPath(index, data) + + def _gettempfile(self, raw_tf_buf): + """Return TempFile object indicated by raw_tf_buf""" + conn_number, base, index, data = cPickle.loads(raw_tf_buf) + return TempFile(Globals.connection_dict[conn_number], + base, index, data) + + def _getrpath(self, raw_rpath_buf): + """Return RPath object indicated by raw_rpath_buf""" + conn_number, base, index, data = cPickle.loads(raw_rpath_buf) + return RPath(Globals.connection_dict[conn_number], base, index, data) + + def _close(self): + """Close the pipes associated with the connection""" + self.outpipe.close() + self.inpipe.close() + + +class PipeConnection(LowLevelPipeConnection): + """Provide server and client functions for a Pipe Connection + + Both sides act as modules that allows for remote execution. For + instance, self.conn.pow(2,8) will execute the operation on the + server side. + + The only difference between the client and server is that the + client makes the first request, and the server listens first. + + """ + def __init__(self, inpipe, outpipe, conn_number = 0): + """Init PipeConnection + + conn_number should be a unique (to the session) integer to + identify the connection. For instance, all connections to the + client have conn_number 0. Other connections can use this + number to route commands to the correct process. + + """ + LowLevelPipeConnection.__init__(self, inpipe, outpipe) + self.conn_number = conn_number + self.unused_request_numbers = {} + for i in range(256): self.unused_request_numbers[i] = None + + def __str__(self): return "PipeConnection %d" % self.conn_number + + def get_response(self, desired_req_num): + """Read from pipe, responding to requests until req_num. + + Sometimes after a request is sent, the other side will make + another request before responding to the original one. In + that case, respond to the request. But return once the right + response is given. + + """ + while 1: + try: req_num, object = self._get() + except ConnectionQuit: + self._put("quitting", self.get_new_req_num()) + return + if req_num == desired_req_num: return object + else: + assert isinstance(object, ConnectionRequest) + self.answer_request(object, req_num) + + def answer_request(self, request, req_num): + """Put the object requested by request down the pipe""" + del self.unused_request_numbers[req_num] + argument_list = [] + for i in range(request.num_args): + arg_req_num, arg = self._get() + assert arg_req_num == req_num + argument_list.append(arg) + try: result = apply(eval(request.function_string), argument_list) + except: result = self.extract_exception() + self._put(result, req_num) + self.unused_request_numbers[req_num] = None + + def extract_exception(self): + """Return active exception""" + Log("Sending back exception: \n" + + "".join(traceback.format_tb(sys.exc_info()[2])), 2) + return sys.exc_info()[1] + + def Server(self): + """Start server's read eval return loop""" + Globals.server = 1 + Globals.connections.append(self) + Log("Starting server", 6) + self.get_response(-1) + + def reval(self, function_string, *args): + """Execute command on remote side + + The first argument should be a string that evaluates to a + function, like "pow", and the remaining are arguments to that + function. + + """ + req_num = self.get_new_req_num() + self._put(ConnectionRequest(function_string, len(args)), req_num) + for arg in args: self._put(arg, req_num) + result = self.get_response(req_num) + self.unused_request_numbers[req_num] = None + if isinstance(result, Exception): raise result + else: return result + + def get_new_req_num(self): + """Allot a new request number and return it""" + if not self.unused_request_numbers: + raise ConnectionError("Exhaused possible connection numbers") + req_num = self.unused_request_numbers.keys()[0] + del self.unused_request_numbers[req_num] + return req_num + + def quit(self): + """Close the associated pipes and tell server side to quit""" + assert not Globals.server + self._putquit() + self._get() + self._close() + + def __getattr__(self, name): + """Intercept attributes to allow for . invocation""" + return EmulateCallable(self, name) + + +class RedirectedConnection(Connection): + """Represent a connection more than one move away + + For instance, suppose things are connected like this: S1---C---S2. + If Server1 wants something done by Server2, it will have to go + through the Client. So on S1's side, S2 will be represented by a + RedirectedConnection. + + """ + def __init__(self, conn_number, routing_number = 0): + """RedirectedConnection initializer + + Returns a RedirectedConnection object for the given + conn_number, where commands are routed through the connection + with the given routing_number. 0 is the client, so the + default shouldn't have to be changed. + + """ + self.conn_number = conn_number + self.routing_number = routing_number + self.routing_conn = Globals.connection_dict[routing_number] + + def __str__(self): + return "RedirectedConnection %d,%d" % (self.conn_number, + self.routing_number) + + def __getattr__(self, name): + return EmulateCallable(self.routing_conn, + "Globals.get_dict_val('connection_dict', %d).%s" + % (self.conn_number, name)) + + +class EmulateCallable: + """This is used by PipeConnection in calls like conn.os.chmod(foo)""" + def __init__(self, connection, name): + self.connection = connection + self.name = name + def __call__(self, *args): + return apply(self.connection.reval, (self.name,) + args) + def __getattr__(self, attr_name): + return EmulateCallable(self.connection, + "%s.%s" % (self.name, attr_name)) + + +class VirtualFile: + """When the client asks for a file over the connection, it gets this + + The returned instance then forwards requests over the connection. + The class's dictionary is used by the server to associate each + with a unique file number. + + """ + #### The following are used by the server + vfiles = {} + counter = 0 + + def getbyid(cls, id): + return cls.vfiles[id] + getbyid = classmethod(getbyid) + + def readfromid(cls, id, length): + return cls.vfiles[id].read(length) + readfromid = classmethod(readfromid) + + def writetoid(cls, id, buffer): + return cls.vfiles[id].write(buffer) + writetoid = classmethod(writetoid) + + def closebyid(cls, id): + fp = cls.vfiles[id] + del cls.vfiles[id] + return fp.close() + closebyid = classmethod(closebyid) + + def new(cls, fileobj): + """Associate a new VirtualFile with a read fileobject, return id""" + count = cls.counter + cls.vfiles[count] = fileobj + cls.counter = count + 1 + return count + new = classmethod(new) + + + #### And these are used by the client + def __init__(self, connection, id): + self.connection = connection + self.id = id + + def read(self, length = -1): + return self.connection.VirtualFile.readfromid(self.id, length) + + def write(self, buf): + return self.connection.VirtualFile.writetoid(self.id, buf) + + def close(self): + return self.connection.VirtualFile.closebyid(self.id) -- cgit v1.2.1