summaryrefslogtreecommitdiff
path: root/rdflib/plugins/serializers/xmlwriter.py
blob: 9ed10f48fc7d71318f09c7baf61e555d2af32655 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import codecs
from xml.sax.saxutils import escape, quoteattr

__all__ = ["XMLWriter"]

ESCAPE_ENTITIES = {"\r": "
"}


class XMLWriter(object):
    def __init__(self, stream, namespace_manager, encoding=None, decl=1, extra_ns=None):
        encoding = encoding or "utf-8"
        encoder, decoder, stream_reader, stream_writer = codecs.lookup(encoding)
        self.stream = stream = stream_writer(stream)
        if decl:
            stream.write('<?xml version="1.0" encoding="%s"?>' % encoding)
        self.element_stack = []
        self.nm = namespace_manager
        self.extra_ns = extra_ns or {}
        self.closed = True

    def __get_indent(self):
        return "  " * len(self.element_stack)

    indent = property(__get_indent)

    def __close_start_tag(self):
        if not self.closed:  # TODO:
            self.closed = True
            self.stream.write(">")

    def push(self, uri):
        self.__close_start_tag()
        write = self.stream.write
        write("\n")
        write(self.indent)
        write("<%s" % self.qname(uri))
        self.element_stack.append(uri)
        self.closed = False
        self.parent = False

    def pop(self, uri=None):
        top = self.element_stack.pop()
        if uri:
            assert uri == top
        write = self.stream.write
        if not self.closed:
            self.closed = True
            write("/>")
        else:
            if self.parent:
                write("\n")
                write(self.indent)
            write("</%s>" % self.qname(top))
        self.parent = True

    def element(self, uri, content, attributes={}):
        """Utility method for adding a complete simple element"""
        self.push(uri)
        for k, v in attributes.items():
            self.attribute(k, v)
        self.text(content)
        self.pop()

    def namespaces(self, namespaces=None):
        if not namespaces:
            namespaces = self.nm.namespaces()

        write = self.stream.write
        write("\n")
        for prefix, namespace in namespaces:
            if prefix:
                write('  xmlns:%s="%s"\n' % (prefix, namespace))
            # Allow user-provided namespace bindings to prevail
            elif prefix not in self.extra_ns:
                write('  xmlns="%s"\n' % namespace)

        for prefix, namespace in self.extra_ns.items():
            if prefix:
                write('  xmlns:%s="%s"\n' % (prefix, namespace))
            else:
                write('  xmlns="%s"\n' % namespace)

    def attribute(self, uri, value):
        write = self.stream.write
        write(" %s=%s" % (self.qname(uri), quoteattr(value)))

    def text(self, text):
        self.__close_start_tag()
        if "<" in text and ">" in text and "]]>" not in text:
            self.stream.write("<![CDATA[")
            self.stream.write(text)
            self.stream.write("]]>")
        else:
            self.stream.write(escape(text, ESCAPE_ENTITIES))

    def qname(self, uri):
        """Compute qname for a uri using our extra namespaces,
        or the given namespace manager"""

        for pre, ns in self.extra_ns.items():
            if uri.startswith(ns):
                if pre != "":
                    return ":".join([pre, uri[len(ns) :]])
                else:
                    return uri[len(ns) :]

        return self.nm.qname_strict(uri)