diff options
Diffstat (limited to 'python/netlink/route/address.py')
-rw-r--r-- | python/netlink/route/address.py | 398 |
1 files changed, 398 insertions, 0 deletions
diff --git a/python/netlink/route/address.py b/python/netlink/route/address.py new file mode 100644 index 0000000..c0ce54f --- /dev/null +++ b/python/netlink/route/address.py @@ -0,0 +1,398 @@ +# +# Copyright (c) 2011 Thomas Graf <tgraf@suug.ch> +# + +"""Module providing access to network addresses +""" + +__version__ = "1.0" +__all__ = [ + 'AddressCache', + 'Address'] + +import datetime +import netlink.core as netlink +import netlink.capi as core_capi +import netlink.route.capi as capi +import netlink.route.link as Link +import netlink.util as util + +########################################################################### +# Address Cache +class AddressCache(netlink.Cache): + """Cache containing network addresses""" + + def __init__(self, cache=None): + if not cache: + cache = self._alloc_cache_name("route/addr") + + self._protocol = netlink.NETLINK_ROUTE + self._c_cache = cache + + def __getitem__(self, key): + # Using ifindex=0 here implies that the local address itself + # is unique, otherwise the first occurence is returned. + return self.lookup(0, key) + + def lookup(self, ifindex, local): + if type(local) is str: + local = netlink.AbstractAddress(local) + + addr = capi.rtnl_addr_get(self._c_cache, ifindex, + local._nl_addr) + if addr is None: + raise KeyError() + + return Address._from_capi(addr) + + def _new_object(self, obj): + return Address(obj) + + def _new_cache(self, cache): + return AddressCache(cache=cache) + +########################################################################### +# Address Object +class Address(netlink.Object): + """Network address""" + + def __init__(self, obj=None): + netlink.Object.__init__(self, "route/addr", "address", obj) + self._rtnl_addr = self._obj2type(self._nl_object) + + @classmethod + def _from_capi(cls, obj): + return cls(capi.addr2obj(obj)) + + def _obj2type(self, obj): + return capi.obj2addr(obj) + + def __cmp__(self, other): + # sort by: + # 1. network link + # 2. address family + # 3. local address (including prefixlen) + diff = self.ifindex - other.ifindex + + if diff == 0: + diff = self.family - other.family + if diff == 0: + diff = capi.nl_addr_cmp(self.local, other.local) + + return diff + + def _new_instance(self, obj): + return Address(obj) + + ##################################################################### + # ifindex + @netlink.nlattr('address.ifindex', type=int, immutable=True, + fmt=util.num) + @property + def ifindex(self): + """interface index""" + return capi.rtnl_addr_get_ifindex(self._rtnl_addr) + + @ifindex.setter + def ifindex(self, value): + link = Link.resolve(value) + if not link: + raise ValueError() + + self.link = link + + ##################################################################### + # link + @netlink.nlattr('address.link', type=str, fmt=util.string) + @property + def link(self): + link = capi.rtnl_addr_get_link(self._rtnl_addr) + if not link: + return None + + return Link.Link.from_capi(link) + + @link.setter + def link(self, value): + if type(value) is str: + try: + value = Link.resolve(value) + except KeyError: + raise ValueError() + + capi.rtnl_addr_set_link(self._rtnl_addr, value._rtnl_link) + + # ifindex is immutable but we assume that if _orig does not + # have an ifindex specified, it was meant to be given here + if capi.rtnl_addr_get_ifindex(self._orig) == 0: + capi.rtnl_addr_set_ifindex(self._orig, value.ifindex) + + ##################################################################### + # label + @netlink.nlattr('address.label', type=str, fmt=util.string) + @property + def label(self): + """address label""" + return capi.rtnl_addr_get_label(self._rtnl_addr) + + @label.setter + def label(self, value): + capi.rtnl_addr_set_label(self._rtnl_addr, value) + + ##################################################################### + # flags + @netlink.nlattr('address.flags', type=str, fmt=util.string) + @property + def flags(self): + """Flags""" + flags = capi.rtnl_addr_get_flags(self._rtnl_addr) + return capi.rtnl_addr_flags2str(flags, 256)[0].split(',') + + def _set_flag(self, flag): + if flag[0] == '-': + i = capi.rtnl_addr_str2flags(flag[1:]) + capi.rtnl_addr_unset_flags(self._rtnl_addr, i) + else: + i = capi.rtnl_addr_str2flags(flag[1:]) + capi.rtnl_addr_set_flags(self._rtnl_addr, i) + + @flags.setter + def flags(self, value): + if type(value) is list: + for flag in value: + self._set_flag(flag) + else: + self._set_flag(value) + + ##################################################################### + # family + @netlink.nlattr('address.family', type=int, immutable=True, + fmt=util.num) + @property + def family(self): + """Address family""" + fam = capi.rtnl_addr_get_family(self._rtnl_addr) + return netlink.AddressFamily(fam) + + @family.setter + def family(self, value): + if not isinstance(value, AddressFamily): + value = AddressFamily(value) + + capi.rtnl_addr_set_family(self._rtnl_addr, int(value)) + + ##################################################################### + # scope + @netlink.nlattr('address.scope', type=int, fmt=util.num) + @property + def scope(self): + """Address scope""" + scope = capi.rtnl_addr_get_scope(self._rtnl_addr) + return capi.rtnl_scope2str(scope, 32)[0] + + @scope.setter + def scope(self, value): + if type(value) is str: + value = capi.rtnl_str2scope(value) + capi.rtnl_addr_set_scope(self._rtnl_addr, value) + + ##################################################################### + # local address + @netlink.nlattr('address.local', type=str, immutable=True, + fmt=util.addr) + @property + def local(self): + """Local address""" + a = capi.rtnl_addr_get_local(self._rtnl_addr) + return netlink.AbstractAddress(a) + + @local.setter + def local(self, value): + a = netlink.AbstractAddress(value) + capi.rtnl_addr_set_local(self._rtnl_addr, a._nl_addr) + + # local is immutable but we assume that if _orig does not + # have a local address specified, it was meant to be given here + if capi.rtnl_addr_get_local(self._orig) is None: + capi.rtnl_addr_set_local(self._orig, a._nl_addr) + + ##################################################################### + # Peer address + @netlink.nlattr('address.peer', type=str, fmt=util.addr) + @property + def peer(self): + """Peer address""" + a = capi.rtnl_addr_get_peer(self._rtnl_addr) + return netlink.AbstractAddress(a) + + @peer.setter + def peer(self, value): + a = netlink.AbstractAddress(value) + capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr) + + ##################################################################### + # Broadcast address + @netlink.nlattr('address.broadcast', type=str, fmt=util.addr) + @property + def broadcast(self): + """Broadcast address""" + a = capi.rtnl_addr_get_broadcast(self._rtnl_addr) + return netlink.AbstractAddress(a) + + @broadcast.setter + def broadcast(self, value): + a = netlink.AbstractAddress(value) + capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr) + + ##################################################################### + # Multicast address + @netlink.nlattr('address.multicast', type=str, fmt=util.addr) + @property + def multicast(self): + """multicast address""" + a = capi.rtnl_addr_get_multicast(self._rtnl_addr) + return netlink.AbstractAddress(a) + + @multicast.setter + def multicast(self, value): + try: + a = netlink.AbstractAddress(value) + except ValueError as err: + raise AttributeError('multicast', err) + + capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr) + + ##################################################################### + # Anycast address + @netlink.nlattr('address.anycast', type=str, fmt=util.addr) + @property + def anycast(self): + """anycast address""" + a = capi.rtnl_addr_get_anycast(self._rtnl_addr) + return netlink.AbstractAddress(a) + + @anycast.setter + def anycast(self, value): + a = netlink.AbstractAddress(value) + capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr) + + ##################################################################### + # Valid lifetime + @netlink.nlattr('address.valid_lifetime', type=int, immutable=True, + fmt=util.num) + @property + def valid_lifetime(self): + """Valid lifetime""" + msecs = capi.rtnl_addr_get_valid_lifetime(self._rtnl_addr) + if msecs == 0xFFFFFFFF: + return None + else: + return datetime.timedelta(seconds=msecs) + + @valid_lifetime.setter + def valid_lifetime(self, value): + capi.rtnl_addr_set_valid_lifetime(self._rtnl_addr, int(value)) + + ##################################################################### + # Preferred lifetime + @netlink.nlattr('address.preferred_lifetime', type=int, + immutable=True, fmt=util.num) + @property + def preferred_lifetime(self): + """Preferred lifetime""" + msecs = capi.rtnl_addr_get_preferred_lifetime(self._rtnl_addr) + if msecs == 0xFFFFFFFF: + return None + else: + return datetime.timedelta(seconds=msecs) + + @preferred_lifetime.setter + def preferred_lifetime(self, value): + capi.rtnl_addr_set_preferred_lifetime(self._rtnl_addr, int(value)) + + ##################################################################### + # Creation Time + @netlink.nlattr('address.create_time', type=int, immutable=True, + fmt=util.num) + @property + def create_time(self): + """Creation time""" + hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr) + return datetime.timedelta(milliseconds=10*hsec) + + ##################################################################### + # Last Update + @netlink.nlattr('address.last_update', type=int, immutable=True, + fmt=util.num) + @property + def last_update(self): + """Last update""" + hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr) + return datetime.timedelta(milliseconds=10*hsec) + + ##################################################################### + # add() + def add(self, socket=None, flags=None): + if not socket: + socket = netlink.lookup_socket(netlink.NETLINK_ROUTE) + + if not flags: + flags = netlink.NLM_F_CREATE + + ret = capi.rtnl_addr_add(socket._sock, self._rtnl_addr, flags) + if ret < 0: + raise netlink.KernelError(ret) + + ##################################################################### + # delete() + def delete(self, socket): + """Attempt to delete this link in the kernel""" + ret = capi.rtnl_addr_delete(socket._sock, self._addr) + if ret < 0: + raise netlink.KernelError(ret) + + ################################################################### + # private properties + # + # Used for formatting output. USE AT OWN RISK + @property + def _flags(self): + return ','.join(self.flags) + + ################################################################### + # + # format(details=False, stats=False) + # + def format(self, details=False, stats=False, nodev=False, indent=''): + """Return address as formatted text""" + fmt = util.MyFormatter(self, indent) + + buf = fmt.format('{a|local!b}') + + if not nodev: + buf += fmt.format(' {a|ifindex}') + + buf += fmt.format(' {a|scope}') + + if self.label: + buf += fmt.format(' "{a|label}"') + + buf += fmt.format(' <{a|_flags}>') + + if details: + buf += fmt.nl('\t{t|broadcast} {t|multicast}') \ + + fmt.nl('\t{t|peer} {t|anycast}') + + if self.valid_lifetime: + buf += fmt.nl('\t{s|valid-lifetime!k} '\ + '{a|valid_lifetime}') + + if self.preferred_lifetime: + buf += fmt.nl('\t{s|preferred-lifetime!k} '\ + '{a|preferred_lifetime}') + + if stats and (self.create_time or self.last_update): + buf += self.nl('\t{s|created!k} {a|create_time}'\ + ' {s|last-updated!k} {a|last_update}') + + return buf |