From eafe9b81dd96e9c8f59dfaf3c62d01aa25be64df Mon Sep 17 00:00:00 2001 From: bescoto Date: Thu, 4 Sep 2003 05:16:12 +0000 Subject: Added user_group module and associated tests git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup/trunk@416 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109 --- rdiff-backup/rdiff_backup/Globals.py | 6 ++ rdiff-backup/rdiff_backup/user_group.py | 154 ++++++++++++++++++++++++++++++++ rdiff-backup/testing/commontest.py | 8 +- rdiff-backup/testing/user_grouptest.py | 55 ++++++++++++ 4 files changed, 222 insertions(+), 1 deletion(-) create mode 100644 rdiff-backup/rdiff_backup/user_group.py create mode 100644 rdiff-backup/testing/user_grouptest.py diff --git a/rdiff-backup/rdiff_backup/Globals.py b/rdiff-backup/rdiff_backup/Globals.py index 2338659..d6238e1 100644 --- a/rdiff-backup/rdiff_backup/Globals.py +++ b/rdiff-backup/rdiff_backup/Globals.py @@ -124,6 +124,12 @@ backup_writer = None # Connection of the client client_conn = None +# When backing up, issource should be true on the reader and isdest on +# the writer. When restoring, issource should be true on the mirror +# and isdest should be true on the target. +issource = None +isdest = None + # This list is used by the set function below. When a new # connection is created with init_connection, its Globals class # will match this one for all the variables mentioned in this diff --git a/rdiff-backup/rdiff_backup/user_group.py b/rdiff-backup/rdiff_backup/user_group.py new file mode 100644 index 0000000..cff2943 --- /dev/null +++ b/rdiff-backup/rdiff_backup/user_group.py @@ -0,0 +1,154 @@ +# Copyright 2002, 2003 Ben Escoto +# +# This file is part of rdiff-backup. +# +# rdiff-backup is free software; you can redistribute it and/or modify +# under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 2 of the License, or (at your +# option) any later version. +# +# rdiff-backup is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with rdiff-backup; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA + +"""This module deal with users and groups + +On each connection we may need to map unames and gnames to uids and +gids, and possibly vice-versa. So maintain a separate dictionary for +this. + +On the destination connection only, if necessary have a separate +dictionary of mappings, which specify how to map users/groups on one +connection to the users/groups on the other. + +""" + +import grp, pwd +import log, Globals + +# This should be set to the user UserMap class object if using +# user-defined user mapping, and a Map class object otherwise. +UserMap = None + +# This should be set to the group UserMap class object if using +# user-defined group mapping, and a Map class object otherwise. +GroupMap = None + + +uid2uname_dict = {}; gid2gname_dict = {} +def uid2uname(uid): + """Given uid, return uname or None if cannot find""" + try: return uid2uname_dict[uid] + except KeyError: + try: uname = pwd.getpwuid(uid)[0] + except KeyError: uname = None + uid2uname_dict[uid] = uname + return uname + +def gid2gname(gid): + """Given gid, return group name or None if cannot find""" + try: return gid2gname_dict[gid] + except KeyError: + try: gname = grp.getgrgid(gid)[0] + except KeyError: gname = None + gid2gname_dict[gid] = gname + return gname + + +class Map: + """Used for mapping names and id on source side to dest side""" + def __init__(self, name2id_func): + """Map initializer, set dictionaries""" + assert Globals.isdest, "Should run on destination connection" + self.name2id_dict = {} + self.name2id_func = name2id_func + + def get_id(self, id, name = None): + """Return mapped id from id and, if available, name""" + if not name: return self.get_id_from_id(id) + try: return self.name2id_dict[name] + except KeyError: + out_id = self.find_id(id, name) + self.name2id_dict[name] = out_id + return out_id + + def get_id_from_id(self, id): return id + + def find_id(self, id, name): + """Find the proper id to use with given id and name""" + try: return self.name2id_func(name) + except KeyError: return id + +class DefinedMap(Map): + """Map names and ids on source side to appropriate ids on dest side + + Like map, but initialize with user-defined mapping string, which + supersedes Map. + + """ + def __init__(self, name2id_func, mapping_string): + """Initialize object with given mapping string + + The mapping_string should consist of a number of lines, each which + should have the form "source_id_or_name:dest_id_or_name". Do user + mapping unless user is false, then do group. + + """ + Map.__init__(self, name2id_func) + self.name_mapping_dict = {}; self.id_mapping_dict = {} + + for line in mapping_string.split('\n'): + line = line.strip() + if not line: continue + comps = line.split(':') + if not len(comps) == 2: + log.Log.FatalError("Error parsing mapping file, bad line: " + + line) + old, new = comps + + try: self.id_mapping_dict[int(old)] = self.init_get_new_id(new) + except ValueError: + self.name_mapping_dict[old] = self.init_get_new_id(new) + + def init_get_new_id(self, id_or_name): + """Return id of id_or_name, failing if cannot. Used in __init__""" + try: return int(id_or_name) + except ValueError: + try: id = self.name2id_func(id_or_name) + except KeyError: + log.Log.FatalError("Cannot get id for user or group name" + + id_or_name) + return id + + def get_id_from_id(self, id): return self.id_mapping_dict.get(id, id) + + def find_id(self, id, name): + """Find proper id to use when source file has give id and name""" + try: return self.name_mapping_dict[name] + except KeyError: + try: return self.id_mapping_dict[id] + except KeyError: return Map.find_id(self, id, name) + +def init_user_mapping(mapping_string = None): + """Initialize user mapping with given mapping string or None""" + global UserMap + name2id_func = lambda name: pwd.getpwnam(name)[2] + if mapping_string: UserMap = DefinedMap(name2id_func, mapping_string) + else: UserMap = Map(name2id_func) + +def init_group_mapping(mapping_string): + """Initialize the group mapping dictionary with given mapping string""" + global GroupMap + name2id_func = lambda name: grp.getgrnam(name)[2] + if mapping_string: GroupMap = DefinedMap(name2id_func, mapping_string) + else: GroupMap = Map(name2id_func) + + + + diff --git a/rdiff-backup/testing/commontest.py b/rdiff-backup/testing/commontest.py index b60018d..4cfac90 100644 --- a/rdiff-backup/testing/commontest.py +++ b/rdiff-backup/testing/commontest.py @@ -1,5 +1,5 @@ """commontest - Some functions and constants common to several test cases""" -import os, sys +import os, sys, code from rdiff_backup.log import Log from rdiff_backup.rpath import RPath from rdiff_backup import Globals, Hardlink, SetConnections, Main, \ @@ -363,3 +363,9 @@ def MirrorTest(source_local, dest_local, list_of_dirnames, _reset_connections(src_rp, dest_rp) assert CompareRecursive(src_rp, dest_rp, compare_hardlinks) Main.force = old_force_val + +def raise_interpreter(use_locals = None): + """Start python interpreter, with local variables if locals is true""" + if use_locals: local_dict = locals() + else: local_dict = globals() + code.InteractiveConsole(local_dict).interact() diff --git a/rdiff-backup/testing/user_grouptest.py b/rdiff-backup/testing/user_grouptest.py new file mode 100644 index 0000000..a103ee7 --- /dev/null +++ b/rdiff-backup/testing/user_grouptest.py @@ -0,0 +1,55 @@ +import unittest, pwd, grp, code +from commontest import * +from rdiff_backup import user_group + + +class UserGroupTest(unittest.TestCase): + """Test user and group functionality""" + def test_basic_conversion(self): + """Test basic id2name. May need to modify for different systems""" + user_group.uid2uname_dict = {}; user_group.gid2gname_dict = {} + assert user_group.uid2uname(0) == "root" + assert user_group.uid2uname(0) == "root" + assert user_group.gid2gname(0) == "root" + assert user_group.gid2gname(0) == "root" + + def test_default_mapping(self): + """Test the default user mapping""" + Globals.isdest = 1 + rootid = 0 + binid = pwd.getpwnam('bin')[2] + syncid = pwd.getpwnam('sync')[2] + user_group.init_user_mapping() + assert user_group.UserMap.get_id(0) == 0 + assert user_group.UserMap.get_id(0, 'bin') == binid + assert user_group.UserMap.get_id(binid, 'sync') == syncid + + def test_user_mapping(self): + """Test the user mapping file through the DefinedMap class""" + mapping_string = """ +root:bin +bin:root +500:501 +0:sync +sync:0""" + Globals.isdest = 1 + rootid = 0 + binid = pwd.getpwnam('bin')[2] + syncid = pwd.getpwnam('sync')[2] + daemonid = pwd.getpwnam('daemon')[2] + user_group.init_user_mapping(mapping_string) + + assert user_group.UserMap.get_id(rootid, 'root') == binid + assert user_group.UserMap.get_id(binid, 'bin') == rootid + assert user_group.UserMap.get_id(0) == syncid + assert user_group.UserMap.get_id(syncid, 'sync') == 0 + assert user_group.UserMap.get_id(500) == 501 + + assert user_group.UserMap.get_id(501) == 501 + assert user_group.UserMap.get_id(123, 'daemon') == daemonid + + if 0: code.InteractiveConsole(globals()).interact() + + + +if __name__ == "__main__": unittest.main() -- cgit v1.2.1