summaryrefslogtreecommitdiff
path: root/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py
blob: 761bf3d960a9735b800cd91becccaace60684ea7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#
# This file is part of pysnmp software.
#
# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
# License: http://snmplabs.com/pysnmp/license.html
#
try:
    from hashlib import md5
except ImportError:
    import md5

    md5 = md5.new
from pyasn1.type import univ
from pysnmp.proto.secmod.rfc3414.auth import base
from pysnmp.proto.secmod.rfc3414 import localkey
from pysnmp.proto import errind, error

_twelveZeros = univ.OctetString((0,) * 12).asOctets()
_fortyEightZeros = (0,) * 48


# rfc3414: 6.2.4

class HmacMd5(base.AbstractAuthenticationService):
    serviceID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 2)  # usmHMACMD5AuthProtocol
    __ipad = [0x36] * 64
    __opad = [0x5C] * 64

    def hashPassphrase(self, authKey):
        return localkey.hashPassphraseMD5(authKey)

    def localizeKey(self, authKey, snmpEngineID):
        return localkey.localizeKeyMD5(authKey, snmpEngineID)

    @property
    def digestLength(self):
        return 12

    # 6.3.1
    def authenticateOutgoingMsg(self, authKey, wholeMsg):
        # Here we expect calling secmod to indicate where the digest
        # should be in the substrate. Also, it pre-sets digest placeholder
        # so we hash wholeMsg out of the box.
        # Yes, that's ugly but that's rfc...
        l = wholeMsg.find(_twelveZeros)
        if l == -1:
            raise error.ProtocolError('Cant locate digest placeholder')
        wholeHead = wholeMsg[:l]
        wholeTail = wholeMsg[l + 12:]

        # 6.3.1.1

        # 6.3.1.2a
        extendedAuthKey = authKey.asNumbers() + _fortyEightZeros

        # 6.3.1.2b --> no-op

        # 6.3.1.2c
        k1 = univ.OctetString(
            map(lambda x, y: x ^ y, extendedAuthKey, self.__ipad)
        )

        # 6.3.1.2d --> no-op

        # 6.3.1.2e
        k2 = univ.OctetString(
            map(lambda x, y: x ^ y, extendedAuthKey, self.__opad)
        )

        # 6.3.1.3
        # noinspection PyDeprecation,PyCallingNonCallable
        d1 = md5(k1.asOctets() + wholeMsg).digest()

        # 6.3.1.4
        # noinspection PyDeprecation,PyCallingNonCallable
        d2 = md5(k2.asOctets() + d1).digest()
        mac = d2[:12]

        # 6.3.1.5 & 6
        return wholeHead + mac + wholeTail

    # 6.3.2
    def authenticateIncomingMsg(self, authKey, authParameters, wholeMsg):
        # 6.3.2.1 & 2
        if len(authParameters) != 12:
            raise error.StatusInformation(
                errorIndication=errind.authenticationError
            )

        # 6.3.2.3
        l = wholeMsg.find(authParameters.asOctets())
        if l == -1:
            raise error.ProtocolError('Cant locate digest in wholeMsg')
        wholeHead = wholeMsg[:l]
        wholeTail = wholeMsg[l + 12:]
        authenticatedWholeMsg = wholeHead + _twelveZeros + wholeTail

        # 6.3.2.4a
        extendedAuthKey = authKey.asNumbers() + _fortyEightZeros

        # 6.3.2.4b --> no-op

        # 6.3.2.4c
        k1 = univ.OctetString(
            map(lambda x, y: x ^ y, extendedAuthKey, self.__ipad)
        )

        # 6.3.2.4d --> no-op

        # 6.3.2.4e
        k2 = univ.OctetString(
            map(lambda x, y: x ^ y, extendedAuthKey, self.__opad)
        )

        # 6.3.2.5a
        # noinspection PyDeprecation,PyCallingNonCallable
        d1 = md5(k1.asOctets() + authenticatedWholeMsg).digest()

        # 6.3.2.5b
        # noinspection PyDeprecation,PyCallingNonCallable
        d2 = md5(k2.asOctets() + d1).digest()

        # 6.3.2.5c
        mac = d2[:12]

        # 6.3.2.6
        if mac != authParameters:
            raise error.StatusInformation(
                errorIndication=errind.authenticationFailure
            )

        return authenticatedWholeMsg