1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
|
import UserDict
import os
import tempfile
import M2Crypto
from email import Message
class smimeplus(object):
def __init__(self, cert, privkey, passphrase, cacert, randfile=None):
self.cipher = 'des_ede3_cbc' # XXX make it configable??
self.setsender(cert, privkey, passphrase)
self.setcacert(cacert)
self.randfile = randfile
self.__loadrand()
def __passcallback(self, v):
"""private key passphrase callback function"""
return self.passphrase
def __loadrand(self):
"""Load random number file"""
if self.randfile:
M2Crypto.Rand.load_file(self.randfile, -1)
def __saverand(self):
"""Save random number file"""
if self.randfile:
M2Crypto.Rand.save_file(self.randfile)
def __gettext(self, msg):
"""Return a string representation of 'msg'"""
_data = ''
if isinstance(msg, Message.Message):
for _p in msg.walk():
_data = _data + _p.as_string()
else:
_data = str(msg)
return _data
def __pack(self, msg):
"""Convert 'msg' to string and put it into an memory buffer for
openssl operation"""
return M2Crypto.BIO.MemoryBuffer(self.__gettext(msg))
def setsender(self, cert=None, privkey=None, passphrase=None):
if cert:
self.cert = cert
if privkey:
self.key = privkey
if passphrase:
self.passphrase = passphrase
def setcacert(self, cacert):
self.cacert = cacert
def sign(self, msg):
"""Sign a message"""
_sender = M2Crypto.SMIME.SMIME()
_sender.load_key_bio(self.__pack(self.key), self.__pack(self.cert),
callback=self.__passcallback)
_signed = _sender.sign(self.__pack(msg), M2Crypto.SMIME.PKCS7_DETACHED)
_out = self.__pack(None)
_sender.write(_out, _signed, self.__pack(msg))
return _out.read()
def verify(self, smsg, scert):
"""Verify to see if 'smsg' was signed by 'scert', and scert was
issued by cacert of this object. Return message signed if success,
None otherwise"""
# Load signer's cert.
_x509 = M2Crypto.X509.load_cert_bio(self.__pack(scert))
_stack = M2Crypto.X509.X509_Stack()
_stack.push(_x509)
# Load CA cert.
_tmpfile = persistdata(self.cacert)
_store = M2Crypto.X509.X509_Store()
_store.load_info(_tmpfile)
os.remove(_tmpfile)
# prepare SMIME object
_sender = M2Crypto.SMIME.SMIME()
_sender.set_x509_stack(_stack)
_sender.set_x509_store(_store)
# Load signed message, verify it, and return result
_p7, _data = M2Crypto.SMIME.smime_load_pkcs7_bio(self.__pack(smsg))
try:
return _sender.verify(_p7, _data, flags=M2Crypto.SMIME.PKCS7_SIGNED)
except M2Crypto.SMIME.SMIME_Error:
return None
def encrypt(self, rcert, msg):
# Instantiate an SMIME object.
_sender = M2Crypto.SMIME.SMIME()
# Load target cert to encrypt to.
_x509 = M2Crypto.X509.load_cert_bio(self.__pack(rcert))
_stack = M2Crypto.X509.X509_Stack()
_stack.push(_x509)
_sender.set_x509_stack(_stack)
_sender.set_cipher(M2Crypto.SMIME.Cipher(self.cipher))
# Encrypt the buffer.
_buf = self.__pack(self.__gettext(msg))
_p7 = _sender.encrypt(_buf)
# Output p7 in mail-friendly format.
_out = self.__pack('')
_sender.write(_out, _p7)
# Save the PRNG's state.
self.__saverand()
return _out.read()
def decrypt(self, emsg):
"""decrypt 'msg'. Return decrypt message if success, None
otherwise"""
# Load private key and cert.
_sender = M2Crypto.SMIME.SMIME()
_sender.load_key_bio(self.__pack(self.key), self.__pack(self.cert),
callback=self.__passcallback)
# Load the encrypted data.
_p7, _data = M2Crypto.SMIME.smime_load_pkcs7_bio(self.__pack(emsg))
# Decrypt p7.
try:
return _sender.decrypt(_p7)
except M2Crypto.SMIME.SMIME_Error:
return None
def addHeader(self, rcert, content, subject=''):
"""Add To, From, Subject Header to 'content'"""
_scert = M2Crypto.X509.load_cert_bio(self.__pack(self.cert))
_scertsubj = X509_Subject(str(_scert.get_subject()))
_rcert = M2Crypto.X509.load_cert_bio(self.__pack(rcert))
_rcertsubj = X509_Subject(str(_rcert.get_subject()))
_out = 'From: "%(CN)s" <%(emailAddress)s>\n' % _scertsubj
_out = _out + 'To: "%(CN)s" <%(emailAddress)s>\n' % _rcertsubj
_out = _out + 'Subject: %s\n' % subject
_out = _out + content
return _out
class X509_Subject(UserDict.UserDict):
# This class needed to be rewritten or merge with X509_Name
def __init__(self, substr):
UserDict.UserDict.__init__(self)
try:
_data = substr.strip().split('/')
except AttributeError:
pass
else:
for _i in _data:
try:
_k, _v = _i.split('=')
self[_k] = _v
except ValueError:
pass
def persistdata(data, file=None, isbinary=False):
if not file:
file = tempfile.mktemp()
if isbinary:
_flag = 'wb'
else:
_flag = 'w'
_fh = open(file, _flag)
_fh.write(data)
_fh.close()
return file
|