summaryrefslogtreecommitdiff
path: root/umessage.py
blob: 3bf92e4973512675dee02a0a9a07644a0a6c70b9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""unicode email support"""

import email
from email.Utils import parseaddr, parsedate
from email.Header import decode_header

from mx.DateTime import DateTime

def decode_QP(string):
    parts = []
    for decoded, charset in decode_header(string):
        if charset is None:
            charset = 'iso-8859-15'
        parts.append(unicode(decoded, charset, 'replace'))

    return u' '.join(parts)

def message_from_file(fd):
    try:
        return UMessage(email.message_from_file(fd))
    except email.Errors.MessageParseError:
        return ''
    
class UMessage:
    """Encapsulates an email.Message instance and returns only unicode objects"""

    def __init__(self, message):
        self.message = message

    # email.Message interface #################################################
    
    def get(self, header, default=None):
        value = self.message.get(header, default)
        if value:
            return decode_QP(value)
        return value

    def get_all(self, header, default=None):
        return [decode_QP(val) for val in self.message.get_all(header, default)
                if val is not None]
    
    def get_payload(self, index=None, decode=False):
        message = self.message
        if index is None:
            payload = message.get_payload(index, decode)
            if isinstance(payload, list):
                return [UMessage(msg) for msg in payload]
            if message.get_content_maintype() != 'text':
                return payload
            return unicode(payload or '', message.get_charset() or 'iso-8859-15', 'replace')
        else:
            payload = UMessage(message.get_payload(index, decode))
        return payload

    def is_multipart(self):
        return self.message.is_multipart()

    def get_boundary(self):
        return self.message.get_boundary()

    def walk(self):
        for part in self.message.walk():
            yield UMessage(part)
    
    def get_content_maintype(self):
        return unicode(self.message.get_content_maintype())

    def get_content_type(self):
        return unicode(self.message.get_content_type())

    def get_filename(self, failobj=None):
        value = self.message.get_filename(failobj)
        if value is failobj:
            return value
        try:
            return unicode(value)
        except UnicodeDecodeError:
            return u'error decoding filename'

    # other convenience methods ###############################################

    def headers(self):
        """return an unicode string containing all the message's headers"""
        values = []
        for header in self.message.keys():
            values.append(u'%s: %s' % (header, self.get(header)))
        return '\n'.join(values)

    def multi_addrs(self, header):
        """return a list of 2-uple (name, address) for the given address (which
        is exepected to be an header containing address such as from, to, cc...)
        """
        persons = []
        for person in self.get_all(header, ()):
            name, mail = parseaddr(person)
            persons.append((name, mail))
        return persons
    
    def date(self):
        """return a mx.DateTime object for the email's date or None if no date is
        set or if it can't be parsed
        """
        value = self.get('date')
        if value:
            datetuple = parsedate(value)
            if datetuple:
                return DateTime(*datetuple[:6])
        return None