summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-02-01 16:10:33 +0000
committerTed Ross <tross@apache.org>2010-02-01 16:10:33 +0000
commit7f1cc4b0a660cbe837e0261bd576eb4dd51dffd6 (patch)
tree1ba62254b64dc384d6b431500d4f7b405b9feb79 /cpp/src/tests
parenta8d83333c8050c18918e370d2f0bb9621b0038c7 (diff)
downloadqpid-python-7f1cc4b0a660cbe837e0261bd576eb4dd51dffd6.tar.gz
QPID-2348 - [C++] The HeadersExchange does not support federation
Applied patch from Sam Joyce git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@905322 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rwxr-xr-xcpp/src/tests/federation.py153
-rw-r--r--cpp/src/tests/headers_federation.py99
-rw-r--r--cpp/src/tests/run_headers_federation_tests49
3 files changed, 301 insertions, 0 deletions
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index aa68e8198b..15fa858c68 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -585,6 +585,159 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+ def test_dynamic_headers(self):
+ session = self.session
+ r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+ r_session = r_conn.session("test_dynamic_headers")
+
+ session.exchange_declare(exchange="fed.headers", type="headers")
+ r_session.exchange_declare(exchange="fed.headers", type="headers")
+
+ self.startQmf()
+ qmf = self.qmf
+
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.headers", "fed.headers", "", "", "", False, False, True, 0)
+ self.assertEqual(result.status, 0)
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':'any', 'class':'first'})
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ props = r_session.message_properties(application_headers={'class':'first'})
+ for i in range(1, 11):
+ r_session.message_transfer(destination="fed.headers", message=Message(props, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ content = msg.body
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ self.verify_cleanup()
+
+ def test_dynamic_headers_reorigin(self):
+ session = self.session
+ r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+ r_session = r_conn.session("test_dynamic_headers_reorigin")
+
+ session.exchange_declare(exchange="fed.headers_reorigin", type="headers")
+ r_session.exchange_declare(exchange="fed.headers_reorigin", type="headers")
+
+ session.exchange_declare(exchange="fed.headers_reorigin_2", type="headers")
+ r_session.exchange_declare(exchange="fed.headers_reorigin_2", type="headers")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ session.queue_declare(queue="fed2", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed2", exchange="fed.headers_reorigin_2", binding_key="key2", arguments={'x-match':'any', 'class':'second'})
+ self.subscribe(queue="fed2", destination="f2")
+ queue2 = session.incoming("f2")
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.headers_reorigin", "fed.headers_reorigin", "", "", "", False, False, True, 0)
+ self.assertEqual(result.status, 0)
+ result = link.bridge(False, "fed.headers_reorigin_2", "fed.headers_reorigin_2", "", "", "", False, False, True, 0)
+ self.assertEqual(result.status, 0)
+
+ bridge = qmf.getObjects(_class="bridge")[0]
+ bridge2 = qmf.getObjects(_class="bridge")[1]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="fed.headers_reorigin", binding_key="key1", arguments={'x-match':'any', 'class':'first'})
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ props = r_session.message_properties(application_headers={'class':'first'})
+ for i in range(1, 11):
+ r_session.message_transfer(destination="fed.headers_reorigin", message=Message(props, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+
+ # Extra test: don't explicitly close() bridge2. When the link is closed,
+ # it should clean up bridge2 automagically. verify_cleanup() will detect
+ # if bridge2 isn't cleaned up and will fail the test.
+ #
+ #result = bridge2.close()
+ #self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ self.verify_cleanup()
+
+ def test_dynamic_headers_unbind(self):
+ session = self.session
+ r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+ r_session = r_conn.session("test_dynamic_headers_unbind")
+
+ session.exchange_declare(exchange="fed.headers_unbind", type="headers")
+ r_session.exchange_declare(exchange="fed.headers_unbind", type="headers")
+
+ self.startQmf()
+ qmf = self.qmf
+
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.headers_unbind", "fed.headers_unbind", "", "", "", False, False, True, 0)
+ self.assertEqual(result.status, 0)
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ queue = qmf.getObjects(_class="queue", name="fed1")[0]
+ queue.update()
+ self.assertEqual(queue.bindingCount, 1,
+ "bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
+
+ session.exchange_bind(queue="fed1", exchange="fed.headers_unbind", binding_key="key1", arguments={'x-match':'any', 'class':'first'})
+ queue.update()
+ self.assertEqual(queue.bindingCount, 2,
+ "bindings not accounted for (expected 2, got %d)" % queue.bindingCount)
+
+ session.exchange_unbind(queue="fed1", exchange="fed.headers_unbind", binding_key="key1")
+ queue.update()
+ self.assertEqual(queue.bindingCount, 1,
+ "bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ self.verify_cleanup()
+
def getProperty(self, msg, name):
for h in msg.headers:
if hasattr(h, name): return getattr(h, name)
diff --git a/cpp/src/tests/headers_federation.py b/cpp/src/tests/headers_federation.py
new file mode 100644
index 0000000000..60cff1da54
--- /dev/null
+++ b/cpp/src/tests/headers_federation.py
@@ -0,0 +1,99 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message
+from qpid.queue import Empty
+from time import sleep
+
+class HeadersFederationTests(TestBase010):
+
+ def remote_host(self):
+ return self.defines.get("remote-host", "localhost")
+
+ def remote_port(self):
+ return int(self.defines["remote-port"])
+
+ def verify_cleanup(self):
+ attempts = 0
+ total = len(self.qmf.getObjects(_class="bridge")) + len(self.qmf.getObjects(_class="link"))
+ while total > 0:
+ attempts += 1
+ if attempts >= 10:
+ self.fail("Bridges and links didn't clean up")
+ return
+ sleep(1)
+ total = len(self.qmf.getObjects(_class="bridge")) + len(self.qmf.getObjects(_class="link"))
+
+ def test_dynamic_headers_unbind(self):
+ session = self.session
+ r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+ r_session = r_conn.session("test_dynamic_headers_unbind")
+
+ session.exchange_declare(exchange="fed.headers_unbind", type="headers")
+ r_session.exchange_declare(exchange="fed.headers_unbind", type="headers")
+
+ self.startQmf()
+ qmf = self.qmf
+
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.headers_unbind", "fed.headers_unbind", "", "", "", False, False, True, 0)
+ self.assertEqual(result.status, 0)
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ queue = qmf.getObjects(_class="queue", name="fed1")[0]
+ queue.update()
+ self.assertEqual(queue.bindingCount, 1,
+ "bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
+
+ session.exchange_bind(queue="fed1", exchange="fed.headers_unbind", binding_key="key1", arguments={'x-match':'any', 'class':'first'})
+ queue.update()
+ self.assertEqual(queue.bindingCount, 2,
+ "bindings not accounted for (expected 2, got %d)" % queue.bindingCount)
+
+ session.exchange_unbind(queue="fed1", exchange="fed.headers_unbind", binding_key="key1")
+ queue.update()
+ self.assertEqual(queue.bindingCount, 1,
+ "bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ self.verify_cleanup()
+
+ def getProperty(self, msg, name):
+ for h in msg.headers:
+ if hasattr(h, name): return getattr(h, name)
+ return None
+
+ def getAppHeader(self, msg, name):
+ headers = self.getProperty(msg, "application_headers")
+ if headers:
+ return headers[name]
+ return None
diff --git a/cpp/src/tests/run_headers_federation_tests b/cpp/src/tests/run_headers_federation_tests
new file mode 100644
index 0000000000..a4584e6884
--- /dev/null
+++ b/cpp/src/tests/run_headers_federation_tests
@@ -0,0 +1,49 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Run the federation tests for the Headers Exchange.
+
+source ./test_env.sh
+
+trap stop_brokers INT TERM QUIT
+
+start_brokers() {
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port
+ LOCAL_PORT=`cat qpidd.port`
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port
+ REMOTE_PORT=`cat qpidd.port`
+}
+
+stop_brokers() {
+ $QPIDD_EXEC --no-module-dir -q --port $LOCAL_PORT
+ $QPIDD_EXEC --no-module-dir -q --port $REMOTE_PORT
+}
+
+if test -d ${PYTHON_DIR} ; then
+ start_brokers
+ echo "Running HeadersExchange federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT"
+ $QPID_PYTHON_TEST -m headers_federation -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT $@
+ RETCODE=$?
+ stop_brokers
+ if test x$RETCODE != x0; then
+ echo "FAIL federation tests"; exit 1;
+ fi
+fi