1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
|
import unittest, test_appliance
import _yaml, yaml
class TestCVersion(unittest.TestCase):
    """Consistency check for the version reported by the ``_yaml`` module."""

    def testCVersion(self):
        """``get_version()`` joined with dots must equal ``get_version_string()``."""
        # The result of get_version() is interpolated into a three-field
        # "%s.%s.%s" format.
        version_parts = _yaml.get_version()
        # assertEqual replaces the deprecated failUnlessEqual alias.
        self.assertEqual("%s.%s.%s" % version_parts,
                         _yaml.get_version_string())
class TestCLoader(test_appliance.TestAppliance):
    """Cross-check ``yaml.CLoader`` against a reference loader.

    Every ``_test*`` method processes one data file twice -- once with the
    ``Loader`` argument and once with ``yaml.CLoader`` -- and requires both
    runs to yield equivalent tokens or events.  The ``Loader`` defaults are
    bound to ``yaml.Loader`` when this class is defined, i.e. before the
    module-level code further down rebinds ``yaml.Loader`` to
    ``yaml.CLoader``.
    """

    # Event attributes compared whenever the reference event carries them.
    _EVENT_ATTRIBUTES = ('anchor', 'tag', 'implicit', 'value',
                         'explicit', 'version', 'tags')

    def _read_file(self, filename, mode='r'):
        """Return the full content of *filename* and close the file."""
        stream = open(filename, mode)
        try:
            return stream.read()
        finally:
            stream.close()

    def _print_failure(self, data_filename, label, items, ext_items):
        """Print the raw data file and both result lists for a failed check."""
        print("")
        print("DATA:")
        print(self._read_file(data_filename, 'rb'))
        print("%s: %s" % (label, items))
        print("EXT_%s: %s" % (label, ext_items))

    def _testCScannerFileInput(self, test_name, data_filename, canonical_filename):
        """Run the scanner comparison feeding open files instead of strings."""
        self._testCScanner(test_name, data_filename, canonical_filename,
                           file_input=True)

    def _testCScanner(self, test_name, data_filename, canonical_filename,
                      file_input=False, Loader=yaml.Loader):
        """Compare the tokens produced by *Loader* and ``yaml.CLoader``.

        test_name          -- supplied by the test appliance; not used here.
        data_filename      -- path of the YAML document to scan.
        canonical_filename -- supplied by the test appliance; not used here.
        file_input         -- when true, pass open file objects to the
                              scanners; otherwise pass the file content.
        Loader             -- loader class producing the reference tokens.

        On any failure while scanning with ``yaml.CLoader`` or comparing, the
        data and both token lists are printed and the exception is re-raised.
        """
        opened = []  # file objects handed to the scanners; closed below
        try:
            if file_input:
                data = open(data_filename, 'r')
                opened.append(data)
            else:
                data = self._read_file(data_filename)
            tokens = list(yaml.scan(data, Loader=Loader))
            ext_tokens = []
            try:
                if file_input:
                    # The first file object has already been read by the
                    # reference scan, so the second scan gets a fresh one.
                    data = open(data_filename, 'r')
                    opened.append(data)
                # Append one by one so that a failure in the middle of the
                # scan still leaves the partial result for the report below.
                for token in yaml.scan(data, Loader=yaml.CLoader):
                    ext_tokens.append(token)
                self.assertEqual(len(tokens), len(ext_tokens))
                for token, ext_token in zip(tokens, ext_tokens):
                    self.assertEqual(token.__class__, ext_token.__class__)
                    for mark_name in ('start_mark', 'end_mark'):
                        mark = getattr(token, mark_name)
                        ext_mark = getattr(ext_token, mark_name)
                        self.assertEqual(
                            (mark.index, mark.line, mark.column),
                            (ext_mark.index, ext_mark.line, ext_mark.column))
                    if hasattr(token, 'value'):
                        self.assertEqual(token.value, ext_token.value)
            except:
                # Diagnostics only; the original exception is re-raised.
                self._print_failure(data_filename, "TOKENS", tokens, ext_tokens)
                raise
        finally:
            for stream in opened:
                stream.close()

    def _testCParser(self, test_name, data_filename, canonical_filename,
                     Loader=yaml.Loader):
        """Compare the events produced by *Loader* and ``yaml.CLoader``.

        test_name          -- supplied by the test appliance; not used here.
        data_filename      -- path of the YAML document to parse.
        canonical_filename -- supplied by the test appliance; not used here.
        Loader             -- loader class producing the reference events.

        On any failure while parsing with ``yaml.CLoader`` or comparing, the
        data and both event lists are printed and the exception is re-raised.
        """
        data = self._read_file(data_filename)
        events = list(yaml.parse(data, Loader=Loader))
        ext_events = []
        try:
            # Append one by one so that a failure in the middle of the parse
            # still leaves the partial result for the report below.
            for event in yaml.parse(data, Loader=yaml.CLoader):
                ext_events.append(event)
            self.assertEqual(len(events), len(ext_events))
            for event, ext_event in zip(events, ext_events):
                self.assertEqual(event.__class__, ext_event.__class__)
                for attribute in self._EVENT_ATTRIBUTES:
                    if hasattr(event, attribute):
                        self.assertEqual(getattr(event, attribute),
                                         getattr(ext_event, attribute))
        except:
            # Diagnostics only; the original exception is re-raised.
            self._print_failure(data_filename, "EVENTS", events, ext_events)
            raise


