summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorelie <elie>2012-08-12 06:52:29 +0000
committerelie <elie>2012-08-12 06:52:29 +0000
commit7ccc0a0962d881352f308472d04993b0b9cfb3a9 (patch)
tree03d2f1e694ec8e1716494dcacb798a0d8f664825
parent30362e91d32ae26142cdac9d3836edee70c1a4af (diff)
downloadpysnmp-7ccc0a0962d881352f308472d04993b0b9cfb3a9.tar.gz
auth & target configuration containers moved to separate modules
-rw-r--r--CHANGES2
-rw-r--r--pysnmp/entity/rfc3413/oneliner/auth.py104
-rw-r--r--pysnmp/entity/rfc3413/oneliner/cmdgen.py166
-rw-r--r--pysnmp/entity/rfc3413/oneliner/mibvar.py34
-rw-r--r--pysnmp/entity/rfc3413/oneliner/ntforg.py30
-rw-r--r--pysnmp/entity/rfc3413/oneliner/target.py65
6 files changed, 203 insertions, 198 deletions
diff --git a/CHANGES b/CHANGES
index 96e4eb4..baf41ab 100644
--- a/CHANGES
+++ b/CHANGES
@@ -67,6 +67,8 @@ Revision 4.2.3
impementations. The former just process a single interaction and complete
while the latter run as many interactions as user callback function
indicates to.
+- Auth & target configuration container classes moved to their separate
+ modules at oneliner API.
- Built-in debugger now supports negating debugging categories.
- Fix to CommandGenerator's SNMPv3 engine autodiscovery algorithm
when retryCount is administratively set to 0.
diff --git a/pysnmp/entity/rfc3413/oneliner/auth.py b/pysnmp/entity/rfc3413/oneliner/auth.py
new file mode 100644
index 0000000..ff2c0af
--- /dev/null
+++ b/pysnmp/entity/rfc3413/oneliner/auth.py
@@ -0,0 +1,104 @@
+from pysnmp.entity import config
+from pyasn1.compat.octets import null
+
+class CommunityData:
+ mpModel = 1 # Default is SMIv2
+ securityModel = mpModel + 1
+ securityLevel = 'noAuthNoPriv'
+ contextName = null
+ tag = null
+ def __init__(self, securityName, communityName=None, mpModel=None,
+ contextEngineId=None, contextName=None, tag=None):
+ self.securityName = securityName
+ self.communityName = communityName
+ if mpModel is not None:
+ self.mpModel = mpModel
+ self.securityModel = mpModel + 1
+ self.contextEngineId = contextEngineId
+ if contextName is not None:
+ self.contextName = contextName
+ if tag is not None:
+ self.tag = tag
+ # Autogenerate securityName if not specified
+ if communityName is None:
+ self.communityName = securityName
+ self.securityName = 's%s' % hash(
+ ( securityName, self.mpModel,
+ self.contextEngineId, self.contextName, self.tag )
+ )
+
+ def __repr__(self):
+ return '%s("%s", <COMMUNITY>, %r, %r, %r, %r)' % (
+ self.__class__.__name__,
+ self.securityName,
+ self.mpModel,
+ self.contextEngineId,
+ self.contextName,
+ self.tag
+ )
+
+ def __hash__(self): return hash(self.securityName)
+
+ def __eq__(self, other): return self.securityName == other
+ def __ne__(self, other): return self.securityName != other
+ def __lt__(self, other): return self.securityName < other
+ def __le__(self, other): return self.securityName <= other
+ def __gt__(self, other): return self.securityName > other
+ def __ge__(self, other): return self.securityName >= other
+
+class UsmUserData:
+ authKey = privKey = None
+ authProtocol = config.usmNoAuthProtocol
+ privProtocol = config.usmNoPrivProtocol
+ securityLevel = 'noAuthNoPriv'
+ securityModel = 3
+ mpModel = 3
+ contextName = null
+ def __init__(self, securityName,
+ authKey=None, privKey=None,
+ authProtocol=None, privProtocol=None,
+ contextEngineId=None, contextName=None):
+ self.securityName = securityName
+
+ if authKey is not None:
+ self.authKey = authKey
+ if authProtocol is None:
+ self.authProtocol = config.usmHMACMD5AuthProtocol
+ else:
+ self.authProtocol = authProtocol
+ if self.securityLevel != 'authPriv':
+ self.securityLevel = 'authNoPriv'
+
+ if privKey is not None:
+ self.privKey = privKey
+ if self.authProtocol == config.usmNoAuthProtocol:
+ raise error.PySnmpError('Privacy implies authenticity')
+ self.securityLevel = 'authPriv'
+ if privProtocol is None:
+ self.privProtocol = config.usmDESPrivProtocol
+ else:
+ self.privProtocol = privProtocol
+
+ self.contextEngineId = contextEngineId
+ if contextName is not None:
+ self.contextName = contextName
+
+ def __repr__(self):
+ return '%s("%s", <AUTHKEY>, <PRIVKEY>, %r, %r, %r, %r)' % (
+ self.__class__.__name__,
+ self.securityName,
+ self.authProtocol,
+ self.privProtocol,
+ self.contextEngineId,
+ self.contextName
+ )
+
+ def __hash__(self): return hash(self.securityName)
+
+ def __eq__(self, other): return self.securityName == other
+ def __ne__(self, other): return self.securityName != other
+ def __lt__(self, other): return self.securityName < other
+ def __le__(self, other): return self.securityName <= other
+ def __gt__(self, other): return self.securityName > other
+ def __ge__(self, other): return self.securityName >= other
+
diff --git a/pysnmp/entity/rfc3413/oneliner/cmdgen.py b/pysnmp/entity/rfc3413/oneliner/cmdgen.py
index 4398730..1be2d03 100644
--- a/pysnmp/entity/rfc3413/oneliner/cmdgen.py
+++ b/pysnmp/entity/rfc3413/oneliner/cmdgen.py
@@ -1,8 +1,9 @@
-import socket, sys
from pysnmp.entity import engine, config
from pysnmp.entity.rfc3413 import cmdgen
from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable
-from pysnmp.carrier.asynsock.dgram import udp, udp6, unix
+from pysnmp.entity.rfc3413.oneliner.auth import CommunityData, UsmUserData
+from pysnmp.entity.rfc3413.oneliner.target import UdpTransportTarget, \
+ Udp6TransportTarget, UnixTransportTarget
from pysnmp.proto import rfc1905, errind
from pysnmp.smi import view
from pysnmp import nextid, error
@@ -24,167 +25,6 @@ usmNoPrivProtocol = config.usmNoPrivProtocol
nextID = nextid.Integer(0xffffffff)
-class CommunityData:
- mpModel = 1 # Default is SMIv2
- securityModel = mpModel + 1
- securityLevel = 'noAuthNoPriv'
- contextName = null
- tag = null
- def __init__(self, securityName, communityName=None, mpModel=None,
- contextEngineId=None, contextName=None, tag=None):
- self.securityName = securityName
- self.communityName = communityName
- if mpModel is not None:
- self.mpModel = mpModel
- self.securityModel = mpModel + 1
- self.contextEngineId = contextEngineId
- if contextName is not None:
- self.contextName = contextName
- if tag is not None:
- self.tag = tag
- # Autogenerate securityName if not specified
- if communityName is None:
- self.communityName = securityName
- self.securityName = 's%s' % hash(
- ( securityName, self.mpModel,
- self.contextEngineId, self.contextName, self.tag )
- )
-
- def __repr__(self):
- return '%s("%s", <COMMUNITY>, %r, %r, %r, %r)' % (
- self.__class__.__name__,
- self.securityName,
- self.mpModel,
- self.contextEngineId,
- self.contextName,
- self.tag
- )
-
- def __hash__(self): return hash(self.securityName)
-
- def __eq__(self, other): return self.securityName == other
- def __ne__(self, other): return self.securityName != other
- def __lt__(self, other): return self.securityName < other
- def __le__(self, other): return self.securityName <= other
- def __gt__(self, other): return self.securityName > other
- def __ge__(self, other): return self.securityName >= other
-
-class UsmUserData:
- authKey = privKey = None
- authProtocol = usmNoAuthProtocol
- privProtocol = usmNoPrivProtocol
- securityLevel = 'noAuthNoPriv'
- securityModel = 3
- mpModel = 3
- contextName = null
- def __init__(self, securityName,
- authKey=None, privKey=None,
- authProtocol=None, privProtocol=None,
- contextEngineId=None, contextName=None):
- self.securityName = securityName
-
- if authKey is not None:
- self.authKey = authKey
- if authProtocol is None:
- self.authProtocol = usmHMACMD5AuthProtocol
- else:
- self.authProtocol = authProtocol
- if self.securityLevel != 'authPriv':
- self.securityLevel = 'authNoPriv'
-
- if privKey is not None:
- self.privKey = privKey
- if self.authProtocol == usmNoAuthProtocol:
- raise error.PySnmpError('Privacy implies authenticity')
- self.securityLevel = 'authPriv'
- if privProtocol is None:
- self.privProtocol = usmDESPrivProtocol
- else:
- self.privProtocol = privProtocol
-
- self.contextEngineId = contextEngineId
- if contextName is not None:
- self.contextName = contextName
-
- def __repr__(self):
- return '%s("%s", <AUTHKEY>, <PRIVKEY>, %r, %r, %r, %r)' % (
- self.__class__.__name__,
- self.securityName,
- self.authProtocol,
- self.privProtocol,
- self.contextEngineId,
- self.contextName
- )
-
- def __hash__(self): return hash(self.securityName)
-
- def __eq__(self, other): return self.securityName == other
- def __ne__(self, other): return self.securityName != other
- def __lt__(self, other): return self.securityName < other
- def __le__(self, other): return self.securityName <= other
- def __gt__(self, other): return self.securityName > other
- def __ge__(self, other): return self.securityName >= other
-
-class _AbstractTransportTarget:
- transportDomain = protoTransport = None
- def __init__(self, transportAddr, timeout=1, retries=5, tagList=null):
- self.transportAddr = transportAddr
- self.timeout = timeout
- self.retries = retries
- self.tagList = tagList
-
- def __repr__(self): return '%s(%r, %r, %r, %r)' % (
- self.__class__.__name__, self.transportAddr,
- self.timeout, self.retries, self.tagList
- )
-
- def __hash__(self): return hash(self.transportAddr)
-
- def __eq__(self, other): return self.transportAddr == other
- def __ne__(self, other): return self.transportAddr != other
- def __lt__(self, other): return self.transportAddr < other
- def __le__(self, other): return self.transportAddr <= other
- def __gt__(self, other): return self.transportAddr > other
- def __ge__(self, other): return self.transportAddr >= other
-
- def openClientMode(self):
- self.transport = self.protoTransport().openClientMode()
- return self.transport
-
-class UdpTransportTarget(_AbstractTransportTarget):
- transportDomain = udp.domainName
- protoTransport = udp.UdpSocketTransport
- def __init__(self, transportAddr, timeout=1, retries=5, tagList=null):
- _AbstractTransportTarget.__init__(self, transportAddr, timeout,
- retries, tagList)
- try:
- self.transportAddr = socket.getaddrinfo(transportAddr[0],
- transportAddr[1],
- socket.AF_INET,
- socket.SOCK_DGRAM,
- socket.IPPROTO_UDP)[0][4][:2]
- except socket.gaierror:
- raise error.PySnmpError('Bad IPv4/UDP transport address %s: %s' % ('@'.join([ str(x) for x in transportAddr ]), sys.exc_info()[1]))
-
-class Udp6TransportTarget(_AbstractTransportTarget):
- transportDomain = udp6.domainName
- protoTransport = udp6.Udp6SocketTransport
- def __init__(self, transportAddr, timeout=1, retries=5, tagList=null):
- _AbstractTransportTarget.__init__(self, transportAddr, timeout,
- retries, tagList)
- try:
- self.transportAddr = socket.getaddrinfo(transportAddr[0],
- transportAddr[1],
- socket.AF_INET6,
- socket.SOCK_DGRAM,
- socket.IPPROTO_UDP)[0][4][:2]
- except socket.gaierror:
- raise error.PySnmpError('Bad IPv6/UDP transport address %s: %s' % ('@'.join([ str(x) for x in transportAddr ]), sys.exc_info()[1]))
-
-class UnixTransportTarget(_AbstractTransportTarget):
- transportDomain = unix.domainName
- protoTransport = unix.UnixSocketTransport
-
class AsynCommandGenerator:
_null = univ.Null('')
def __init__(self, snmpEngine=None):
diff --git a/pysnmp/entity/rfc3413/oneliner/mibvar.py b/pysnmp/entity/rfc3413/oneliner/mibvar.py
index 531f011..c813267 100644
--- a/pysnmp/entity/rfc3413/oneliner/mibvar.py
+++ b/pysnmp/entity/rfc3413/oneliner/mibvar.py
@@ -32,7 +32,7 @@ class MibVariable:
if self.__state & (self.stOidOnly | self.stClean):
return self.__oid
else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
def getLabel(self):
if self.__state & self.stClean:
@@ -190,98 +190,98 @@ class MibVariable:
if self.__state & self.stOidOnly:
return str(self.__oid)
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def __getitem__(self, i):
if self.__state & self.stOidOnly:
return self.__oid[i]
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def __eq__(self, other):
if self.__state & self.stOidOnly:
return self.__oid == other
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def __ne__(self, other):
if self.__state & self.stOidOnly:
return self.__oid != other
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def __lt__(self, other):
if self.__state & self.stOidOnly:
return self.__oid < other
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def __le__(self, other):
if self.__state & self.stOidOnly:
return self.__oid <= other
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def __gt__(self, other):
if self.__state & self.stOidOnly:
return self.__oid > other
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def __ge__(self, other):
if self.__state & self.stOidOnly:
return self.__oid >= other
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
if sys.version_info[0] <= 2:
def __nonzero__(self):
if self.__state & self.stOidOnly:
return bool(self.__oid)
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
else:
def __bool__(self):
if self.__state & self.stOidOnly:
return bool(self.__oid)
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def __hash__(self):
if self.__state & self.stOidOnly:
return hash(self.__oid)
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def __len__(self):
if self.__state & self.stOidOnly:
return len(self.__oid)
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def __index__(self, i):
if self.__state & self.stOidOnly:
return self.__oid[i]
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def asTuple(self):
if self.__state & self.stOidOnly:
return self.__oid.asTuple()
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def clone(self, *args):
if self.__state & self.stOidOnly:
return self.__oid.clone(*args)
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def subtype(self, *args):
if self.__state & self.stOidOnly:
return self.__oid.subtype(*args)
else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
+ raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
def isPrefixOf(self, *args):
if self.__state & self.stOidOnly:
diff --git a/pysnmp/entity/rfc3413/oneliner/ntforg.py b/pysnmp/entity/rfc3413/oneliner/ntforg.py
index 3572ac1..fe5f39b 100644
--- a/pysnmp/entity/rfc3413/oneliner/ntforg.py
+++ b/pysnmp/entity/rfc3413/oneliner/ntforg.py
@@ -3,29 +3,23 @@ from pysnmp import nextid
from pysnmp.entity import config
from pysnmp.entity.rfc3413 import ntforg, context
from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable
+from pysnmp.entity.rfc3413.oneliner.auth import CommunityData, UsmUserData
+from pysnmp.entity.rfc3413.oneliner.target import UdpTransportTarget, \
+ Udp6TransportTarget, UnixTransportTarget
from pysnmp.entity.rfc3413.oneliner import cmdgen
# Auth protocol
-usmHMACMD5AuthProtocol = cmdgen.usmHMACMD5AuthProtocol
-usmHMACSHAAuthProtocol = cmdgen.usmHMACSHAAuthProtocol
-usmNoAuthProtocol = cmdgen.usmNoAuthProtocol
+usmHMACMD5AuthProtocol = config.usmHMACMD5AuthProtocol
+usmHMACSHAAuthProtocol = config.usmHMACSHAAuthProtocol
+usmNoAuthProtocol = config.usmNoAuthProtocol
# Privacy protocol
-usmDESPrivProtocol = cmdgen.usmDESPrivProtocol
-usm3DESEDEPrivProtocol = cmdgen.usm3DESEDEPrivProtocol
-usmAesCfb128Protocol = cmdgen.usmAesCfb128Protocol
-usmAesCfb192Protocol = cmdgen.usmAesCfb192Protocol
-usmAesCfb256Protocol = cmdgen.usmAesCfb256Protocol
-usmNoPrivProtocol = cmdgen.usmNoPrivProtocol
-
-# Credentials
-CommunityData = cmdgen.CommunityData
-UsmUserData = cmdgen.UsmUserData
-
-# Transports
-UdpTransportTarget = cmdgen.UdpTransportTarget
-Udp6TransportTarget = cmdgen.Udp6TransportTarget
-UnixTransportTarget = cmdgen.UnixTransportTarget
+usmDESPrivProtocol = config.usmDESPrivProtocol
+usm3DESEDEPrivProtocol = config.usm3DESEDEPrivProtocol
+usmAesCfb128Protocol = config.usmAesCfb128Protocol
+usmAesCfb192Protocol = config.usmAesCfb192Protocol
+usmAesCfb256Protocol = config.usmAesCfb256Protocol
+usmNoPrivProtocol = config.usmNoPrivProtocol
nextID = nextid.Integer(0xffffffff)
diff --git a/pysnmp/entity/rfc3413/oneliner/target.py b/pysnmp/entity/rfc3413/oneliner/target.py
new file mode 100644
index 0000000..a8249a9
--- /dev/null
+++ b/pysnmp/entity/rfc3413/oneliner/target.py
@@ -0,0 +1,65 @@
+import socket, sys
+from pysnmp.carrier.asynsock.dgram import udp, udp6, unix
+from pyasn1.compat.octets import null
+
+class _AbstractTransportTarget:
+ transportDomain = protoTransport = None
+ def __init__(self, transportAddr, timeout=1, retries=5, tagList=null):
+ self.transportAddr = transportAddr
+ self.timeout = timeout
+ self.retries = retries
+ self.tagList = tagList
+
+ def __repr__(self): return '%s(%r, %r, %r, %r)' % (
+ self.__class__.__name__, self.transportAddr,
+ self.timeout, self.retries, self.tagList
+ )
+
+ def __hash__(self): return hash(self.transportAddr)
+
+ def __eq__(self, other): return self.transportAddr == other
+ def __ne__(self, other): return self.transportAddr != other
+ def __lt__(self, other): return self.transportAddr < other
+ def __le__(self, other): return self.transportAddr <= other
+ def __gt__(self, other): return self.transportAddr > other
+ def __ge__(self, other): return self.transportAddr >= other
+
+ def openClientMode(self):
+ self.transport = self.protoTransport().openClientMode()
+ return self.transport
+
+class UdpTransportTarget(_AbstractTransportTarget):
+ transportDomain = udp.domainName
+ protoTransport = udp.UdpSocketTransport
+ def __init__(self, transportAddr, timeout=1, retries=5, tagList=null):
+ _AbstractTransportTarget.__init__(self, transportAddr, timeout,
+ retries, tagList)
+ try:
+ self.transportAddr = socket.getaddrinfo(transportAddr[0],
+ transportAddr[1],
+ socket.AF_INET,
+ socket.SOCK_DGRAM,
+ socket.IPPROTO_UDP)[0][4][:2]
+ except socket.gaierror:
+ raise error.PySnmpError('Bad IPv4/UDP transport address %s: %s' % ('@'.join([ str(x) for x in transportAddr ]), sys.exc_info()[1]))
+
+class Udp6TransportTarget(_AbstractTransportTarget):
+ transportDomain = udp6.domainName
+ protoTransport = udp6.Udp6SocketTransport
+ def __init__(self, transportAddr, timeout=1, retries=5, tagList=null):
+ _AbstractTransportTarget.__init__(self, transportAddr, timeout,
+ retries, tagList)
+ try:
+ self.transportAddr = socket.getaddrinfo(transportAddr[0],
+ transportAddr[1],
+ socket.AF_INET6,
+ socket.SOCK_DGRAM,
+ socket.IPPROTO_UDP)[0][4][:2]
+ except socket.gaierror:
+ raise error.PySnmpError('Bad IPv6/UDP transport address %s: %s' % ('@'.join([ str(x) for x in transportAddr ]), sys.exc_info()[1]))
+
+class UnixTransportTarget(_AbstractTransportTarget):
+ transportDomain = unix.domainName
+ protoTransport = unix.UnixSocketTransport
+
+