summaryrefslogtreecommitdiff
path: root/pysnmp/entity/observer.py
blob: ddc959c362031fbe349b055e468ef64b534b8ec3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#
# This file is part of pysnmp software.
#
# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
# License: http://snmplabs.com/pysnmp/license.html
#
from pysnmp import error


class MetaObserver(object):
    """Expose internal SNMP Engine working details to pysnmp applications.

    These details are basically local scope variables captured at a fixed
    point of execution (an "execution point", identified by an ID).

    Two modes of operation are offered:

    1. Consumer: the application requests the stored context of an
       execution point by its ID (`getExecutionContext`).
    2. Provider: the application registers a callback function (and an
       optional context object) to be invoked once execution reaches the
       specified point (`registerObserver`). The same variables that are
       available in mode #1 are passed to the callback.

    It's important to realize that execution context is only guaranteed
    to exist to functions that are at the same or deeper level of
    invocation relative to the execution point specified.
    """

    def __init__(self):
        # execution point ID -> list of callbacks, in registration order
        self.__observers = {}
        # callback -> `cbCtx` value handed back to it on every invocation
        self.__contexts = {}
        # execution point ID -> most recently stored variables
        self.__execpoints = {}

    def registerObserver(self, cbFun, *execpoints, **kwargs):
        """Subscribe `cbFun` to one or more execution points.

        Args:
            cbFun: callable invoked as
                ``cbFun(snmpEngine, execpoint, variables, cbCtx)``.
            *execpoints: IDs of the execution points to observe.
            **kwargs: only ``cbCtx`` is read; it is passed back to `cbFun`
                unchanged on every call (``None`` when omitted).

        Raises:
            PySnmpError: if `cbFun` is already registered.
        """
        if cbFun in self.__contexts:
            raise error.PySnmpError('duplicate observer %s' % cbFun)

        self.__contexts[cbFun] = kwargs.get('cbCtx')

        for execpoint in execpoints:
            self.__observers.setdefault(execpoint, []).append(cbFun)

    def unregisterObserver(self, cbFun=None):
        """Unsubscribe `cbFun` from all execution points.

        Args:
            cbFun: the callback to remove. When ``None`` (the default),
                every registered callback is removed.

        Unregistering a callback that is not registered is a no-op.
        """
        if cbFun is None:
            self.__observers.clear()
            self.__contexts.clear()
            return

        # Iterate over a copy of the keys: empty entries are deleted below.
        for execpoint in list(self.__observers):
            # Drop *every* occurrence, not just the first one, so that a
            # callback registered with a repeated execution point does not
            # stay subscribed.
            remaining = [fn for fn in self.__observers[execpoint]
                         if fn != cbFun]

            if remaining:
                self.__observers[execpoint] = remaining

            else:
                del self.__observers[execpoint]

        # Forget the callback's context as well; otherwise a later
        # registerObserver() for the same callback would be rejected as
        # a duplicate and the context object would never be released.
        self.__contexts.pop(cbFun, None)

    def storeExecutionContext(self, snmpEngine, execpoint, variables):
        """Record `variables` for `execpoint` and notify its observers.

        Args:
            snmpEngine: passed through to the callbacks as is.
            execpoint: ID of the execution point being reached.
            variables: the context to store and to hand to the callbacks.
        """
        self.__execpoints[execpoint] = variables

        # Work on a snapshot so that callbacks may (un)register observers
        # without disturbing this iteration.
        for cbFun in tuple(self.__observers.get(execpoint, ())):
            # Skip callbacks unregistered by an earlier callback in this
            # very notification round.
            if cbFun not in self.__contexts:
                continue

            cbFun(snmpEngine, execpoint, variables, self.__contexts[cbFun])

    def clearExecutionContext(self, snmpEngine, *execpoints):
        """Discard stored execution contexts.

        Args:
            snmpEngine: unused by this method.
            *execpoints: IDs of the execution points to clear. When none
                are given, all stored contexts are discarded.

        Raises:
            KeyError: if a named execution point has no stored context.
        """
        if execpoints:
            for execpoint in execpoints:
                del self.__execpoints[execpoint]

        else:
            self.__execpoints.clear()

    def getExecutionContext(self, execpoint):
        """Return the variables last stored for `execpoint`.

        Raises:
            KeyError: if no context is stored for `execpoint`.
        """
        return self.__execpoints[execpoint]