diff options
author | Alan Conway <aconway@apache.org> | 2006-09-27 19:50:23 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-09-27 19:50:23 +0000 |
commit | caca23c5dc055d985fecfe188573104bc707ad9d (patch) | |
tree | 154c0bbd4c7bca70080de28116b5654491657906 | |
parent | 9d718c2348708b0b27ce9fb9fcbf05c4b0a997cc (diff) | |
download | qpid-python-caca23c5dc055d985fecfe188573104bc707ad9d.tar.gz |
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450556 13f79535-47bb-0310-9956-ffa450edef68
24 files changed, 573 insertions, 361 deletions
diff --git a/cpp/Makefile b/cpp/Makefile index 9abdf40f14..28f2212a7a 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -22,16 +22,20 @@ UNITTESTS=$(wildcard common/*/test/*.so broker/test/*.so) -.PHONY: all clean doxygen +.PHONY: all test unittest pythontest runtests clean doxygen -test: all - @$(MAKE) runtests +test: all runtests -runtests: +unittest: DllPlugInTester -c -b $(UNITTESTS) + +pythontest: bin/qpidd >> qpidd.log & cd ../python ; ./run-tests -v -I cpp_failing.txt +runtests: + $(MAKE) -k unittest pythontest + all: @$(MAKE) -C common all @$(MAKE) -C broker all diff --git a/cpp/broker/src/HeadersExchange.cpp b/cpp/broker/src/HeadersExchange.cpp new file mode 100644 index 0000000000..03a029ea4d --- /dev/null +++ b/cpp/broker/src/HeadersExchange.cpp @@ -0,0 +1,119 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "HeadersExchange.h" +#include "ExchangeBinding.h" +#include "Value.h" +#include <algorithm> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::concurrent; + +// TODO aconway 2006-09-20: More efficient matching algorithm. +// The current search algorithm really sucks. +// Fieldtables are heavy, maybe use shared_ptr to do handle-body. + +namespace qpid { +namespace broker { + +namespace { +const std::string all("all"); +const std::string any("any"); +const std::string x_match("x-match"); +} + +HeadersExchange::HeadersExchange(const string& name) : Exchange(name) { } + +void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ + std::cout << "HeadersExchange::bind" << std::endl; + Locker locker(lock); + std::string what = args->getString("x-match"); + // TODO aconway 2006-09-26: throw an exception for invalid bindings. + if (what != all && what != any) return; // Invalid. + bindings.push_back(Binding(*args, queue)); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); +} + +void HeadersExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ + Locker locker(lock);; + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (i->first == *args) { + bindings.erase(i); + } + } +} + + +void HeadersExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){ + std::cout << "route: " << *args << std::endl; + Locker locker(lock);; + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (match(i->first, *args)) i->second->deliver(msg); + } +} + +HeadersExchange::~HeadersExchange() {} + +const std::string HeadersExchange::typeName("headers"); +namespace +{ + +bool match_values(const Value& bind, const Value& msg) { + return dynamic_cast<const EmptyValue*>(&bind) || bind == msg; +} + +} + + +bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) { + typedef FieldTable::ValueMap Map; + std::string what = bind.getString(x_match); + if (what == all) { + for (Map::const_iterator i = bind.getMap().begin(); + i != bind.getMap().end(); + ++i) + { + if (i->first != x_match) + { + Map::const_iterator j = msg.getMap().find(i->first); + if (j == msg.getMap().end()) return false; + if (!match_values(*(i->second), *(j->second))) return false; + } + } + return true; + } else if (what == any) { + for (Map::const_iterator i = bind.getMap().begin(); + i != bind.getMap().end(); + ++i) + { + if (i->first != x_match) + { + Map::const_iterator j = msg.getMap().find(i->first); + if (j != msg.getMap().end()) { + if (match_values(*(i->second), *(j->second))) return true; + } + } + } + return false; + } else { + return false; + } +} + +}} + diff --git a/cpp/broker/src/SessionHandlerFactoryImpl.cpp b/cpp/broker/src/SessionHandlerFactoryImpl.cpp index 280e89c475..39c627afef 100644 --- a/cpp/broker/src/SessionHandlerFactoryImpl.cpp +++ b/cpp/broker/src/SessionHandlerFactoryImpl.cpp @@ -18,6 +18,7 @@ #include "SessionHandlerFactoryImpl.h" #include "SessionHandlerImpl.h" #include "FanOutExchange.h" +#include "HeadersExchange.h" using namespace qpid::broker; using namespace qpid::io; @@ -28,6 +29,7 @@ const std::string empty; const std::string amq_direct("amq.direct"); const std::string amq_topic("amq.topic"); const std::string amq_fanout("amq.fanout"); +const std::string amq_match("amq.match"); } SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){ @@ -35,6 +37,7 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeo exchanges.declare(new DirectExchange(amq_direct)); exchanges.declare(new TopicExchange(amq_topic)); exchanges.declare(new FanOutExchange(amq_fanout)); + exchanges.declare(new HeadersExchange(amq_match)); cleaner.start(); } diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index 2ce1c4b298..eb8f37030c 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -18,6 +18,8 @@ #include <iostream> #include "SessionHandlerImpl.h" #include "FanOutExchange.h" +#include "TopicExchange.h" +#include "HeadersExchange.h" #include "assert.h" using namespace std::tr1; @@ -223,7 +225,9 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 if(!passive && ( type != TopicExchange::typeName && type != DirectExchange::typeName && - type != FanOutExchange::typeName) + type != FanOutExchange::typeName && + type != HeadersExchange::typeName + ) ) { throw ChannelException(540, "Exchange type not implemented: " + type); @@ -237,6 +241,8 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 parent->exchanges->declare(new DirectExchange(exchange)); }else if(type == FanOutExchange::typeName){ parent->exchanges->declare(new DirectExchange(exchange)); + }else if (type == HeadersExchange::typeName) { + parent->exchanges->declare(new HeadersExchange(exchange)); } } parent->exchanges->getLock()->release(); diff --git a/cpp/common/framing/inc/AMQBody.h b/cpp/common/framing/inc/AMQBody.h index d4b436c949..a1770820a9 100644 --- a/cpp/common/framing/inc/AMQBody.h +++ b/cpp/common/framing/inc/AMQBody.h @@ -27,18 +27,20 @@ namespace qpid { class AMQBody { - public: + public: typedef std::tr1::shared_ptr<AMQBody> shared_ptr; + virtual ~AMQBody(); virtual u_int32_t size() const = 0; virtual u_int8_t type() const = 0; virtual void encode(Buffer& buffer) const = 0; virtual void decode(Buffer& buffer, u_int32_t size) = 0; - inline virtual ~AMQBody(){} + virtual void print(std::ostream& out) const; }; - enum body_types {METHOD_BODY = 1, HEADER_BODY = 2, CONTENT_BODY = 3, HEARTBEAT_BODY = 8}; + std::ostream& operator<<(std::ostream& out, const AMQBody& body) ; + enum body_types {METHOD_BODY = 1, HEADER_BODY = 2, CONTENT_BODY = 3, HEARTBEAT_BODY = 8}; } } diff --git a/cpp/common/framing/inc/AMQContentBody.h b/cpp/common/framing/inc/AMQContentBody.h index 8e97c31edb..1a6f2cf117 100644 --- a/cpp/common/framing/inc/AMQContentBody.h +++ b/cpp/common/framing/inc/AMQContentBody.h @@ -40,6 +40,7 @@ public: u_int32_t size() const; void encode(Buffer& buffer) const; void decode(Buffer& buffer, u_int32_t size); + void print(std::ostream& out) const; }; } diff --git a/cpp/common/framing/inc/AMQHeaderBody.h b/cpp/common/framing/inc/AMQHeaderBody.h index 369db8a9c8..e39fffa8ce 100644 --- a/cpp/common/framing/inc/AMQHeaderBody.h +++ b/cpp/common/framing/inc/AMQHeaderBody.h @@ -40,12 +40,14 @@ public: AMQHeaderBody(); inline u_int8_t type() const { return HEADER_BODY; } HeaderProperties* getProperties(){ return properties; } + const HeaderProperties* getProperties() const { return properties; } inline u_int64_t getContentSize() const { return contentSize; } inline void setContentSize(u_int64_t size) { contentSize = size; } virtual ~AMQHeaderBody(); virtual u_int32_t size() const; virtual void encode(Buffer& buffer) const; virtual void decode(Buffer& buffer, u_int32_t size); + virtual void print(std::ostream& out) const; }; } diff --git a/cpp/common/framing/inc/AMQHeartbeatBody.h b/cpp/common/framing/inc/AMQHeartbeatBody.h index ca2def977a..cfe057bdcd 100644 --- a/cpp/common/framing/inc/AMQHeartbeatBody.h +++ b/cpp/common/framing/inc/AMQHeartbeatBody.h @@ -30,11 +30,12 @@ class AMQHeartbeatBody : virtual public AMQBody public: typedef std::tr1::shared_ptr<AMQHeartbeatBody> shared_ptr; - virtual ~AMQHeartbeatBody() {} + virtual ~AMQHeartbeatBody(); inline u_int32_t size() const { return 0; } inline u_int8_t type() const { return HEARTBEAT_BODY; } inline void encode(Buffer& buffer) const {} inline void decode(Buffer& buffer, u_int32_t size) {} + virtual void print(std::ostream& out) const; }; } diff --git a/cpp/common/framing/inc/Buffer.h b/cpp/common/framing/inc/Buffer.h index 1ff4611f1f..e0532cc9d6 100644 --- a/cpp/common/framing/inc/Buffer.h +++ b/cpp/common/framing/inc/Buffer.h @@ -16,7 +16,6 @@ * */ #include "amqp_types.h" -#include "FieldTable.h" #ifndef _Buffer_ #define _Buffer_ @@ -24,6 +23,8 @@ namespace qpid { namespace framing { +class FieldTable; + class Buffer { const int size; diff --git a/cpp/common/framing/inc/FieldTable.h b/cpp/common/framing/inc/FieldTable.h index cf935d3284..7da914968e 100644 --- a/cpp/common/framing/inc/FieldTable.h +++ b/cpp/common/framing/inc/FieldTable.h @@ -17,6 +17,8 @@ */ #include <iostream> #include <vector> +#include <tr1/memory> +#include <tr1/unordered_map> #include "amqp_types.h" #ifndef _FieldTable_ @@ -25,42 +27,50 @@ namespace qpid { namespace framing { - class NamedValue; - class Value; - class Buffer; +class Value; +class Buffer; - class FieldTable - { - std::vector<NamedValue*> values; - NamedValue* find(const std::string& name) const; +class FieldTable +{ + public: + typedef std::tr1::shared_ptr<Value> ValuePtr; + typedef std::tr1::unordered_map<std::string, ValuePtr> ValueMap; - Value* getValue(const std::string& name) const; - void setValue(const std::string& name, Value* value); + ~FieldTable(); + u_int32_t size() const; + int count() const; + void setString(const std::string& name, const std::string& value); + void setInt(const std::string& name, int value); + void setTimestamp(const std::string& name, u_int64_t value); + void setTable(const std::string& name, const FieldTable& value); + //void setDecimal(string& name, xxx& value); + std::string getString(const std::string& name) const; + int getInt(const std::string& name) const; + u_int64_t getTimestamp(const std::string& name) const; + void getTable(const std::string& name, FieldTable& value) const; + //void getDecimal(string& name, xxx& value); + void erase(const std::string& name); + + void encode(Buffer& buffer) const; + void decode(Buffer& buffer); - public: - ~FieldTable(); - u_int32_t size() const; - int count() const; - void setString(const std::string& name, const std::string& value); - void setInt(const std::string& name, int value); - void setTimestamp(const std::string& name, u_int64_t value); - void setTable(const std::string& name, const FieldTable& value); - //void setDecimal(string& name, xxx& value); - std::string getString(const std::string& name); - int getInt(const std::string& name); - u_int64_t getTimestamp(const std::string& name); - void getTable(const std::string& name, FieldTable& value); - //void getDecimal(string& name, xxx& value); + bool operator==(const FieldTable& other) const; - void encode(Buffer& buffer) const; - void decode(Buffer& buffer); + // TODO aconway 2006-09-26: Yeuch! Rework FieldTable to have + // a map-like interface. + const ValueMap& getMap() const { return values; } + ValueMap& getMap() { return values; } + + + private: + friend std::ostream& operator<<(std::ostream& out, const FieldTable& body); + ValueMap values; + template<class T> T getValue(const std::string& name) const; +}; - friend std::ostream& operator<<(std::ostream& out, const FieldTable& body); - }; - - class FieldNotFoundException{}; - class UnknownFieldName : public FieldNotFoundException{}; - class IncorrectFieldType : public FieldNotFoundException{}; +class FieldNotFoundException{}; +class UnknownFieldName : public FieldNotFoundException{}; +class IncorrectFieldType : public FieldNotFoundException{}; } } diff --git a/cpp/common/framing/inc/NamedValue.h b/cpp/common/framing/inc/NamedValue.h deleted file mode 100644 index 729b5d08a7..0000000000 --- a/cpp/common/framing/inc/NamedValue.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include <vector> -#include "amqp_types.h" -#include "Value.h" - -#ifndef _NamedValue_ -#define _NamedValue_ - -namespace qpid { -namespace framing { - - class Buffer; - - class NamedValue{ - string name; - Value* value; - public: - NamedValue(); - NamedValue(const string& name, Value* value); - ~NamedValue(); - void encode(Buffer& buffer); - void decode(Buffer& buffer); - u_int32_t size() const; - inline const string& getName() const { return name; } - inline Value* getValue() const { return value; } - inline void setValue(Value* val) { value = val; } - }; -} -} - - -#endif diff --git a/cpp/common/framing/inc/Value.h b/cpp/common/framing/inc/Value.h index e3d2a2c1d6..3d525a0bef 100644 --- a/cpp/common/framing/inc/Value.h +++ b/cpp/common/framing/inc/Value.h @@ -26,84 +26,135 @@ namespace qpid { namespace framing { - class Buffer; - - class Value{ - public: - inline virtual ~Value(){} - virtual u_int32_t size() const = 0; - virtual char getType() const = 0; - virtual void encode(Buffer& buffer) = 0; - virtual void decode(Buffer& buffer) = 0; - }; - - class StringValue : public virtual Value{ - string value; - - public: - inline StringValue(const string& v) : value(v){} - inline StringValue(){} - inline string getValue(){ return value; } - ~StringValue(){} - inline virtual u_int32_t size() const { return 4 + value.length(); } - inline virtual char getType() const { return 'S'; } - virtual void encode(Buffer& buffer); - virtual void decode(Buffer& buffer); - }; - - class IntegerValue : public virtual Value{ - int value; - public: - inline IntegerValue(int v) : value(v){} - inline IntegerValue(){} - inline int getValue(){ return value; } - ~IntegerValue(){} - inline virtual u_int32_t size() const { return 4; } - inline virtual char getType() const { return 'I'; } - virtual void encode(Buffer& buffer); - virtual void decode(Buffer& buffer); - }; - - class TimeValue : public virtual Value{ - u_int64_t value; - public: - inline TimeValue(int v) : value(v){} - inline TimeValue(){} - inline u_int64_t getValue(){ return value; } - ~TimeValue(){} - inline virtual u_int32_t size() const { return 8; } - inline virtual char getType() const { return 'T'; } - virtual void encode(Buffer& buffer); - virtual void decode(Buffer& buffer); - }; - - class DecimalValue : public virtual Value{ - u_int8_t decimals; - u_int32_t value; - public: - inline DecimalValue(int v) : value(v){} - inline DecimalValue(){} - ~DecimalValue(){} - inline virtual u_int32_t size() const { return 5; } - inline virtual char getType() const { return 'D'; } - virtual void encode(Buffer& buffer); - virtual void decode(Buffer& buffer); - }; - - class FieldTableValue : public virtual Value{ - FieldTable value; - public: - inline FieldTableValue(const FieldTable& v) : value(v){} - inline FieldTableValue(){} - inline FieldTable getValue(){ return value; } - ~FieldTableValue(){} - inline virtual u_int32_t size() const { return 4 + value.size(); } - inline virtual char getType() const { return 'F'; } - virtual void encode(Buffer& buffer); - virtual void decode(Buffer& buffer); - }; -} -} +class Buffer; +/** + * Represents a decimal value. + * No arithmetic functionality for now, we only care about encoding/decoding. + */ +struct Decimal { + u_int32_t value; + u_int8_t decimals; + + Decimal(u_int32_t value_=0, u_int8_t decimals_=0) : value(value_), decimals(decimals_) {} + bool operator==(const Decimal& d) const { + return decimals == d.decimals && value == d.value; + } + bool operator!=(const Decimal& d) const { return !(*this == d); } +}; + +std::ostream& operator<<(std::ostream& out, const Decimal& d); + +/** + * Polymorpic base class for values. + */ +class Value { + public: + virtual ~Value(); + virtual u_int32_t size() const = 0; + virtual char getType() const = 0; + virtual void encode(Buffer& buffer) = 0; + virtual void decode(Buffer& buffer) = 0; + virtual bool operator==(const Value&) const = 0; + bool operator!=(const Value& v) const { return !(*this == v); } + virtual void print(std::ostream& out) const = 0; + + /** Create a new value by decoding from the buffer */ + static std::auto_ptr<Value> decode_value(Buffer& buffer); +}; + +std::ostream& operator<<(std::ostream& out, const Value& d); + + +/** + * Template for common operations on Value sub-classes. + */ +template <class T> +class ValueOps : public Value +{ + protected: + T value; + public: + ValueOps() {} + ValueOps(const T& v) : value(v) {} + const T& getValue() const { return value; } + T& getValue() { return value; } + + virtual bool operator==(const Value& v) const { + const ValueOps<T>* vo = dynamic_cast<const ValueOps<T>*>(&v); + if (vo == 0) return false; + else return value == vo->value; + } + + void print(std::ostream& out) const { out << value; } +}; + + +class StringValue : public ValueOps<std::string> { + public: + StringValue(const std::string& v) : ValueOps<std::string>(v) {} + StringValue() {} + virtual u_int32_t size() const { return 4 + value.length(); } + virtual char getType() const { return 'S'; } + virtual void encode(Buffer& buffer); + virtual void decode(Buffer& buffer); +}; + +class IntegerValue : public ValueOps<int> { + public: + IntegerValue(int v) : ValueOps<int>(v) {} + IntegerValue(){} + virtual u_int32_t size() const { return 4; } + virtual char getType() const { return 'I'; } + virtual void encode(Buffer& buffer); + virtual void decode(Buffer& buffer); +}; + +class TimeValue : public ValueOps<u_int64_t> { + public: + TimeValue(u_int64_t v) : ValueOps<u_int64_t>(v){} + TimeValue(){} + virtual u_int32_t size() const { return 8; } + virtual char getType() const { return 'T'; } + virtual void encode(Buffer& buffer); + virtual void decode(Buffer& buffer); +}; + +class DecimalValue : public ValueOps<Decimal> { + public: + DecimalValue(const Decimal& d) : ValueOps<Decimal>(d) {} + DecimalValue(u_int32_t value_=0, u_int8_t decimals_=0) : + ValueOps<Decimal>(Decimal(value_, decimals_)){} + virtual u_int32_t size() const { return 5; } + virtual char getType() const { return 'D'; } + virtual void encode(Buffer& buffer); + virtual void decode(Buffer& buffer); +}; + + +class FieldTableValue : public ValueOps<FieldTable> { + public: + FieldTableValue(const FieldTable& v) : ValueOps<FieldTable>(v){} + FieldTableValue(){} + virtual u_int32_t size() const { return 4 + value.size(); } + virtual char getType() const { return 'F'; } + virtual void encode(Buffer& buffer); + virtual void decode(Buffer& buffer); +}; + +class EmptyValue : public Value { + public: + ~EmptyValue(); + virtual u_int32_t size() const { return 0; } + virtual char getType() const { return 0; } + virtual void encode(Buffer& buffer) {} + virtual void decode(Buffer& buffer) {} + virtual bool operator==(const Value& v) const { + return dynamic_cast<const EmptyValue*>(&v); + } + virtual void print(std::ostream& out) const; +}; + +}} // qpid::framing #endif diff --git a/cpp/common/framing/src/AMQContentBody.cpp b/cpp/common/framing/src/AMQContentBody.cpp index c8aadc8108..a9ee190ba8 100644 --- a/cpp/common/framing/src/AMQContentBody.cpp +++ b/cpp/common/framing/src/AMQContentBody.cpp @@ -16,6 +16,7 @@ * */ #include "AMQContentBody.h" +#include <iostream> qpid::framing::AMQContentBody::AMQContentBody(){ } @@ -33,3 +34,7 @@ void qpid::framing::AMQContentBody::decode(Buffer& buffer, u_int32_t size){ buffer.getRawData(data, size); } +void qpid::framing::AMQContentBody::print(std::ostream& out) const +{ + out << "content (" << size() << " bytes)"; +} diff --git a/cpp/common/framing/src/AMQFrame.cpp b/cpp/common/framing/src/AMQFrame.cpp index 70f71010ff..5686c9ac81 100644 --- a/cpp/common/framing/src/AMQFrame.cpp +++ b/cpp/common/framing/src/AMQFrame.cpp @@ -1,3 +1,4 @@ + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -126,21 +127,8 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size) std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t){ out << "Frame[channel=" << t.channel << "; "; - if(t.body.get() == 0){ - out << "empty"; - }else if(t.body->type() == METHOD_BODY){ - (dynamic_cast<AMQMethodBody*>(t.body.get()))->print(out); - }else if(t.body->type() == HEADER_BODY){ - out << "header, content_size=" << - (dynamic_cast<AMQHeaderBody*>(t.body.get()))->getContentSize() - << " (" << t.body->size() << " bytes)"; - }else if(t.body->type() == CONTENT_BODY){ - out << "content (" << t.body->size() << " bytes)"; - }else if(t.body->type() == HEARTBEAT_BODY){ - out << "heartbeat"; - }else{ - out << "unknown type, " << t.body->type(); - } + if (t.body.get() == 0) out << "empty"; + else out << *t.body; out << "]"; return out; } diff --git a/cpp/common/framing/src/AMQHeaderBody.cpp b/cpp/common/framing/src/AMQHeaderBody.cpp index 4bf1626a8a..1fd387c5d5 100644 --- a/cpp/common/framing/src/AMQHeaderBody.cpp +++ b/cpp/common/framing/src/AMQHeaderBody.cpp @@ -58,3 +58,16 @@ void qpid::framing::AMQHeaderBody::createProperties(int classId){ THROW_QPID_ERROR(FRAMING_ERROR, "Unknown header class"); } } + +void qpid::framing::AMQHeaderBody::print(std::ostream& out) const +{ + out << "header, content_size=" << getContentSize() + << " (" << size() << " bytes)" << ", headers=" ; + // TODO aconway 2006-09-26: Hack to see headers. + // Should write proper op << for BasicHeaderProperties. + // + const BasicHeaderProperties* props = + dynamic_cast<const BasicHeaderProperties*>(getProperties()); + // TODO aconway 2006-09-26: Lose the static cast, fix BasicHeaderProperties + if (props) out << const_cast<BasicHeaderProperties*>(props)->getHeaders(); +} diff --git a/cpp/common/framing/src/Buffer.cpp b/cpp/common/framing/src/Buffer.cpp index 5264491980..15a4485abd 100644 --- a/cpp/common/framing/src/Buffer.cpp +++ b/cpp/common/framing/src/Buffer.cpp @@ -16,6 +16,7 @@ * */ #include "Buffer.h" +#include "FieldTable.h" qpid::framing::Buffer::Buffer(int _size) : size(_size), position(0), limit(_size){ data = new char[size]; diff --git a/cpp/common/framing/src/FieldTable.cpp b/cpp/common/framing/src/FieldTable.cpp index 048cefa83c..b12b2783df 100644 --- a/cpp/common/framing/src/FieldTable.cpp +++ b/cpp/common/framing/src/FieldTable.cpp @@ -16,112 +16,133 @@ * */ #include "FieldTable.h" -#include "NamedValue.h" #include "QpidError.h" #include "Buffer.h" #include "Value.h" +#include <assert.h> -qpid::framing::FieldTable::~FieldTable(){ - int count(values.size()); - for(int i = 0; i < count; i++){ - delete values[i]; - } -} +namespace qpid { +namespace framing { -u_int32_t qpid::framing::FieldTable::size() const { +FieldTable::~FieldTable() {} + +u_int32_t FieldTable::size() const { u_int32_t size(4); - int count(values.size()); - for(int i = 0; i < count; i++){ - size += values[i]->size(); + for(ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) { + // 2 = shortstr_len_byyte + type_char_byte + size += 2 + (i->first).size() + (i->second)->size(); } return size; } -int qpid::framing::FieldTable::count() const { +int FieldTable::count() const { return values.size(); } -std::ostream& qpid::framing::operator<<(std::ostream& out, const FieldTable& t){ - out << "field_table{}"; - return out; +namespace +{ +std::ostream& operator<<(std::ostream& out, const FieldTable::ValueMap::value_type& i) { + return out << i.first << ":" << *i.second; +} } -void qpid::framing::FieldTable::setString(const std::string& name, const std::string& value){ - setValue(name, new StringValue(value)); +std::ostream& operator<<(std::ostream& out, const FieldTable& t) { + out << "field_table{"; + FieldTable::ValueMap::const_iterator i = t.getMap().begin(); + if (i != t.getMap().end()) out << *i++; + while (i != t.getMap().end()) + { + out << "," << *i++; + } + return out << "}"; } -void qpid::framing::FieldTable::setInt(const std::string& name, int value){ - setValue(name, new IntegerValue(value)); +void FieldTable::setString(const std::string& name, const std::string& value){ + values[name] = ValuePtr(new StringValue(value)); } -void qpid::framing::FieldTable::setTimestamp(const std::string& name, u_int64_t value){ - setValue(name, new TimeValue(value)); +void FieldTable::setInt(const std::string& name, int value){ + values[name] = ValuePtr(new IntegerValue(value)); } -void qpid::framing::FieldTable::setTable(const std::string& name, const FieldTable& value){ - setValue(name, new FieldTableValue(value)); +void FieldTable::setTimestamp(const std::string& name, u_int64_t value){ + values[name] = ValuePtr(new TimeValue(value)); } -std::string qpid::framing::FieldTable::getString(const std::string& name){ - StringValue* val = dynamic_cast<StringValue*>(getValue(name)); - return (val == 0 ? "" : val->getValue()); +void FieldTable::setTable(const std::string& name, const FieldTable& value){ + values[name] = ValuePtr(new FieldTableValue(value)); } -int qpid::framing::FieldTable::getInt(const std::string& name){ - IntegerValue* val = dynamic_cast<IntegerValue*>(getValue(name)); - return (val == 0 ? 0 : val->getValue()); +namespace { +// TODO aconway 2006-09-26: This is messy. Revisit the field table +// and Value classes with a traits-based approach. +// +template <class T> T default_value() { return T(); } +template <> int default_value<int>() { return 0; } +template <> u_int64_t default_value<u_int64_t>() { return 0; } } -u_int64_t qpid::framing::FieldTable::getTimestamp(const std::string& name){ - TimeValue* val = dynamic_cast<TimeValue*>(getValue(name)); - return (val == 0 ? 0 : val->getValue()); +template <class T> +T FieldTable::getValue(const std::string& name) const +{ + ValueMap::const_iterator i = values.find(name); + if (i == values.end()) return default_value<T>(); + const ValueOps<T> *vt = dynamic_cast<const ValueOps<T>*>(i->second.get()); + return vt->getValue(); } -void qpid::framing::FieldTable::getTable(const std::string& name, FieldTable& value){ - FieldTableValue* val = dynamic_cast<FieldTableValue*>(getValue(name)); - if(val != 0) value = val->getValue(); +std::string FieldTable::getString(const std::string& name) const { + return getValue<std::string>(name); } -qpid::framing::NamedValue* qpid::framing::FieldTable::find(const std::string& name) const{ - int count(values.size()); - for(int i = 0; i < count; i++){ - if(values[i]->getName() == name) return values[i]; - } - return 0; +int FieldTable::getInt(const std::string& name) const { + return getValue<int>(name); } -qpid::framing::Value* qpid::framing::FieldTable::getValue(const std::string& name) const{ - NamedValue* val = find(name); - return val == 0 ? 0 : val->getValue(); +u_int64_t FieldTable::getTimestamp(const std::string& name) const { + return getValue<u_int64_t>(name); } -void qpid::framing::FieldTable::setValue(const std::string& name, Value* value){ - NamedValue* val = find(name); - if(val == 0){ - val = new NamedValue(name, value); - values.push_back(val); - }else{ - Value* old = val->getValue(); - if(old != 0) delete old; - val->setValue(value); - } +void FieldTable::getTable(const std::string& name, FieldTable& value) const { + value = getValue<FieldTable>(name); } -void qpid::framing::FieldTable::encode(Buffer& buffer) const{ +void FieldTable::encode(Buffer& buffer) const{ buffer.putLong(size() - 4); - int count(values.size()); - for(int i = 0; i < count; i++){ - values[i]->encode(buffer); + for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) { + buffer.putShortString(i->first); + buffer.putOctet(i->second->getType()); + i->second->encode(buffer); } } -void qpid::framing::FieldTable::decode(Buffer& buffer){ +void FieldTable::decode(Buffer& buffer){ u_int32_t size = buffer.getLong(); int leftover = buffer.available() - size; while(buffer.available() > leftover){ - NamedValue* value = new NamedValue(); - value->decode(buffer); - values.push_back(value); + std::string name; + buffer.getShortString(name); + std::auto_ptr<Value> value(Value::decode_value(buffer)); + values[name] = ValuePtr(value.release()); } } + + +bool FieldTable::operator==(const FieldTable& x) const { + if (values.size() != x.values.size()) return false; + for (ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) { + ValueMap::const_iterator j = x.values.find(i->first); + if (j == x.values.end()) return false; + if (*(i->second) != *(j->second)) return false; + } + return true; +} + +void FieldTable::erase(const std::string& name) +{ + values.erase(values.find(name)); +} + +} +} diff --git a/cpp/common/framing/src/NamedValue.cpp b/cpp/common/framing/src/NamedValue.cpp deleted file mode 100644 index e80aea433c..0000000000 --- a/cpp/common/framing/src/NamedValue.cpp +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "NamedValue.h" -#include "QpidError.h" -#include "Buffer.h" -#include "FieldTable.h" - -qpid::framing::NamedValue::NamedValue() : value(0){} - -qpid::framing::NamedValue::NamedValue(const string& n, Value* v) : name(n), value(v){} - -qpid::framing::NamedValue::~NamedValue(){ - if(value != 0){ - delete value; - } -} - -u_int32_t qpid::framing::NamedValue::size() const{ - return value ? 1/*size of name*/ + name.length() + 1/*type char*/ + value->size() : 0; -} - -void qpid::framing::NamedValue::encode(Buffer& buffer){ - buffer.putShortString(name); - u_int8_t type = value->getType(); - buffer.putOctet(type); - value->encode(buffer); -} - -void qpid::framing::NamedValue::decode(Buffer& buffer){ - buffer.getShortString(name); - u_int8_t type = buffer.getOctet(); - switch(type){ - case 'S': - value = new StringValue(); - break; - case 'I': - value = new IntegerValue(); - break; - case 'D': - value = new DecimalValue(); - break; - case 'T': - value = new TimeValue(); - break; - case 'F': - value = new FieldTableValue(); - break; - default: - THROW_QPID_ERROR(FRAMING_ERROR, "Unknown field table value type"); - } - value->decode(buffer); -} diff --git a/cpp/common/framing/src/Value.cpp b/cpp/common/framing/src/Value.cpp index 240b086696..1c210fdb12 100644 --- a/cpp/common/framing/src/Value.cpp +++ b/cpp/common/framing/src/Value.cpp @@ -18,40 +18,94 @@ #include "Value.h" #include "Buffer.h" #include "FieldTable.h" +#include "QpidError.h" -void qpid::framing::StringValue::encode(Buffer& buffer){ +namespace qpid { +namespace framing { + +Value::~Value() {} + +void StringValue::encode(Buffer& buffer){ buffer.putLongString(value); } -void qpid::framing::StringValue::decode(Buffer& buffer){ +void StringValue::decode(Buffer& buffer){ buffer.getLongString(value); } -void qpid::framing::IntegerValue::encode(Buffer& buffer){ +void IntegerValue::encode(Buffer& buffer){ buffer.putLong((u_int32_t) value); } -void qpid::framing::IntegerValue::decode(Buffer& buffer){ +void IntegerValue::decode(Buffer& buffer){ value = buffer.getLong(); } -void qpid::framing::TimeValue::encode(Buffer& buffer){ +void TimeValue::encode(Buffer& buffer){ buffer.putLongLong(value); } -void qpid::framing::TimeValue::decode(Buffer& buffer){ +void TimeValue::decode(Buffer& buffer){ value = buffer.getLongLong(); } -void qpid::framing::DecimalValue::encode(Buffer& buffer){ - buffer.putOctet(decimals); - buffer.putLong(value); +void DecimalValue::encode(Buffer& buffer){ + buffer.putOctet(value.decimals); + buffer.putLong(value.value); } -void qpid::framing::DecimalValue::decode(Buffer& buffer){ - decimals = buffer.getOctet(); - value = buffer.getLong(); +void DecimalValue::decode(Buffer& buffer){ + value = Decimal(buffer.getLong(), buffer.getOctet()); } -void qpid::framing::FieldTableValue::encode(Buffer& buffer){ +void FieldTableValue::encode(Buffer& buffer){ buffer.putFieldTable(value); } -void qpid::framing::FieldTableValue::decode(Buffer& buffer){ +void FieldTableValue::decode(Buffer& buffer){ buffer.getFieldTable(value); } + +std::auto_ptr<Value> Value::decode_value(Buffer& buffer) +{ + std::auto_ptr<Value> value; + u_int8_t type = buffer.getOctet(); + switch(type){ + case 'S': + value.reset(new StringValue()); + break; + case 'I': + value.reset(new IntegerValue()); + break; + case 'D': + value.reset(new DecimalValue()); + break; + case 'T': + value.reset(new TimeValue()); + break; + case 'F': + value.reset(new FieldTableValue()); + break; + default: + THROW_QPID_ERROR(FRAMING_ERROR, "Unknown field table value type"); + } + value->decode(buffer); + return value; +} + +EmptyValue::~EmptyValue() {} + +void EmptyValue::print(std::ostream& out) const +{ + out << "<empty field value>"; +} + +std::ostream& operator<<(std::ostream& out, const Value& v) { + v.print(out); + return out; +} + +std::ostream& operator<<(std::ostream& out, const Decimal& d) +{ + return out << "Decimal(" << d.value << "," << d.decimals << ")"; +} + +}} + + + diff --git a/python/qpid/connection.py b/python/qpid/connection.py index f4d0817e60..fc6c147f2b 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -20,7 +20,7 @@ to read and write Frame objects. This could be used by a client, server, or even a proxy implementation. """ -import socket, codec +import socket, codec,logging from cStringIO import StringIO from spec import load, pythonize from codec import EOF @@ -240,8 +240,10 @@ class Header(Payload): properties = {} for b, f in zip(bits, klass.fields): if b: - properties[f.name] = c.decode(f.type) - + # Note: decode returns a unicode u'' string but only + # plain '' strings can be used as keywords so we need to + # stringify the names. + properties[str(f.name)] = c.decode(f.type) return Header(klass, weight, size, **properties) def __str__(self): diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 31d3d24f5f..3085e24247 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -146,7 +146,6 @@ class Channel: def invoke(self, method, args, content = None): if self.closed: raise Closed(self.reason) - frame = Frame(self.id, Method(method, *args)) self.outgoing.put(frame) @@ -181,7 +180,7 @@ class Channel: def write_content(self, klass, content, queue): size = content.size() - header = Frame(self.id, Header(klass, content.weight(), size)) + header = Frame(self.id, Header(klass, content.weight(), size, **content.properties)) queue.put(header) for child in content.children: self.write_content(klass, child, queue) diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index 0bec6a8708..92925bea20 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -22,7 +22,7 @@ import sys, re, unittest, os, random, logging import qpid.client, qpid.spec import Queue from getopt import getopt, GetoptError - +from qpid.content import Content def findmodules(root): """Find potential python modules under directory root""" @@ -161,10 +161,6 @@ class TestBase(unittest.TestCase): self.channel.channel_open() def tearDown(self): - # TODO aconway 2006-09-05: Wrong behaviour here, we should - # close all open channels (checking for exceptions on the - # channesl) then open a channel to clean up qs and exs, - # finally close that channel. for ch, q in self.queues: ch.queue_delete(queue=q) for ch, ex in self.exchanges: @@ -186,13 +182,11 @@ class TestBase(unittest.TestCase): arguments={}): channel = channel or self.channel reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments) - # TODO aconway 2006-09-14: Don't add exchange on failure. self.exchanges.append((channel,exchange)) return reply def uniqueString(self): """Generate a unique string, unique for this TestBase instance""" - # TODO aconway 2006-09-20: Not thread safe. if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; return "Test Message " + str(self.uniqueCounter) @@ -208,22 +202,24 @@ class TestBase(unittest.TestCase): self.fail("Queue is not empty.") except Queue.Empty: None # Ignore - def assertPublishGet(self, queue, exchange="", routing_key=""): + def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): """ Publish to exchange and assert queue.get() returns the same message. """ body = self.uniqueString() self.channel.basic_publish(exchange=exchange, - content=qpid.content.Content(body), + content=Content(body, properties=properties), routing_key=routing_key) - self.assertEqual(body, queue.get(timeout=2).content.body) + msg = queue.get(timeout=1) + self.assertEqual(body, msg.content.body) + if (properties): self.assertEqual(properties, msg.content.properties) - def assertPublishConsume(self, queue="", exchange="", routing_key=""): + def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): """ Publish a message and consume it, assert it comes back intact. Return the Queue object used to consume. """ - self.assertPublishGet(self.consume(queue), exchange, routing_key) + self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) def assertChannelException(self, expectedCode, message): self.assertEqual(message.method.klass.name, "channel") diff --git a/python/tests/exchange.py b/python/tests/exchange.py index 4eb64520e6..8f3504b15e 100644 --- a/python/tests/exchange.py +++ b/python/tests/exchange.py @@ -20,22 +20,11 @@ Tests for exchange behaviour. Test classes ending in 'RuleTests' are derived from rules in amqp.xml. """ -import logging, Queue +import Queue, logging from qpid.testlib import TestBase from qpid.content import Content -# TODO aconway 2006-09-01: Investigate and add tests as appropriate. -# Observered on C++: -# -# No exception raised for basic_consume on non-existent queue name. -# No exception for basic_publish with bad routing key. -# No exception for binding to non-existent exchange? -# queue_bind hangs with invalid exchange name -# -# Do server exceptions get propagated properly? -# Do Java exceptions propagate with any data (or just Closed()) - class StandardExchangeVerifier: """Verifies standard exchange behavior. @@ -67,7 +56,6 @@ class StandardExchangeVerifier: self.assertPublishGet(q, ex, "a.b.x") self.assertPublishGet(q, ex, "a.x.b.x") self.assertPublishGet(q, ex, "a.x.x.b.x") - # Shouldn't match self.channel.basic_publish(exchange=ex, routing_key="a.b") self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y") @@ -75,6 +63,16 @@ class StandardExchangeVerifier: self.channel.basic_publish(exchange=ex, routing_key="a.b") self.assert_(q.empty()) + def verifyHeadersExchange(self, ex): + """Verify that ex is a headers exchange""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) + q = self.consume("q") + headers = {"name":"fred", "age":3} + self.assertPublishGet(q, exchange=ex, properties={'headers':headers}) + self.channel.basic_publish(exchange=ex) # No headers, won't deliver + self.assertEmpty(q); + class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): """ @@ -97,6 +95,11 @@ class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): """Declare and test a topic exchange""" self.exchange_declare(0, exchange="t", type="topic") self.verifyTopicExchange("t") + + def testHeaders(self): + """Declare and test a headers exchange""" + self.exchange_declare(0, exchange="h", type="headers") + self.verifyHeadersExchange("h") class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): @@ -106,7 +109,7 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): exchange instance is amq. followed by the exchange type name. Client creates a temporary queue and attempts to bind to each required - exchange instance (amq.fanout, amq.direct, and amq.topic, amq.headers if + exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if those types are defined). """ def testAmqDirect(self): self.verifyDirectExchange("amq.direct") @@ -115,9 +118,7 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): def testAmqTopic(self): self.verifyTopicExchange("amq.topic") - def testAmqHeaders(self): - self.exchange_declare(0, exchange="amq.headers", passive="true") - # TODO aconway 2006-09-14: verify headers behavior + def testAmqMatch(self): self.verifyHeadersExchange("amq.match") class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): """ @@ -137,13 +138,14 @@ class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): self.verifyDirectExchange("") +# TODO aconway 2006-09-27: Fill in empty tests: + class DefaultAccessRuleTests(TestBase): """ The server MUST NOT allow clients to access the default exchange except by specifying an empty exchange name in the Queue.Bind and content Publish methods. """ - # TODO aconway 2006-09-18: fill this in. class ExtensionsRuleTests(TestBase): """ @@ -252,3 +254,41 @@ class DeleteMethodExchangeFieldExistsRuleTests(TestBase): """ +class HeadersExchangeTests(TestBase): + """ + Tests for headers exchange functionality. + """ + def setUp(self): + TestBase.setUp(self) + self.queue_declare(queue="q") + self.q = self.consume("q") + + def myAssertPublishGet(self, headers): + self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers}) + + def myBasicPublish(self, headers): + self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers})) + + def testMatchAll(self): + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) + + # None of these should match + self.myBasicPublish({}) + self.myBasicPublish({"name":"barney"}) + self.myBasicPublish({"name":10}) + self.myBasicPublish({"name":"fred", "age":2}) + self.assertEmpty(self.q) + + def testMatchAny(self): + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred"}) + self.myAssertPublishGet({"name":"fred", "ignoreme":10}) + self.myAssertPublishGet({"ignoreme":10, "age":3}) + + # Wont match + self.myBasicPublish({}) + self.myBasicPublish({"irrelevant":0}) + self.assertEmpty(self.q) + diff --git a/python/tests/testlib.py b/python/tests/testlib.py index a50f8140b4..6a2efb6a11 100644 --- a/python/tests/testlib.py +++ b/python/tests/testlib.py @@ -52,3 +52,12 @@ class TestBaseTest(TestBase): self.fail("assertEmpty did not assert on non-empty queue") except AssertionError: None # Ignore + def testMessageProperties(self): + """Verify properties are passed with message""" + props={"headers":{"x":1, "y":2}} + self.queue_declare(queue="q") + q = self.consume("q") + self.assertPublishGet(q, routing_key="q", properties=props) + + + |