# Register the checks above with the test appliance for the '.data' and
# '.canonical' extensions.  add_tests is defined in test_appliance (not
# visible here); presumably it creates one test per matching set of files
# that calls the '_'-prefixed method of the same name -- TODO confirm.
TestCLoader.add_tests('testCScanner', '.data', '.canonical')
TestCLoader.add_tests('testCScannerFileInput', '.data', '.canonical')
TestCLoader.add_tests('testCParser', '.data', '.canonical')
class TestCEmitter(test_appliance.TestAppliance):
    """Round-trip check for ``yaml.CDumper``.

    A data file is parsed into events, the events are emitted with
    ``yaml.CDumper``, the emitted text is parsed again, and the two event
    sequences are compared.
    """

    def _testCEmitter(self, test_name, data_filename, canonical_filename,
                      Loader=yaml.Loader):
        """Emit the events of *data_filename* with ``yaml.CDumper`` and re-parse.

        test_name          -- supplied by the test appliance; not used here.
        data_filename      -- path of the YAML document to round-trip.
        canonical_filename -- supplied by the test appliance; not used here.
        Loader             -- loader class used for the initial parse; the
                              default is ``yaml.Loader`` as bound when this
                              class is defined.

        On any failure while re-parsing or comparing, both documents and both
        event lists are printed and the exception is re-raised.
        """
        stream = open(data_filename, 'r')
        try:
            data1 = stream.read()
        finally:
            stream.close()
        events = list(yaml.parse(data1, Loader=Loader))
        data2 = yaml.emit(events, Dumper=yaml.CDumper)
        ext_events = []
        try:
            # yaml.parse is looked up at call time and no Loader is passed,
            # so this uses whatever default yaml.parse has when the test runs.
            # Append one by one so that a failure in the middle of the parse
            # still leaves the partial result for the report below.
            for event in yaml.parse(data2):
                ext_events.append(event)
            self.assertEqual(len(events), len(ext_events))
            for event, ext_event in zip(events, ext_events):
                self.assertEqual(event.__class__, ext_event.__class__)
                for attribute in ('anchor', 'tag', 'implicit', 'value',
                                  'explicit', 'version', 'tags'):
                    if not hasattr(event, attribute):
                        continue
                    expected = getattr(event, attribute)
                    actual = getattr(ext_event, attribute)
                    # The tags '!' and None are accepted as equivalent to
                    # each other, so that pair is not compared.
                    if (attribute == 'tag' and expected in ['!', None]
                            and actual in ['!', None]):
                        continue
                    self.assertEqual(expected, actual)
        except:
            # Diagnostics only; the original exception is re-raised.
            print("")
            print("DATA1:")
            print(data1)
            print("DATA2:")
            print(data2)
            print("EVENTS: %s" % (events,))
            print("EXT_EVENTS: %s" % (ext_events,))
            raise


# Register the check above with the test appliance for the '.data' and
# '.canonical' extensions.  add_tests is defined in test_appliance (not
# visible here); presumably it creates one test per matching set of files
# that calls '_testCEmitter' -- TODO confirm.
TestCEmitter.add_tests('testCEmitter', '.data', '.canonical')
# From here on the module rebinds the public loader/dumper names of the yaml
# package to their C-prefixed counterparts.  Default arguments captured by the
# test classes above were evaluated earlier and keep the previous classes.
yaml.BaseLoader, yaml.BaseDumper = yaml.CBaseLoader, yaml.CBaseDumper
yaml.SafeLoader, yaml.SafeDumper = yaml.CSafeLoader, yaml.CSafeDumper
yaml.Loader, yaml.Dumper = yaml.CLoader, yaml.CDumper
# Keep a handle on the original function before it is replaced below.
old_scan = yaml.scan


def scan(stream, Loader=yaml.CLoader):
    """Call the original ``yaml.scan`` with ``yaml.CLoader`` as the default."""
    return old_scan(stream, Loader=Loader)


yaml.scan = scan
# Keep a handle on the original function before it is replaced below.
old_parse = yaml.parse


def parse(stream, Loader=yaml.CLoader):
    """Call the original ``yaml.parse`` with ``yaml.CLoader`` as the default."""
    return old_parse(stream, Loader=Loader)


yaml.parse = parse
# Keep a handle on the original function before it is replaced below.
old_compose = yaml.compose


