diff options
Diffstat (limited to 'cpp/src/tests/federation.py')
-rwxr-xr-x | cpp/src/tests/federation.py | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 8ca78499d3..92a28c01ad 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -802,6 +802,165 @@ class FederationTests(TestBase010): self.verify_cleanup() + + def test_dynamic_headers_xml(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_headers_xml") + + session.exchange_declare(exchange="fed.xml", type="xml") + r_session.exchange_declare(exchange="fed.xml", type="xml") + + self.startQmf() + qmf = self.qmf + + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.xml", "fed.xml", "", "", "", False, False, True, 0) + + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.xml", binding_key="key1", arguments={'xquery':'true()'}) + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + props = r_session.delivery_properties(routing_key="key1") + for i in range(1, 11): + r_session.message_transfer(destination="fed.xml", message=Message(props, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + content = msg.body + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_headers_reorigin_xml(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_headers_reorigin_xml") + + session.exchange_declare(exchange="fed.xml_reorigin", type="xml") + r_session.exchange_declare(exchange="fed.xml_reorigin", type="xml") + + session.exchange_declare(exchange="fed.xml_reorigin_2", type="xml") + r_session.exchange_declare(exchange="fed.xml_reorigin_2", type="xml") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + session.queue_declare(queue="fed2", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed2", exchange="fed.xml_reorigin_2", binding_key="key2", arguments={'xquery':'true()'}) + self.subscribe(queue="fed2", destination="f2") + queue2 = session.incoming("f2") + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.xml_reorigin", "fed.xml_reorigin", "", "", "", False, False, True, 0) + + self.assertEqual(result.status, 0) + result = link.bridge(False, "fed.xml_reorigin_2", "fed.xml_reorigin_2", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + bridge2 = qmf.getObjects(_class="bridge")[1] + sleep(5) + + foo=qmf.getObjects(_class="link") + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.xml_reorigin", binding_key="key1", arguments={'xquery':'true()'}) + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + props = r_session.delivery_properties(routing_key="key1") + for i in range(1, 11): + r_session.message_transfer(destination="fed.xml_reorigin", message=Message(props, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + + # Extra test: don't explicitly close() bridge2. When the link is closed, + # it should clean up bridge2 automagically. verify_cleanup() will detect + # if bridge2 isn't cleaned up and will fail the test. + # + #result = bridge2.close() + #self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_headers_unbind_xml(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_xml_unbind") + + session.exchange_declare(exchange="fed.xml_unbind", type="xml") + r_session.exchange_declare(exchange="fed.xml_unbind", type="xml") + + self.startQmf() + qmf = self.qmf + + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.xml_unbind", "fed.xml_unbind", "", "", "", False, False, True, 0) + + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + queue = qmf.getObjects(_class="queue", name="fed1")[0] + queue.update() + self.assertEqual(queue.bindingCount, 1, + "bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + + session.exchange_bind(queue="fed1", exchange="fed.xml_unbind", binding_key="key1", arguments={'xquery':'true()'}) + queue.update() + self.assertEqual(queue.bindingCount, 2, + "bindings not accounted for (expected 2, got %d)" % queue.bindingCount) + + session.exchange_unbind(queue="fed1", exchange="fed.xml_unbind", binding_key="key1") + queue.update() + self.assertEqual(queue.bindingCount, 1, + "bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_topic_nodup(self): """Verify that a message whose routing key matches more than one binding does not get duplicated to the same queue. |