diff options
author | Ted Ross <tross@apache.org> | 2010-02-01 16:10:33 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-02-01 16:10:33 +0000 |
commit | 7f1cc4b0a660cbe837e0261bd576eb4dd51dffd6 (patch) | |
tree | 1ba62254b64dc384d6b431500d4f7b405b9feb79 /cpp/src/tests/federation.py | |
parent | a8d83333c8050c18918e370d2f0bb9621b0038c7 (diff) | |
download | qpid-python-7f1cc4b0a660cbe837e0261bd576eb4dd51dffd6.tar.gz |
QPID-2348 - [C++] The HeadersExchange does not support federation
Applied patch from Sam Joyce
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@905322 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/federation.py')
-rwxr-xr-x | cpp/src/tests/federation.py | 153 |
1 files changed, 153 insertions, 0 deletions
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index aa68e8198b..15fa858c68 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -585,6 +585,159 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_dynamic_headers(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_headers") + + session.exchange_declare(exchange="fed.headers", type="headers") + r_session.exchange_declare(exchange="fed.headers", type="headers") + + self.startQmf() + qmf = self.qmf + + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.headers", "fed.headers", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':'any', 'class':'first'}) + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + props = r_session.message_properties(application_headers={'class':'first'}) + for i in range(1, 11): + r_session.message_transfer(destination="fed.headers", message=Message(props, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + content = msg.body + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_headers_reorigin(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_headers_reorigin") + + session.exchange_declare(exchange="fed.headers_reorigin", type="headers") + r_session.exchange_declare(exchange="fed.headers_reorigin", type="headers") + + session.exchange_declare(exchange="fed.headers_reorigin_2", type="headers") + r_session.exchange_declare(exchange="fed.headers_reorigin_2", type="headers") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + session.queue_declare(queue="fed2", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed2", exchange="fed.headers_reorigin_2", binding_key="key2", arguments={'x-match':'any', 'class':'second'}) + self.subscribe(queue="fed2", destination="f2") + queue2 = session.incoming("f2") + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.headers_reorigin", "fed.headers_reorigin", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + result = link.bridge(False, "fed.headers_reorigin_2", "fed.headers_reorigin_2", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + bridge2 = qmf.getObjects(_class="bridge")[1] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.headers_reorigin", binding_key="key1", arguments={'x-match':'any', 'class':'first'}) + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + props = r_session.message_properties(application_headers={'class':'first'}) + for i in range(1, 11): + r_session.message_transfer(destination="fed.headers_reorigin", message=Message(props, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + + # Extra test: don't explicitly close() bridge2. When the link is closed, + # it should clean up bridge2 automagically. verify_cleanup() will detect + # if bridge2 isn't cleaned up and will fail the test. + # + #result = bridge2.close() + #self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_headers_unbind(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_headers_unbind") + + session.exchange_declare(exchange="fed.headers_unbind", type="headers") + r_session.exchange_declare(exchange="fed.headers_unbind", type="headers") + + self.startQmf() + qmf = self.qmf + + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.headers_unbind", "fed.headers_unbind", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + queue = qmf.getObjects(_class="queue", name="fed1")[0] + queue.update() + self.assertEqual(queue.bindingCount, 1, + "bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + + session.exchange_bind(queue="fed1", exchange="fed.headers_unbind", binding_key="key1", arguments={'x-match':'any', 'class':'first'}) + queue.update() + self.assertEqual(queue.bindingCount, 2, + "bindings not accounted for (expected 2, got %d)" % queue.bindingCount) + + session.exchange_unbind(queue="fed1", exchange="fed.headers_unbind", binding_key="key1") + queue.update() + self.assertEqual(queue.bindingCount, 1, + "bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + def getProperty(self, msg, name): for h in msg.headers: if hasattr(h, name): return getattr(h, name) |