summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-11 12:24:46 +0000
committerGordon Sim <gsim@apache.org>2008-03-11 12:24:46 +0000
commitd38d509af075300441ad858b1bb7680ac6b8e5ca (patch)
tree900123f1d4cff0aa7caf63f12cca25817d6acac9
parent2390281e2a22a5935e6cbe553f87222d02c64198 (diff)
downloadqpid-python-d38d509af075300441ad858b1bb7680ac6b8e5ca.tar.gz
Fixed broker to take application headers from final format message-properties struct
Fixed headers exchange to recognise x-match even if sent as a string other than 32 bit sized Converted remaining python exchange tests git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@635898 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp36
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h2
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.cpp9
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.h3
-rw-r--r--python/cpp_failing_0-10.txt8
-rw-r--r--python/tests_0-10/exchange.py37
6 files changed, 59 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index c0f6cf19d2..634fcd7b88 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -21,6 +21,7 @@
#include "HeadersExchange.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
#include <algorithm>
@@ -35,9 +36,10 @@ using namespace qpid::sys;
using namespace qpid::broker;
namespace {
- const StringValue all("all");
- const StringValue any("any");
+ const std::string all("all");
+ const std::string any("any");
const std::string x_match("x-match");
+ const std::string empty;
}
HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) :
@@ -55,11 +57,27 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
+std::string HeadersExchange::getMatch(const FieldTable* args)
+{
+ if (!args) {
+ throw InternalErrorException(QPID_MSG("No arguments given."));
+ }
+ FieldTable::ValuePtr what = args->get(x_match);
+ if (!what) {
+ return empty;
+ }
+ if (!what->convertsTo<std::string>()) {
+ throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
+ }
+ return what->get<std::string>();
+}
+
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
RWlock::ScopedWlock locker(lock);
- FieldTable::ValuePtr what = args->get(x_match);
- if (!what || (*what != all && *what != any))
+ std::string what = getMatch(args);
+ if (what != all && what != any)
throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
+
Bindings::iterator i;
for (i = bindings.begin(); i != bindings.end(); i++)
@@ -100,6 +118,8 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
+ if (!args) return;//can't match if there were no headers passed in
+
RWlock::ScopedRlock locker(lock);
uint32_t count(0);
@@ -153,10 +173,8 @@ namespace
bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
typedef FieldTable::ValueMap Map;
- FieldTable::ValuePtr what = bind.get(x_match);
- if (!what) {
- return false;
- } else if (*what == all) {
+ std::string what = getMatch(&bind);
+ if (what == all) {
for (Map::const_iterator i = bind.begin();
i != bind.end();
++i)
@@ -169,7 +187,7 @@ bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
}
}
return true;
- } else if (*what == any) {
+ } else if (what == any) {
for (Map::const_iterator i = bind.begin();
i != bind.end();
++i)
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index 4f654179c5..6e101e193a 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -38,6 +38,8 @@ class HeadersExchange : public virtual Exchange {
Bindings bindings;
qpid::sys::RWlock lock;
+ static std::string getMatch(const framing::FieldTable* args);
+
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp
index 2515b2fff6..d54f30ff72 100644
--- a/cpp/src/qpid/broker/MessageAdapter.cpp
+++ b/cpp/src/qpid/broker/MessageAdapter.cpp
@@ -47,7 +47,7 @@ namespace broker{
const framing::FieldTable* TransferAdapter::getApplicationHeaders(const framing::FrameSet& f)
{
- const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>();
+ const framing::MessageProperties010* p = f.getHeaders()->get<framing::MessageProperties010>();
return p ? &(p->getApplicationHeaders()) : 0;
}
@@ -67,4 +67,11 @@ namespace broker{
const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
return p ? p->getRoutingKey() : empty;
}
+
+ const framing::FieldTable* PreviewAdapter::getApplicationHeaders(const framing::FrameSet& f)
+ {
+ const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>();
+ return p ? &(p->getApplicationHeaders()) : 0;
+ }
+
}}
diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h
index 865d2e1685..6aebfdc9b3 100644
--- a/cpp/src/qpid/broker/MessageAdapter.h
+++ b/cpp/src/qpid/broker/MessageAdapter.h
@@ -51,7 +51,7 @@ struct TransferAdapter : MessageAdapter
virtual std::string getRoutingKey(const framing::FrameSet& f);
virtual std::string getExchange(const framing::FrameSet& f);
bool isImmediate(const framing::FrameSet&);
- const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
+ virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
bool isPersistent(const framing::FrameSet& f);
};
@@ -59,6 +59,7 @@ struct PreviewAdapter : TransferAdapter
{
std::string getExchange(const framing::FrameSet& f);
std::string getRoutingKey(const framing::FrameSet& f);
+ const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
};
}}
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt
index 8930f6e05b..8b0e11b3f0 100644
--- a/python/cpp_failing_0-10.txt
+++ b/python/cpp_failing_0-10.txt
@@ -10,14 +10,6 @@ tests_0-10.alternate_exchange.AlternateExchangeTests.test_delete_while_used_by_e
tests_0-10.alternate_exchange.AlternateExchangeTests.test_delete_while_used_by_queue
tests_0-10.alternate_exchange.AlternateExchangeTests.test_queue_delete
tests_0-10.alternate_exchange.AlternateExchangeTests.test_unroutable
-tests_0-10.exchange.DeclareMethodPassiveFieldNotFoundRuleTests.test
-tests_0-10.exchange.ExchangeTests.testHeadersBindNoMatchArg
-tests_0-10.exchange.HeadersExchangeTests.testMatchAll
-tests_0-10.exchange.HeadersExchangeTests.testMatchAny
-tests_0-10.exchange.MiscellaneousErrorsTests.testDifferentDeclaredType
-tests_0-10.exchange.MiscellaneousErrorsTests.testTypeNotKnown
-tests_0-10.exchange.RecommendedTypesRuleTests.testHeaders
-tests_0-10.exchange.RequiredInstancesRuleTests.testAmqMatch
tests_0-10.dtx.DtxTests.test_bad_resume
tests_0-10.dtx.DtxTests.test_end
tests_0-10.dtx.DtxTests.test_end_suspend_and_fail
diff --git a/python/tests_0-10/exchange.py b/python/tests_0-10/exchange.py
index 151807b045..be4f72c74c 100644
--- a/python/tests_0-10/exchange.py
+++ b/python/tests_0-10/exchange.py
@@ -27,6 +27,7 @@ import Queue, logging, traceback
from qpid.testlib import TestBase010
from qpid.datatypes import Message
from qpid.client import Closed
+from qpid.session import SessionException
class TestHelper(TestBase010):
@@ -48,6 +49,11 @@ class TestHelper(TestBase010):
def createMessage(self, key="", body=""):
return Message(self.session.delivery_properties(routing_key=key), body)
+ def getApplicationHeaders(self, msg):
+ for h in msg.headers:
+ if hasattr(h, 'application_headers'): return getattr(h, 'application_headers')
+ return None
+
def assertPublishGet(self, queue, exchange="", routing_key="", properties=None):
"""
Publish to exchange and assert queue.get() returns the same message.
@@ -59,7 +65,7 @@ class TestHelper(TestBase010):
msg = queue.get(timeout=1)
self.assertEqual(body, msg.body)
if (properties):
- self.assertEqual(properties, msg.content['application_headers'])
+ self.assertEqual(properties, self.getApplicationHeaders(msg))
def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
"""
@@ -294,8 +300,8 @@ class DeclareMethodPassiveFieldNotFoundRuleTests(TestHelper):
try:
self.session.exchange_declare(exchange="humpty_dumpty", passive=True)
self.fail("Expected 404 for passive declaration of unknown exchange.")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
class DeclareMethodDurableFieldSupportRuleTests(TestHelper):
@@ -352,7 +358,8 @@ class HeadersExchangeTests(TestHelper):
self.assertPublishGet(self.q, exchange="amq.match", properties=headers)
def myBasicPublish(self, headers):
- self.session.message_transfer(destination="amq.match", content=Content("foobar", properties={'application_headers':headers}))
+ mp=self.session.message_properties(application_headers=headers)
+ self.session.message_transfer(destination="amq.match", message=Message(mp, "foobar"))
def testMatchAll(self):
self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
@@ -386,21 +393,17 @@ class MiscellaneousErrorsTests(TestHelper):
try:
self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
self.fail("Expected 503 for declaration of unknown exchange type.")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
def testDifferentDeclaredType(self):
- self.session.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
+ self.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
try:
- self.session.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
+ session = self.conn.session("alternate", 2)
+ session.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
self.fail("Expected 530 for redeclaration of exchange with different type.")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
- #cleanup
- other = self.connect()
- c2 = other.channel(1)
- c2.session_open()
- c2.exchange_delete(exchange="test_different_declared_type_exchange")
+ except SessionException, e:
+ self.assertEquals(530, e.args[0].error_code)
class ExchangeTests(TestHelper):
def testHeadersBindNoMatchArg(self):
@@ -408,5 +411,5 @@ class ExchangeTests(TestHelper):
try:
self.session.exchange_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} )
self.fail("Expected failure for missing x-match arg.")
- except Closed, e:
- self.assertConnectionException(541, e.args[0])
+ except SessionException, e:
+ self.assertEquals(541, e.args[0].error_code)