# cgit navigation: summary | refs | log | tree | commit | diff
# path: root/contrib/smimeplus.py
# blob: 9475ab029addd40495a6503a20a778c6e1dd6956 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import UserDict
import os
import tempfile

import M2Crypto

from email import Message


class smimeplus(object):
    """Convenience wrapper around M2Crypto's S/MIME operations.

    The sender certificate, private key and CA certificate are all handled
    as in-memory data (they are fed to OpenSSL through memory buffers or a
    temporary file), not as file names.
    """

    def __init__(self, cert, privkey, passphrase, cacert, randfile=None):
        """Store the sender identity and CA certificate.

        cert       -- sender certificate data
        privkey    -- sender private key data
        passphrase -- passphrase handed to OpenSSL for the private key
        cacert     -- CA certificate data used by verify()
        randfile   -- optional path of a random seed file; loaded now and
                      written back after encrypt()
        """
        self.cipher = 'des_ede3_cbc'   # XXX make it configable??
        self.setsender(cert, privkey, passphrase)
        self.setcacert(cacert)
        self.randfile = randfile
        self.__loadrand()

    def __passcallback(self, v):
        """private key passphrase callback function"""
        # 'v' is whatever M2Crypto passes to the callback; it is not used.
        return self.passphrase

    def __loadrand(self):
        """Load random number file (no-op when no randfile was given)"""
        if self.randfile:
            M2Crypto.Rand.load_file(self.randfile, -1)

    def __saverand(self):
        """Save random number file (no-op when no randfile was given)"""
        if self.randfile:
            M2Crypto.Rand.save_file(self.randfile)

    def __gettext(self, msg):
        """Return a string representation of 'msg'.

        email Message objects are flattened with as_string(); anything
        else is converted with str().
        """
        if isinstance(msg, Message.Message):
            # as_string() on the top-level message already contains every
            # sub-part.  Concatenating as_string() of each part yielded by
            # walk() (which starts with the message itself) would emit the
            # sub-parts of a multipart message more than once.
            return msg.as_string()
        return str(msg)

    def __pack(self, msg):
        """Convert 'msg' to string and put it into a memory buffer for
           openssl operation"""
        return M2Crypto.BIO.MemoryBuffer(self.__gettext(msg))

    def setsender(self, cert=None, privkey=None, passphrase=None):
        """Update the sender identity.

        Only arguments with a true value are stored; the others leave the
        corresponding attribute untouched.
        """
        if cert:
            self.cert = cert
        if privkey:
            self.key = privkey
        if passphrase:
            self.passphrase = passphrase

    def setcacert(self, cacert):
        """Set the CA certificate data used by verify()"""
        self.cacert = cacert

    def sign(self, msg):
        """Sign 'msg' with the sender key and return the signed message
           (detached signature) as a string"""
        _sender = M2Crypto.SMIME.SMIME()
        _sender.load_key_bio(self.__pack(self.key), self.__pack(self.cert),
                callback=self.__passcallback)

        _signed = _sender.sign(self.__pack(msg), M2Crypto.SMIME.PKCS7_DETACHED)

        # Start from an empty buffer: __pack(None) would seed the output
        # with the literal text 'None' (str(None)).
        _out = M2Crypto.BIO.MemoryBuffer()
        _sender.write(_out, _signed, self.__pack(msg))
        return _out.read()

    def verify(self, smsg, scert):
        """Verify to see if 'smsg' was signed by 'scert', and scert was
           issued by cacert of this object.  Return message signed if success,
           None otherwise"""
        # Load signer's cert.
        _x509 = M2Crypto.X509.load_cert_bio(self.__pack(scert))
        _stack = M2Crypto.X509.X509_Stack()
        _stack.push(_x509)

        # Load CA cert.  The store is loaded from a file name, so the CA
        # data goes through a temporary file which is removed even when
        # loading fails.
        _store = M2Crypto.X509.X509_Store()
        _tmpfile = persistdata(self.cacert)
        try:
            _store.load_info(_tmpfile)
        finally:
            os.remove(_tmpfile)

        # prepare SMIME object
        _sender = M2Crypto.SMIME.SMIME()
        _sender.set_x509_stack(_stack)
        _sender.set_x509_store(_store)

        # Load signed message, verify it, and return result
        _p7, _data = M2Crypto.SMIME.smime_load_pkcs7_bio(self.__pack(smsg))
        try:
            return _sender.verify(_p7, _data, flags=M2Crypto.SMIME.PKCS7_SIGNED)
        except M2Crypto.SMIME.SMIME_Error:
            return None

    def encrypt(self, rcert, msg):
        """Encrypt 'msg' to the recipient certificate 'rcert' and return
           the encrypted message as a string"""
        # Instantiate an SMIME object.
        _sender = M2Crypto.SMIME.SMIME()

        # Load target cert to encrypt to.
        _x509 = M2Crypto.X509.load_cert_bio(self.__pack(rcert))
        _stack = M2Crypto.X509.X509_Stack()
        _stack.push(_x509)
        _sender.set_x509_stack(_stack)

        _sender.set_cipher(M2Crypto.SMIME.Cipher(self.cipher))

        # Encrypt the buffer.  __pack() already stringifies 'msg'.
        _buf = self.__pack(msg)
        _p7 = _sender.encrypt(_buf)

        # Output p7 in mail-friendly format.
        _out = M2Crypto.BIO.MemoryBuffer()
        _sender.write(_out, _p7)

        # Save the PRNG's state.
        self.__saverand()

        return _out.read()

    def decrypt(self, emsg):
        """decrypt 'emsg'.  Return decrypted message if success, None
           otherwise"""
        # Load private key and cert.
        _sender = M2Crypto.SMIME.SMIME()
        _sender.load_key_bio(self.__pack(self.key), self.__pack(self.cert),
                callback=self.__passcallback)

        # Load the encrypted data.
        _p7, _data = M2Crypto.SMIME.smime_load_pkcs7_bio(self.__pack(emsg))

        # Decrypt p7.
        try:
            return _sender.decrypt(_p7)
        except M2Crypto.SMIME.SMIME_Error:
            return None

    def addHeader(self, rcert, content, subject=''):
        """Add To, From, Subject Header to 'content'

        From is taken from the sender certificate's subject and To from
        the subject of 'rcert'.  Both subjects must provide 'CN' and
        'emailAddress'; a missing field raises KeyError.  'content' is
        appended directly after the Subject line.
        """
        _scert = M2Crypto.X509.load_cert_bio(self.__pack(self.cert))
        _scertsubj = X509_Subject(str(_scert.get_subject()))
        _rcert = M2Crypto.X509.load_cert_bio(self.__pack(rcert))
        _rcertsubj = X509_Subject(str(_rcert.get_subject()))

        _out = 'From: "%(CN)s" <%(emailAddress)s>\n' % _scertsubj
        _out = _out + 'To: "%(CN)s" <%(emailAddress)s>\n' % _rcertsubj
        _out = _out + 'Subject: %s\n' % subject
        _out = _out + content

        return _out


