summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-09-27 19:50:23 +0000
committerAlan Conway <aconway@apache.org>2006-09-27 19:50:23 +0000
commitcaca23c5dc055d985fecfe188573104bc707ad9d (patch)
tree154c0bbd4c7bca70080de28116b5654491657906
parent9d718c2348708b0b27ce9fb9fcbf05c4b0a997cc (diff)
downloadqpid-python-caca23c5dc055d985fecfe188573104bc707ad9d.tar.gz
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450556 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/Makefile12
-rw-r--r--cpp/broker/src/HeadersExchange.cpp119
-rw-r--r--cpp/broker/src/SessionHandlerFactoryImpl.cpp3
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp8
-rw-r--r--cpp/common/framing/inc/AMQBody.h8
-rw-r--r--cpp/common/framing/inc/AMQContentBody.h1
-rw-r--r--cpp/common/framing/inc/AMQHeaderBody.h2
-rw-r--r--cpp/common/framing/inc/AMQHeartbeatBody.h3
-rw-r--r--cpp/common/framing/inc/Buffer.h3
-rw-r--r--cpp/common/framing/inc/FieldTable.h72
-rw-r--r--cpp/common/framing/inc/NamedValue.h49
-rw-r--r--cpp/common/framing/inc/Value.h207
-rw-r--r--cpp/common/framing/src/AMQContentBody.cpp5
-rw-r--r--cpp/common/framing/src/AMQFrame.cpp18
-rw-r--r--cpp/common/framing/src/AMQHeaderBody.cpp13
-rw-r--r--cpp/common/framing/src/Buffer.cpp1
-rw-r--r--cpp/common/framing/src/FieldTable.cpp145
-rw-r--r--cpp/common/framing/src/NamedValue.cpp67
-rw-r--r--cpp/common/framing/src/Value.cpp82
-rw-r--r--python/qpid/connection.py8
-rw-r--r--python/qpid/peer.py3
-rw-r--r--python/qpid/testlib.py20
-rw-r--r--python/tests/exchange.py76
-rw-r--r--python/tests/testlib.py9
24 files changed, 573 insertions, 361 deletions
diff --git a/cpp/Makefile b/cpp/Makefile
index 9abdf40f14..28f2212a7a 100644
--- a/cpp/Makefile
+++ b/cpp/Makefile
@@ -22,16 +22,20 @@
UNITTESTS=$(wildcard common/*/test/*.so broker/test/*.so)
-.PHONY: all clean doxygen
+.PHONY: all test unittest pythontest runtests clean doxygen
-test: all
- @$(MAKE) runtests
+test: all runtests
-runtests:
+unittest:
DllPlugInTester -c -b $(UNITTESTS)
+
+pythontest:
bin/qpidd >> qpidd.log &
cd ../python ; ./run-tests -v -I cpp_failing.txt
+runtests:
+ $(MAKE) -k unittest pythontest
+
all:
@$(MAKE) -C common all
@$(MAKE) -C broker all
diff --git a/cpp/broker/src/HeadersExchange.cpp b/cpp/broker/src/HeadersExchange.cpp
new file mode 100644
index 0000000000..03a029ea4d
--- /dev/null
+++ b/cpp/broker/src/HeadersExchange.cpp
@@ -0,0 +1,119 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "HeadersExchange.h"
+#include "ExchangeBinding.h"
+#include "Value.h"
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// The current search algorithm really sucks.
+// Fieldtables are heavy, maybe use shared_ptr to do handle-body.
+
+namespace qpid {
+namespace broker {
+
+namespace {
+const std::string all("all");
+const std::string any("any");
+const std::string x_match("x-match");
+}
+
+HeadersExchange::HeadersExchange(const string& name) : Exchange(name) { }
+
+void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ std::cout << "HeadersExchange::bind" << std::endl;
+ Locker locker(lock);
+ std::string what = args->getString("x-match");
+ // TODO aconway 2006-09-26: throw an exception for invalid bindings.
+ if (what != all && what != any) return; // Invalid.
+ bindings.push_back(Binding(*args, queue));
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+}
+
+void HeadersExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ Locker locker(lock);;
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (i->first == *args) {
+ bindings.erase(i);
+ }
+ }
+}
+
+
+void HeadersExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
+ std::cout << "route: " << *args << std::endl;
+ Locker locker(lock);;
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (match(i->first, *args)) i->second->deliver(msg);
+ }
+}
+
+HeadersExchange::~HeadersExchange() {}
+
+const std::string HeadersExchange::typeName("headers");
+namespace
+{
+
+bool match_values(const Value& bind, const Value& msg) {
+ return dynamic_cast<const EmptyValue*>(&bind) || bind == msg;
+}
+
+}
+
+
+bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
+ typedef FieldTable::ValueMap Map;
+ std::string what = bind.getString(x_match);
+ if (what == all) {
+ for (Map::const_iterator i = bind.getMap().begin();
+ i != bind.getMap().end();
+ ++i)
+ {
+ if (i->first != x_match)
+ {
+ Map::const_iterator j = msg.getMap().find(i->first);
+ if (j == msg.getMap().end()) return false;
+ if (!match_values(*(i->second), *(j->second))) return false;
+ }
+ }
+ return true;
+ } else if (what == any) {
+ for (Map::const_iterator i = bind.getMap().begin();
+ i != bind.getMap().end();
+ ++i)
+ {
+ if (i->first != x_match)
+ {
+ Map::const_iterator j = msg.getMap().find(i->first);
+ if (j != msg.getMap().end()) {
+ if (match_values(*(i->second), *(j->second))) return true;
+ }
+ }
+ }
+ return false;
+ } else {
+ return false;
+ }
+}
+
+}}
+
diff --git a/cpp/broker/src/SessionHandlerFactoryImpl.cpp b/cpp/broker/src/SessionHandlerFactoryImpl.cpp
index 280e89c475..39c627afef 100644
--- a/cpp/broker/src/SessionHandlerFactoryImpl.cpp
+++ b/cpp/broker/src/SessionHandlerFactoryImpl.cpp
@@ -18,6 +18,7 @@
#include "SessionHandlerFactoryImpl.h"
#include "SessionHandlerImpl.h"
#include "FanOutExchange.h"
+#include "HeadersExchange.h"
using namespace qpid::broker;
using namespace qpid::io;
@@ -28,6 +29,7 @@ const std::string empty;
const std::string amq_direct("amq.direct");
const std::string amq_topic("amq.topic");
const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
}
SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
@@ -35,6 +37,7 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeo
exchanges.declare(new DirectExchange(amq_direct));
exchanges.declare(new TopicExchange(amq_topic));
exchanges.declare(new FanOutExchange(amq_fanout));
+ exchanges.declare(new HeadersExchange(amq_match));
cleaner.start();
}
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
index 2ce1c4b298..eb8f37030c 100644
--- a/cpp/broker/src/SessionHandlerImpl.cpp
+++ b/cpp/broker/src/SessionHandlerImpl.cpp
@@ -18,6 +18,8 @@
#include <iostream>
#include "SessionHandlerImpl.h"
#include "FanOutExchange.h"
+#include "TopicExchange.h"
+#include "HeadersExchange.h"
#include "assert.h"
using namespace std::tr1;
@@ -223,7 +225,9 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16
if(!passive && (
type != TopicExchange::typeName &&
type != DirectExchange::typeName &&
- type != FanOutExchange::typeName)
+ type != FanOutExchange::typeName &&
+ type != HeadersExchange::typeName
+ )
)
{
throw ChannelException(540, "Exchange type not implemented: " + type);
@@ -237,6 +241,8 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16
parent->exchanges->declare(new DirectExchange(exchange));
}else if(type == FanOutExchange::typeName){
parent->exchanges->declare(new DirectExchange(exchange));
+ }else if (type == HeadersExchange::typeName) {
+ parent->exchanges->declare(new HeadersExchange(exchange));
}
}
parent->exchanges->getLock()->release();
diff --git a/cpp/common/framing/inc/AMQBody.h b/cpp/common/framing/inc/AMQBody.h
index d4b436c949..a1770820a9 100644
--- a/cpp/common/framing/inc/AMQBody.h
+++ b/cpp/common/framing/inc/AMQBody.h
@@ -27,18 +27,20 @@ namespace qpid {
class AMQBody
{
- public:
+ public:
typedef std::tr1::shared_ptr<AMQBody> shared_ptr;
+ virtual ~AMQBody();
virtual u_int32_t size() const = 0;
virtual u_int8_t type() const = 0;
virtual void encode(Buffer& buffer) const = 0;
virtual void decode(Buffer& buffer, u_int32_t size) = 0;
- inline virtual ~AMQBody(){}
+ virtual void print(std::ostream& out) const;
};
- enum body_types {METHOD_BODY = 1, HEADER_BODY = 2, CONTENT_BODY = 3, HEARTBEAT_BODY = 8};
+ std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
+ enum body_types {METHOD_BODY = 1, HEADER_BODY = 2, CONTENT_BODY = 3, HEARTBEAT_BODY = 8};
}
}
diff --git a/cpp/common/framing/inc/AMQContentBody.h b/cpp/common/framing/inc/AMQContentBody.h
index 8e97c31edb..1a6f2cf117 100644
--- a/cpp/common/framing/inc/AMQContentBody.h
+++ b/cpp/common/framing/inc/AMQContentBody.h
@@ -40,6 +40,7 @@ public:
u_int32_t size() const;
void encode(Buffer& buffer) const;
void decode(Buffer& buffer, u_int32_t size);
+ void print(std::ostream& out) const;
};
}
diff --git a/cpp/common/framing/inc/AMQHeaderBody.h b/cpp/common/framing/inc/AMQHeaderBody.h
index 369db8a9c8..e39fffa8ce 100644
--- a/cpp/common/framing/inc/AMQHeaderBody.h
+++ b/cpp/common/framing/inc/AMQHeaderBody.h
@@ -40,12 +40,14 @@ public:
AMQHeaderBody();
inline u_int8_t type() const { return HEADER_BODY; }
HeaderProperties* getProperties(){ return properties; }
+ const HeaderProperties* getProperties() const { return properties; }
inline u_int64_t getContentSize() const { return contentSize; }
inline void setContentSize(u_int64_t size) { contentSize = size; }
virtual ~AMQHeaderBody();
virtual u_int32_t size() const;
virtual void encode(Buffer& buffer) const;
virtual void decode(Buffer& buffer, u_int32_t size);
+ virtual void print(std::ostream& out) const;
};
}
diff --git a/cpp/common/framing/inc/AMQHeartbeatBody.h b/cpp/common/framing/inc/AMQHeartbeatBody.h
index ca2def977a..cfe057bdcd 100644
--- a/cpp/common/framing/inc/AMQHeartbeatBody.h
+++ b/cpp/common/framing/inc/AMQHeartbeatBody.h
@@ -30,11 +30,12 @@ class AMQHeartbeatBody : virtual public AMQBody
public:
typedef std::tr1::shared_ptr<AMQHeartbeatBody> shared_ptr;
- virtual ~AMQHeartbeatBody() {}
+ virtual ~AMQHeartbeatBody();
inline u_int32_t size() const { return 0; }
inline u_int8_t type() const { return HEARTBEAT_BODY; }
inline void encode(Buffer& buffer) const {}
inline void decode(Buffer& buffer, u_int32_t size) {}
+ virtual void print(std::ostream& out) const;
};
}
diff --git a/cpp/common/framing/inc/Buffer.h b/cpp/common/framing/inc/Buffer.h
index 1ff4611f1f..e0532cc9d6 100644
--- a/cpp/common/framing/inc/Buffer.h
+++ b/cpp/common/framing/inc/Buffer.h
@@ -16,7 +16,6 @@
*
*/
#include "amqp_types.h"
-#include "FieldTable.h"
#ifndef _Buffer_
#define _Buffer_
@@ -24,6 +23,8 @@
namespace qpid {
namespace framing {
+class FieldTable;
+
class Buffer
{
const int size;
diff --git a/cpp/common/framing/inc/FieldTable.h b/cpp/common/framing/inc/FieldTable.h
index cf935d3284..7da914968e 100644
--- a/cpp/common/framing/inc/FieldTable.h
+++ b/cpp/common/framing/inc/FieldTable.h
@@ -17,6 +17,8 @@
*/
#include <iostream>
#include <vector>
+#include <tr1/memory>
+#include <tr1/unordered_map>
#include "amqp_types.h"
#ifndef _FieldTable_
@@ -25,42 +27,50 @@
namespace qpid {
namespace framing {
- class NamedValue;
- class Value;
- class Buffer;
+class Value;
+class Buffer;
- class FieldTable
- {
- std::vector<NamedValue*> values;
- NamedValue* find(const std::string& name) const;
+class FieldTable
+{
+ public:
+ typedef std::tr1::shared_ptr<Value> ValuePtr;
+ typedef std::tr1::unordered_map<std::string, ValuePtr> ValueMap;
- Value* getValue(const std::string& name) const;
- void setValue(const std::string& name, Value* value);
+ ~FieldTable();
+ u_int32_t size() const;
+ int count() const;
+ void setString(const std::string& name, const std::string& value);
+ void setInt(const std::string& name, int value);
+ void setTimestamp(const std::string& name, u_int64_t value);
+ void setTable(const std::string& name, const FieldTable& value);
+ //void setDecimal(string& name, xxx& value);
+ std::string getString(const std::string& name) const;
+ int getInt(const std::string& name) const;
+ u_int64_t getTimestamp(const std::string& name) const;
+ void getTable(const std::string& name, FieldTable& value) const;
+ //void getDecimal(string& name, xxx& value);
+ void erase(const std::string& name);
+
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer);
- public:
- ~FieldTable();
- u_int32_t size() const;
- int count() const;
- void setString(const std::string& name, const std::string& value);
- void setInt(const std::string& name, int value);
- void setTimestamp(const std::string& name, u_int64_t value);
- void setTable(const std::string& name, const FieldTable& value);
- //void setDecimal(string& name, xxx& value);
- std::string getString(const std::string& name);
- int getInt(const std::string& name);
- u_int64_t getTimestamp(const std::string& name);
- void getTable(const std::string& name, FieldTable& value);
- //void getDecimal(string& name, xxx& value);
+ bool operator==(const FieldTable& other) const;
- void encode(Buffer& buffer) const;
- void decode(Buffer& buffer);
+ // TODO aconway 2006-09-26: Yeuch! Rework FieldTable to have
+ // a map-like interface.
+ const ValueMap& getMap() const { return values; }
+ ValueMap& getMap() { return values; }
+
+
+ private:
+ friend std::ostream& operator<<(std::ostream& out, const FieldTable& body);
+ ValueMap values;
+ template<class T> T getValue(const std::string& name) const;
+};
- friend std::ostream& operator<<(std::ostream& out, const FieldTable& body);
- };
-
- class FieldNotFoundException{};
- class UnknownFieldName : public FieldNotFoundException{};
- class IncorrectFieldType : public FieldNotFoundException{};
+class FieldNotFoundException{};
+class UnknownFieldName : public FieldNotFoundException{};
+class IncorrectFieldType : public FieldNotFoundException{};
}
}
diff --git a/cpp/common/framing/inc/NamedValue.h b/cpp/common/framing/inc/NamedValue.h
deleted file mode 100644
index 729b5d08a7..0000000000
--- a/cpp/common/framing/inc/NamedValue.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include <vector>
-#include "amqp_types.h"
-#include "Value.h"
-
-#ifndef _NamedValue_
-#define _NamedValue_
-
-namespace qpid {
-namespace framing {
-
- class Buffer;
-
- class NamedValue{
- string name;
- Value* value;
- public:
- NamedValue();
- NamedValue(const string& name, Value* value);
- ~NamedValue();
- void encode(Buffer& buffer);
- void decode(Buffer& buffer);
- u_int32_t size() const;
- inline const string& getName() const { return name; }
- inline Value* getValue() const { return value; }
- inline void setValue(Value* val) { value = val; }
- };
-}
-}
-
-
-#endif
diff --git a/cpp/common/framing/inc/Value.h b/cpp/common/framing/inc/Value.h
index e3d2a2c1d6..3d525a0bef 100644
--- a/cpp/common/framing/inc/Value.h
+++ b/cpp/common/framing/inc/Value.h
@@ -26,84 +26,135 @@
namespace qpid {
namespace framing {
- class Buffer;
-
- class Value{
- public:
- inline virtual ~Value(){}
- virtual u_int32_t size() const = 0;
- virtual char getType() const = 0;
- virtual void encode(Buffer& buffer) = 0;
- virtual void decode(Buffer& buffer) = 0;
- };
-
- class StringValue : public virtual Value{
- string value;
-
- public:
- inline StringValue(const string& v) : value(v){}
- inline StringValue(){}
- inline string getValue(){ return value; }
- ~StringValue(){}
- inline virtual u_int32_t size() const { return 4 + value.length(); }
- inline virtual char getType() const { return 'S'; }
- virtual void encode(Buffer& buffer);
- virtual void decode(Buffer& buffer);
- };
-
- class IntegerValue : public virtual Value{
- int value;
- public:
- inline IntegerValue(int v) : value(v){}
- inline IntegerValue(){}
- inline int getValue(){ return value; }
- ~IntegerValue(){}
- inline virtual u_int32_t size() const { return 4; }
- inline virtual char getType() const { return 'I'; }
- virtual void encode(Buffer& buffer);
- virtual void decode(Buffer& buffer);
- };
-
- class TimeValue : public virtual Value{
- u_int64_t value;
- public:
- inline TimeValue(int v) : value(v){}
- inline TimeValue(){}
- inline u_int64_t getValue(){ return value; }
- ~TimeValue(){}
- inline virtual u_int32_t size() const { return 8; }
- inline virtual char getType() const { return 'T'; }
- virtual void encode(Buffer& buffer);
- virtual void decode(Buffer& buffer);
- };
-
- class DecimalValue : public virtual Value{
- u_int8_t decimals;
- u_int32_t value;
- public:
- inline DecimalValue(int v) : value(v){}
- inline DecimalValue(){}
- ~DecimalValue(){}
- inline virtual u_int32_t size() const { return 5; }
- inline virtual char getType() const { return 'D'; }
- virtual void encode(Buffer& buffer);
- virtual void decode(Buffer& buffer);
- };
-
- class FieldTableValue : public virtual Value{
- FieldTable value;
- public:
- inline FieldTableValue(const FieldTable& v) : value(v){}
- inline FieldTableValue(){}
- inline FieldTable getValue(){ return value; }
- ~FieldTableValue(){}
- inline virtual u_int32_t size() const { return 4 + value.size(); }
- inline virtual char getType() const { return 'F'; }
- virtual void encode(Buffer& buffer);
- virtual void decode(Buffer& buffer);
- };
-}
-}
+class Buffer;
+/**
+ * Represents a decimal value.
+ * No arithmetic functionality for now, we only care about encoding/decoding.
+ */
+struct Decimal {
+ u_int32_t value;
+ u_int8_t decimals;
+
+ Decimal(u_int32_t value_=0, u_int8_t decimals_=0) : value(value_), decimals(decimals_) {}
+ bool operator==(const Decimal& d) const {
+ return decimals == d.decimals && value == d.value;
+ }
+ bool operator!=(const Decimal& d) const { return !(*this == d); }
+};
+
+std::ostream& operator<<(std::ostream& out, const Decimal& d);
+
+/**
+ * Polymorpic base class for values.
+ */
+class Value {
+ public:
+ virtual ~Value();
+ virtual u_int32_t size() const = 0;
+ virtual char getType() const = 0;
+ virtual void encode(Buffer& buffer) = 0;
+ virtual void decode(Buffer& buffer) = 0;
+ virtual bool operator==(const Value&) const = 0;
+ bool operator!=(const Value& v) const { return !(*this == v); }
+ virtual void print(std::ostream& out) const = 0;
+
+ /** Create a new value by decoding from the buffer */
+ static std::auto_ptr<Value> decode_value(Buffer& buffer);
+};
+
+std::ostream& operator<<(std::ostream& out, const Value& d);
+
+
+/**
+ * Template for common operations on Value sub-classes.
+ */
+template <class T>
+class ValueOps : public Value
+{
+ protected:
+ T value;
+ public:
+ ValueOps() {}
+ ValueOps(const T& v) : value(v) {}
+ const T& getValue() const { return value; }
+ T& getValue() { return value; }
+
+ virtual bool operator==(const Value& v) const {
+ const ValueOps<T>* vo = dynamic_cast<const ValueOps<T>*>(&v);
+ if (vo == 0) return false;
+ else return value == vo->value;
+ }
+
+ void print(std::ostream& out) const { out << value; }
+};
+
+
+class StringValue : public ValueOps<std::string> {
+ public:
+ StringValue(const std::string& v) : ValueOps<std::string>(v) {}
+ StringValue() {}
+ virtual u_int32_t size() const { return 4 + value.length(); }
+ virtual char getType() const { return 'S'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class IntegerValue : public ValueOps<int> {
+ public:
+ IntegerValue(int v) : ValueOps<int>(v) {}
+ IntegerValue(){}
+ virtual u_int32_t size() const { return 4; }
+ virtual char getType() const { return 'I'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class TimeValue : public ValueOps<u_int64_t> {
+ public:
+ TimeValue(u_int64_t v) : ValueOps<u_int64_t>(v){}
+ TimeValue(){}
+ virtual u_int32_t size() const { return 8; }
+ virtual char getType() const { return 'T'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class DecimalValue : public ValueOps<Decimal> {
+ public:
+ DecimalValue(const Decimal& d) : ValueOps<Decimal>(d) {}
+ DecimalValue(u_int32_t value_=0, u_int8_t decimals_=0) :
+ ValueOps<Decimal>(Decimal(value_, decimals_)){}
+ virtual u_int32_t size() const { return 5; }
+ virtual char getType() const { return 'D'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+
+class FieldTableValue : public ValueOps<FieldTable> {
+ public:
+ FieldTableValue(const FieldTable& v) : ValueOps<FieldTable>(v){}
+ FieldTableValue(){}
+ virtual u_int32_t size() const { return 4 + value.size(); }
+ virtual char getType() const { return 'F'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class EmptyValue : public Value {
+ public:
+ ~EmptyValue();
+ virtual u_int32_t size() const { return 0; }
+ virtual char getType() const { return 0; }
+ virtual void encode(Buffer& buffer) {}
+ virtual void decode(Buffer& buffer) {}
+ virtual bool operator==(const Value& v) const {
+ return dynamic_cast<const EmptyValue*>(&v);
+ }
+ virtual void print(std::ostream& out) const;
+};
+
+}} // qpid::framing
#endif
diff --git a/cpp/common/framing/src/AMQContentBody.cpp b/cpp/common/framing/src/AMQContentBody.cpp
index c8aadc8108..a9ee190ba8 100644
--- a/cpp/common/framing/src/AMQContentBody.cpp
+++ b/cpp/common/framing/src/AMQContentBody.cpp
@@ -16,6 +16,7 @@
*
*/
#include "AMQContentBody.h"
+#include <iostream>
qpid::framing::AMQContentBody::AMQContentBody(){
}
@@ -33,3 +34,7 @@ void qpid::framing::AMQContentBody::decode(Buffer& buffer, u_int32_t size){
buffer.getRawData(data, size);
}
+void qpid::framing::AMQContentBody::print(std::ostream& out) const
+{
+ out << "content (" << size() << " bytes)";
+}
diff --git a/cpp/common/framing/src/AMQFrame.cpp b/cpp/common/framing/src/AMQFrame.cpp
index 70f71010ff..5686c9ac81 100644
--- a/cpp/common/framing/src/AMQFrame.cpp
+++ b/cpp/common/framing/src/AMQFrame.cpp
@@ -1,3 +1,4 @@
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -126,21 +127,8 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size)
std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t){
out << "Frame[channel=" << t.channel << "; ";
- if(t.body.get() == 0){
- out << "empty";
- }else if(t.body->type() == METHOD_BODY){
- (dynamic_cast<AMQMethodBody*>(t.body.get()))->print(out);
- }else if(t.body->type() == HEADER_BODY){
- out << "header, content_size=" <<
- (dynamic_cast<AMQHeaderBody*>(t.body.get()))->getContentSize()
- << " (" << t.body->size() << " bytes)";
- }else if(t.body->type() == CONTENT_BODY){
- out << "content (" << t.body->size() << " bytes)";
- }else if(t.body->type() == HEARTBEAT_BODY){
- out << "heartbeat";
- }else{
- out << "unknown type, " << t.body->type();
- }
+ if (t.body.get() == 0) out << "empty";
+ else out << *t.body;
out << "]";
return out;
}
diff --git a/cpp/common/framing/src/AMQHeaderBody.cpp b/cpp/common/framing/src/AMQHeaderBody.cpp
index 4bf1626a8a..1fd387c5d5 100644
--- a/cpp/common/framing/src/AMQHeaderBody.cpp
+++ b/cpp/common/framing/src/AMQHeaderBody.cpp
@@ -58,3 +58,16 @@ void qpid::framing::AMQHeaderBody::createProperties(int classId){
THROW_QPID_ERROR(FRAMING_ERROR, "Unknown header class");
}
}
+
+void qpid::framing::AMQHeaderBody::print(std::ostream& out) const
+{
+ out << "header, content_size=" << getContentSize()
+ << " (" << size() << " bytes)" << ", headers=" ;
+ // TODO aconway 2006-09-26: Hack to see headers.
+ // Should write proper op << for BasicHeaderProperties.
+ //
+ const BasicHeaderProperties* props =
+ dynamic_cast<const BasicHeaderProperties*>(getProperties());
+ // TODO aconway 2006-09-26: Lose the static cast, fix BasicHeaderProperties
+ if (props) out << const_cast<BasicHeaderProperties*>(props)->getHeaders();
+}
diff --git a/cpp/common/framing/src/Buffer.cpp b/cpp/common/framing/src/Buffer.cpp
index 5264491980..15a4485abd 100644
--- a/cpp/common/framing/src/Buffer.cpp
+++ b/cpp/common/framing/src/Buffer.cpp
@@ -16,6 +16,7 @@
*
*/
#include "Buffer.h"
+#include "FieldTable.h"
qpid::framing::Buffer::Buffer(int _size) : size(_size), position(0), limit(_size){
data = new char[size];
diff --git a/cpp/common/framing/src/FieldTable.cpp b/cpp/common/framing/src/FieldTable.cpp
index 048cefa83c..b12b2783df 100644
--- a/cpp/common/framing/src/FieldTable.cpp
+++ b/cpp/common/framing/src/FieldTable.cpp
@@ -16,112 +16,133 @@
*
*/
#include "FieldTable.h"
-#include "NamedValue.h"
#include "QpidError.h"
#include "Buffer.h"
#include "Value.h"
+#include <assert.h>
-qpid::framing::FieldTable::~FieldTable(){
- int count(values.size());
- for(int i = 0; i < count; i++){
- delete values[i];
- }
-}
+namespace qpid {
+namespace framing {
-u_int32_t qpid::framing::FieldTable::size() const {
+FieldTable::~FieldTable() {}
+
+u_int32_t FieldTable::size() const {
u_int32_t size(4);
- int count(values.size());
- for(int i = 0; i < count; i++){
- size += values[i]->size();
+ for(ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) {
+ // 2 = shortstr_len_byyte + type_char_byte
+ size += 2 + (i->first).size() + (i->second)->size();
}
return size;
}
-int qpid::framing::FieldTable::count() const {
+int FieldTable::count() const {
return values.size();
}
-std::ostream& qpid::framing::operator<<(std::ostream& out, const FieldTable& t){
- out << "field_table{}";
- return out;
+namespace
+{
+std::ostream& operator<<(std::ostream& out, const FieldTable::ValueMap::value_type& i) {
+ return out << i.first << ":" << *i.second;
+}
}
-void qpid::framing::FieldTable::setString(const std::string& name, const std::string& value){
- setValue(name, new StringValue(value));
+std::ostream& operator<<(std::ostream& out, const FieldTable& t) {
+ out << "field_table{";
+ FieldTable::ValueMap::const_iterator i = t.getMap().begin();
+ if (i != t.getMap().end()) out << *i++;
+ while (i != t.getMap().end())
+ {
+ out << "," << *i++;
+ }
+ return out << "}";
}
-void qpid::framing::FieldTable::setInt(const std::string& name, int value){
- setValue(name, new IntegerValue(value));
+void FieldTable::setString(const std::string& name, const std::string& value){
+ values[name] = ValuePtr(new StringValue(value));
}
-void qpid::framing::FieldTable::setTimestamp(const std::string& name, u_int64_t value){
- setValue(name, new TimeValue(value));
+void FieldTable::setInt(const std::string& name, int value){
+ values[name] = ValuePtr(new IntegerValue(value));
}
-void qpid::framing::FieldTable::setTable(const std::string& name, const FieldTable& value){
- setValue(name, new FieldTableValue(value));
+void FieldTable::setTimestamp(const std::string& name, u_int64_t value){
+ values[name] = ValuePtr(new TimeValue(value));
}
-std::string qpid::framing::FieldTable::getString(const std::string& name){
- StringValue* val = dynamic_cast<StringValue*>(getValue(name));
- return (val == 0 ? "" : val->getValue());
+void FieldTable::setTable(const std::string& name, const FieldTable& value){
+ values[name] = ValuePtr(new FieldTableValue(value));
}
-int qpid::framing::FieldTable::getInt(const std::string& name){
- IntegerValue* val = dynamic_cast<IntegerValue*>(getValue(name));
- return (val == 0 ? 0 : val->getValue());
+namespace {
+// TODO aconway 2006-09-26: This is messy. Revisit the field table
+// and Value classes with a traits-based approach.
+//
+template <class T> T default_value() { return T(); }
+template <> int default_value<int>() { return 0; }
+template <> u_int64_t default_value<u_int64_t>() { return 0; }
}
-u_int64_t qpid::framing::FieldTable::getTimestamp(const std::string& name){
- TimeValue* val = dynamic_cast<TimeValue*>(getValue(name));
- return (val == 0 ? 0 : val->getValue());
+template <class T>
+T FieldTable::getValue(const std::string& name) const
+{
+ ValueMap::const_iterator i = values.find(name);
+ if (i == values.end()) return default_value<T>();
+ const ValueOps<T> *vt = dynamic_cast<const ValueOps<T>*>(i->second.get());
+ return vt->getValue();
}
-void qpid::framing::FieldTable::getTable(const std::string& name, FieldTable& value){
- FieldTableValue* val = dynamic_cast<FieldTableValue*>(getValue(name));
- if(val != 0) value = val->getValue();
+std::string FieldTable::getString(const std::string& name) const {
+ return getValue<std::string>(name);
}
-qpid::framing::NamedValue* qpid::framing::FieldTable::find(const std::string& name) const{
- int count(values.size());
- for(int i = 0; i < count; i++){
- if(values[i]->getName() == name) return values[i];
- }
- return 0;
+int FieldTable::getInt(const std::string& name) const {
+ return getValue<int>(name);
}
-qpid::framing::Value* qpid::framing::FieldTable::getValue(const std::string& name) const{
- NamedValue* val = find(name);
- return val == 0 ? 0 : val->getValue();
+u_int64_t FieldTable::getTimestamp(const std::string& name) const {
+ return getValue<u_int64_t>(name);
}
-void qpid::framing::FieldTable::setValue(const std::string& name, Value* value){
- NamedValue* val = find(name);
- if(val == 0){
- val = new NamedValue(name, value);
- values.push_back(val);
- }else{
- Value* old = val->getValue();
- if(old != 0) delete old;
- val->setValue(value);
- }
+void FieldTable::getTable(const std::string& name, FieldTable& value) const {
+ value = getValue<FieldTable>(name);
}
-void qpid::framing::FieldTable::encode(Buffer& buffer) const{
+void FieldTable::encode(Buffer& buffer) const{
buffer.putLong(size() - 4);
- int count(values.size());
- for(int i = 0; i < count; i++){
- values[i]->encode(buffer);
+ for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) {
+ buffer.putShortString(i->first);
+ buffer.putOctet(i->second->getType());
+ i->second->encode(buffer);
}
}
-void qpid::framing::FieldTable::decode(Buffer& buffer){
+void FieldTable::decode(Buffer& buffer){
u_int32_t size = buffer.getLong();
int leftover = buffer.available() - size;
while(buffer.available() > leftover){
- NamedValue* value = new NamedValue();
- value->decode(buffer);
- values.push_back(value);
+ std::string name;
+ buffer.getShortString(name);
+ std::auto_ptr<Value> value(Value::decode_value(buffer));
+ values[name] = ValuePtr(value.release());
}
}
+
+
+bool FieldTable::operator==(const FieldTable& x) const {
+ if (values.size() != x.values.size()) return false;
+ for (ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) {
+ ValueMap::const_iterator j = x.values.find(i->first);
+ if (j == x.values.end()) return false;
+ if (*(i->second) != *(j->second)) return false;
+ }
+ return true;
+}
+
+void FieldTable::erase(const std::string& name)
+{
+ values.erase(values.find(name));
+}
+
+}
+}
diff --git a/cpp/common/framing/src/NamedValue.cpp b/cpp/common/framing/src/NamedValue.cpp
deleted file mode 100644
index e80aea433c..0000000000
--- a/cpp/common/framing/src/NamedValue.cpp
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "NamedValue.h"
-#include "QpidError.h"
-#include "Buffer.h"
-#include "FieldTable.h"
-
-qpid::framing::NamedValue::NamedValue() : value(0){}
-
-qpid::framing::NamedValue::NamedValue(const string& n, Value* v) : name(n), value(v){}
-
-qpid::framing::NamedValue::~NamedValue(){
- if(value != 0){
- delete value;
- }
-}
-
-u_int32_t qpid::framing::NamedValue::size() const{
- return value ? 1/*size of name*/ + name.length() + 1/*type char*/ + value->size() : 0;
-}
-
-void qpid::framing::NamedValue::encode(Buffer& buffer){
- buffer.putShortString(name);
- u_int8_t type = value->getType();
- buffer.putOctet(type);
- value->encode(buffer);
-}
-
-void qpid::framing::NamedValue::decode(Buffer& buffer){
- buffer.getShortString(name);
- u_int8_t type = buffer.getOctet();
- switch(type){
- case 'S':
- value = new StringValue();
- break;
- case 'I':
- value = new IntegerValue();
- break;
- case 'D':
- value = new DecimalValue();
- break;
- case 'T':
- value = new TimeValue();
- break;
- case 'F':
- value = new FieldTableValue();
- break;
- default:
- THROW_QPID_ERROR(FRAMING_ERROR, "Unknown field table value type");
- }
- value->decode(buffer);
-}
diff --git a/cpp/common/framing/src/Value.cpp b/cpp/common/framing/src/Value.cpp
index 240b086696..1c210fdb12 100644
--- a/cpp/common/framing/src/Value.cpp
+++ b/cpp/common/framing/src/Value.cpp
@@ -18,40 +18,94 @@
#include "Value.h"
#include "Buffer.h"
#include "FieldTable.h"
+#include "QpidError.h"
-void qpid::framing::StringValue::encode(Buffer& buffer){
+namespace qpid {
+namespace framing {
+
+Value::~Value() {}
+
+void StringValue::encode(Buffer& buffer){
buffer.putLongString(value);
}
-void qpid::framing::StringValue::decode(Buffer& buffer){
+void StringValue::decode(Buffer& buffer){
buffer.getLongString(value);
}
-void qpid::framing::IntegerValue::encode(Buffer& buffer){
+void IntegerValue::encode(Buffer& buffer){
buffer.putLong((u_int32_t) value);
}
-void qpid::framing::IntegerValue::decode(Buffer& buffer){
+void IntegerValue::decode(Buffer& buffer){
value = buffer.getLong();
}
-void qpid::framing::TimeValue::encode(Buffer& buffer){
+void TimeValue::encode(Buffer& buffer){
buffer.putLongLong(value);
}
-void qpid::framing::TimeValue::decode(Buffer& buffer){
+void TimeValue::decode(Buffer& buffer){
value = buffer.getLongLong();
}
-void qpid::framing::DecimalValue::encode(Buffer& buffer){
- buffer.putOctet(decimals);
- buffer.putLong(value);
+void DecimalValue::encode(Buffer& buffer){
+ buffer.putOctet(value.decimals);
+ buffer.putLong(value.value);
}
-void qpid::framing::DecimalValue::decode(Buffer& buffer){
- decimals = buffer.getOctet();
- value = buffer.getLong();
+void DecimalValue::decode(Buffer& buffer){
+ value = Decimal(buffer.getLong(), buffer.getOctet());
}
-void qpid::framing::FieldTableValue::encode(Buffer& buffer){
+void FieldTableValue::encode(Buffer& buffer){
buffer.putFieldTable(value);
}
-void qpid::framing::FieldTableValue::decode(Buffer& buffer){
+void FieldTableValue::decode(Buffer& buffer){
buffer.getFieldTable(value);
}
+
+std::auto_ptr<Value> Value::decode_value(Buffer& buffer)
+{
+ std::auto_ptr<Value> value;
+ u_int8_t type = buffer.getOctet();
+ switch(type){
+ case 'S':
+ value.reset(new StringValue());
+ break;
+ case 'I':
+ value.reset(new IntegerValue());
+ break;
+ case 'D':
+ value.reset(new DecimalValue());
+ break;
+ case 'T':
+ value.reset(new TimeValue());
+ break;
+ case 'F':
+ value.reset(new FieldTableValue());
+ break;
+ default:
+ THROW_QPID_ERROR(FRAMING_ERROR, "Unknown field table value type");
+ }
+ value->decode(buffer);
+ return value;
+}
+
+EmptyValue::~EmptyValue() {}
+
+void EmptyValue::print(std::ostream& out) const
+{
+ out << "<empty field value>";
+}
+
+std::ostream& operator<<(std::ostream& out, const Value& v) {
+ v.print(out);
+ return out;
+}
+
+std::ostream& operator<<(std::ostream& out, const Decimal& d)
+{
+ return out << "Decimal(" << d.value << "," << d.decimals << ")";
+}
+
+}}
+
+
+
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index f4d0817e60..fc6c147f2b 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -20,7 +20,7 @@ to read and write Frame objects. This could be used by a client,
server, or even a proxy implementation.
"""
-import socket, codec
+import socket, codec,logging
from cStringIO import StringIO
from spec import load, pythonize
from codec import EOF
@@ -240,8 +240,10 @@ class Header(Payload):
properties = {}
for b, f in zip(bits, klass.fields):
if b:
- properties[f.name] = c.decode(f.type)
-
+ # Note: decode returns a unicode u'' string but only
+ # plain '' strings can be used as keywords so we need to
+ # stringify the names.
+ properties[str(f.name)] = c.decode(f.type)
return Header(klass, weight, size, **properties)
def __str__(self):
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 31d3d24f5f..3085e24247 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -146,7 +146,6 @@ class Channel:
def invoke(self, method, args, content = None):
if self.closed:
raise Closed(self.reason)
-
frame = Frame(self.id, Method(method, *args))
self.outgoing.put(frame)
@@ -181,7 +180,7 @@ class Channel:
def write_content(self, klass, content, queue):
size = content.size()
- header = Frame(self.id, Header(klass, content.weight(), size))
+ header = Frame(self.id, Header(klass, content.weight(), size, **content.properties))
queue.put(header)
for child in content.children:
self.write_content(klass, child, queue)
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index 0bec6a8708..92925bea20 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -22,7 +22,7 @@ import sys, re, unittest, os, random, logging
import qpid.client, qpid.spec
import Queue
from getopt import getopt, GetoptError
-
+from qpid.content import Content
def findmodules(root):
"""Find potential python modules under directory root"""
@@ -161,10 +161,6 @@ class TestBase(unittest.TestCase):
self.channel.channel_open()
def tearDown(self):
- # TODO aconway 2006-09-05: Wrong behaviour here, we should
- # close all open channels (checking for exceptions on the
- # channesl) then open a channel to clean up qs and exs,
- # finally close that channel.
for ch, q in self.queues:
ch.queue_delete(queue=q)
for ch, ex in self.exchanges:
@@ -186,13 +182,11 @@ class TestBase(unittest.TestCase):
arguments={}):
channel = channel or self.channel
reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments)
- # TODO aconway 2006-09-14: Don't add exchange on failure.
self.exchanges.append((channel,exchange))
return reply
def uniqueString(self):
"""Generate a unique string, unique for this TestBase instance"""
- # TODO aconway 2006-09-20: Not thread safe.
if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
return "Test Message " + str(self.uniqueCounter)
@@ -208,22 +202,24 @@ class TestBase(unittest.TestCase):
self.fail("Queue is not empty.")
except Queue.Empty: None # Ignore
- def assertPublishGet(self, queue, exchange="", routing_key=""):
+ def assertPublishGet(self, queue, exchange="", routing_key="", properties=None):
"""
Publish to exchange and assert queue.get() returns the same message.
"""
body = self.uniqueString()
self.channel.basic_publish(exchange=exchange,
- content=qpid.content.Content(body),
+ content=Content(body, properties=properties),
routing_key=routing_key)
- self.assertEqual(body, queue.get(timeout=2).content.body)
+ msg = queue.get(timeout=1)
+ self.assertEqual(body, msg.content.body)
+ if (properties): self.assertEqual(properties, msg.content.properties)
- def assertPublishConsume(self, queue="", exchange="", routing_key=""):
+ def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
"""
Publish a message and consume it, assert it comes back intact.
Return the Queue object used to consume.
"""
- self.assertPublishGet(self.consume(queue), exchange, routing_key)
+ self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
def assertChannelException(self, expectedCode, message):
self.assertEqual(message.method.klass.name, "channel")
diff --git a/python/tests/exchange.py b/python/tests/exchange.py
index 4eb64520e6..8f3504b15e 100644
--- a/python/tests/exchange.py
+++ b/python/tests/exchange.py
@@ -20,22 +20,11 @@ Tests for exchange behaviour.
Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
"""
-import logging, Queue
+import Queue, logging
from qpid.testlib import TestBase
from qpid.content import Content
-# TODO aconway 2006-09-01: Investigate and add tests as appropriate.
-# Observered on C++:
-#
-# No exception raised for basic_consume on non-existent queue name.
-# No exception for basic_publish with bad routing key.
-# No exception for binding to non-existent exchange?
-# queue_bind hangs with invalid exchange name
-#
-# Do server exceptions get propagated properly?
-# Do Java exceptions propagate with any data (or just Closed())
-
class StandardExchangeVerifier:
"""Verifies standard exchange behavior.
@@ -67,7 +56,6 @@ class StandardExchangeVerifier:
self.assertPublishGet(q, ex, "a.b.x")
self.assertPublishGet(q, ex, "a.x.b.x")
self.assertPublishGet(q, ex, "a.x.x.b.x")
-
# Shouldn't match
self.channel.basic_publish(exchange=ex, routing_key="a.b")
self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y")
@@ -75,6 +63,16 @@ class StandardExchangeVerifier:
self.channel.basic_publish(exchange=ex, routing_key="a.b")
self.assert_(q.empty())
+ def verifyHeadersExchange(self, ex):
+ """Verify that ex is a headers exchange"""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
+ q = self.consume("q")
+ headers = {"name":"fred", "age":3}
+ self.assertPublishGet(q, exchange=ex, properties={'headers':headers})
+ self.channel.basic_publish(exchange=ex) # No headers, won't deliver
+ self.assertEmpty(q);
+
class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
"""
@@ -97,6 +95,11 @@ class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
"""Declare and test a topic exchange"""
self.exchange_declare(0, exchange="t", type="topic")
self.verifyTopicExchange("t")
+
+ def testHeaders(self):
+ """Declare and test a headers exchange"""
+ self.exchange_declare(0, exchange="h", type="headers")
+ self.verifyHeadersExchange("h")
class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
@@ -106,7 +109,7 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
exchange instance is amq. followed by the exchange type name.
Client creates a temporary queue and attempts to bind to each required
- exchange instance (amq.fanout, amq.direct, and amq.topic, amq.headers if
+ exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
those types are defined).
"""
def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
@@ -115,9 +118,7 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
- def testAmqHeaders(self):
- self.exchange_declare(0, exchange="amq.headers", passive="true")
- # TODO aconway 2006-09-14: verify headers behavior
+ def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
"""
@@ -137,13 +138,14 @@ class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
self.verifyDirectExchange("")
+# TODO aconway 2006-09-27: Fill in empty tests:
+
class DefaultAccessRuleTests(TestBase):
"""
The server MUST NOT allow clients to access the default exchange except
by specifying an empty exchange name in the Queue.Bind and content Publish
methods.
"""
- # TODO aconway 2006-09-18: fill this in.
class ExtensionsRuleTests(TestBase):
"""
@@ -252,3 +254,41 @@ class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
"""
+class HeadersExchangeTests(TestBase):
+ """
+ Tests for headers exchange functionality.
+ """
+ def setUp(self):
+ TestBase.setUp(self)
+ self.queue_declare(queue="q")
+ self.q = self.consume("q")
+
+ def myAssertPublishGet(self, headers):
+ self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers})
+
+ def myBasicPublish(self, headers):
+ self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers}))
+
+ def testMatchAll(self):
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
+
+ # None of these should match
+ self.myBasicPublish({})
+ self.myBasicPublish({"name":"barney"})
+ self.myBasicPublish({"name":10})
+ self.myBasicPublish({"name":"fred", "age":2})
+ self.assertEmpty(self.q)
+
+ def testMatchAny(self):
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred"})
+ self.myAssertPublishGet({"name":"fred", "ignoreme":10})
+ self.myAssertPublishGet({"ignoreme":10, "age":3})
+
+ # Wont match
+ self.myBasicPublish({})
+ self.myBasicPublish({"irrelevant":0})
+ self.assertEmpty(self.q)
+
diff --git a/python/tests/testlib.py b/python/tests/testlib.py
index a50f8140b4..6a2efb6a11 100644
--- a/python/tests/testlib.py
+++ b/python/tests/testlib.py
@@ -52,3 +52,12 @@ class TestBaseTest(TestBase):
self.fail("assertEmpty did not assert on non-empty queue")
except AssertionError: None # Ignore
+ def testMessageProperties(self):
+ """Verify properties are passed with message"""
+ props={"headers":{"x":1, "y":2}}
+ self.queue_declare(queue="q")
+ q = self.consume("q")
+ self.assertPublishGet(q, routing_key="q", properties=props)
+
+
+