def compose(stream, Loader=yaml.CLoader):
    """Call the original ``yaml.compose`` with ``yaml.CLoader`` as the default."""
    return old_compose(stream, Loader=Loader)


yaml.compose = compose
# Keep a handle on the original function before it is replaced below.
old_compose_all = yaml.compose_all


def compose_all(stream, Loader=yaml.CLoader):
    """Call the original ``yaml.compose_all`` with ``yaml.CLoader`` as the default."""
    return old_compose_all(stream, Loader=Loader)


yaml.compose_all = compose_all
# Keep a handle on the original function before it is replaced below.
old_load_all = yaml.load_all


def load_all(stream, Loader=yaml.CLoader):
    """Call the original ``yaml.load_all`` with ``yaml.CLoader`` as the default."""
    return old_load_all(stream, Loader=Loader)


yaml.load_all = load_all
# Keep a handle on the original function before it is replaced below.
old_load = yaml.load


def load(stream, Loader=yaml.CLoader):
    """Call the original ``yaml.load`` with ``yaml.CLoader`` as the default."""
    return old_load(stream, Loader=Loader)


yaml.load = load
def safe_load_all(stream):
    """Load every document of *stream* using ``yaml.CSafeLoader``.

    ``yaml.load_all`` is resolved when this function is called, so it is
    whatever the yaml module exposes under that name at that moment.
    """
    return yaml.load_all(stream, Loader=yaml.CSafeLoader)


yaml.safe_load_all = safe_load_all
def safe_load(stream):
    """Load *stream* using ``yaml.CSafeLoader``.

    ``yaml.load`` is resolved when this function is called, so it is
    whatever the yaml module exposes under that name at that moment.
    """
    return yaml.load(stream, Loader=yaml.CSafeLoader)


yaml.safe_load = safe_load
# Keep a handle on the original function before it is replaced below.
old_emit = yaml.emit


def emit(events, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the original ``yaml.emit`` with ``yaml.CDumper`` as the default.

    Extra keyword arguments are forwarded unchanged.
    """
    return old_emit(events, stream, Dumper=Dumper, **kwds)


yaml.emit = emit
# Keep a handle on the original function before it is replaced below.
old_serialize_all = yaml.serialize_all


def serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the original ``yaml.serialize_all`` with ``yaml.CDumper`` as the default.

    Extra keyword arguments are forwarded unchanged.
    """
    return old_serialize_all(nodes, stream, Dumper=Dumper, **kwds)


yaml.serialize_all = serialize_all
# Keep a handle on the original function before it is replaced below.
old_serialize = yaml.serialize


def serialize(node, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the original ``yaml.serialize`` with ``yaml.CDumper`` as the default.

    *stream* defaults to ``None`` like in the sibling wrappers of this module
    (``emit``, ``serialize_all``, ``dump_all``, ``dump``), so calling
    ``yaml.serialize(node)`` without a stream still works once the wrapper is
    installed.  The value is forwarded unchanged to the original function, as
    are any extra keyword arguments.
    """
    return old_serialize(node, stream, Dumper, **kwds)


yaml.serialize = serialize
# Keep a handle on the original function before it is replaced below.
old_dump_all = yaml.dump_all


def dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the original ``yaml.dump_all`` with ``yaml.CDumper`` as the default.

    Extra keyword arguments are forwarded unchanged.
    """
    return old_dump_all(documents, stream, Dumper=Dumper, **kwds)


yaml.dump_all = dump_all
# Keep a handle on the original function before it is replaced below.
old_dump = yaml.dump


def dump(data, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the original ``yaml.dump`` with ``yaml.CDumper`` as the default.

    Extra keyword arguments are forwarded unchanged.
    """
    return old_dump(data, stream, Dumper=Dumper, **kwds)


yaml.dump = dump
def safe_dump_all(documents, stream=None, **kwds):
    """Dump *documents* using ``yaml.CSafeDumper``.

    ``yaml.dump_all`` is resolved when this function is called, so it is
    whatever the yaml module exposes under that name at that moment.  Extra
    keyword arguments are forwarded unchanged.
    """
    return yaml.dump_all(documents, stream, Dumper=yaml.CSafeDumper, **kwds)


yaml.safe_dump_all = safe_dump_all
def safe_dump(data, stream=None, **kwds):
    """Dump *data* using ``yaml.CSafeDumper``.

    ``yaml.dump`` is resolved when this function is called, so it is
    whatever the yaml module exposes under that name at that moment.  Extra
    keyword arguments are forwarded unchanged.
    """
    return yaml.dump(data, stream, Dumper=yaml.CSafeDumper, **kwds)


yaml.safe_dump = safe_dump
from test_yaml import *
def main(module='__main__'):
    """Hand *module* to ``unittest.main`` to run its tests."""
    unittest.main(module=module)


# Run the tests when this file is executed as a script.
if __name__ == '__main__':
    main()
|