class X509_Subject(UserDict.UserDict):
    """Dictionary view of an X509 subject written as '/K1=V1/K2=V2/...'.

    Each '/'-separated component of the form KEY=VALUE becomes one entry.
    Components without an '=' (including the empty one in front of the
    leading '/') are ignored.  A non-string 'substr' yields an empty
    mapping.
    """
    # This class needs to be rewritten or merged with X509_Name
    def __init__(self, substr):
        UserDict.UserDict.__init__(self)
        try:
            _fields = substr.strip().split('/')
        except AttributeError:
            # Not a string (e.g. None): leave the mapping empty.
            return

        for _field in _fields:
            # Split on the first '=' only, so a value that itself contains
            # '=' is kept instead of the whole component being dropped.
            _key, _sep, _value = _field.partition('=')
            if _sep:
                self[_key] = _value


def persistdata(data, file=None, isbinary=False):
    """Write 'data' to a file and return the file's path.

    data     -- the content to write
    file     -- path to write to; when omitted (or empty) a new temporary
                file is created, which the caller is responsible for
                removing
    isbinary -- open the file in binary ('wb') rather than text ('w') mode
    """
    _flag = 'wb' if isbinary else 'w'

    if file:
        _fh = open(file, _flag)
    else:
        # mkstemp() creates and opens the file atomically; mktemp() only
        # picks a name, which another process could claim before open().
        _fd, file = tempfile.mkstemp()
        _fh = os.fdopen(_fd, _flag)

    try:
        _fh.write(data)
    finally:
        # Close the handle even when the write fails.
        _fh.close()
    return file