diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/amqp/Session.cpp | 74 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 37 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/ReceiverContext.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/SenderContext.cpp | 29 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/SenderContext.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/SessionContext.cpp | 4 |
7 files changed, 139 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp index ae5660448c..5dd6ace943 100644 --- a/cpp/src/qpid/broker/amqp/Session.cpp +++ b/cpp/src/qpid/broker/amqp/Session.cpp @@ -22,6 +22,8 @@ #include "Outgoing.h" #include "Message.h" #include "ManagedConnection.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Descriptor.h" #include "qpid/broker/AsyncCompletion.h" #include "qpid/broker/Broker.h" #include "qpid/broker/DeliverableMessage.h" @@ -104,8 +106,76 @@ void Session::attach(pn_link_t* link) QueueSettings settings(false, true); //TODO: populate settings from source details when available from engine queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; - //TODO: bind based on filter when that is exposed by engine - if (exchange->getType() == FanOutExchange::typeName) { + pn_data_t* filter = pn_terminus_filter(source); + pn_data_next(filter); + if (filter && !pn_data_is_null(filter)) { + if (pn_data_type(filter) == PN_MAP) { + pn_data_t* echo = pn_terminus_filter(pn_link_source(link)); + pn_data_put_map(echo); + pn_data_enter(echo); + size_t count = pn_data_get_map(filter)/2; + QPID_LOG(debug, "Got filter map with " << count << " entries"); + pn_data_enter(filter); + for (size_t i = 0; i < count; i++) { + pn_bytes_t fname = pn_data_get_symbol(filter); + pn_data_next(filter); + bool isDescribed = pn_data_is_described(filter); + qpid::amqp::Descriptor descriptor(0); + if (isDescribed) { + pn_data_enter(filter); + pn_data_next(filter); + //TODO: use or at least verify descriptor + if (pn_data_type(filter) == PN_ULONG) { + descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(filter)); + } else if (pn_data_type(filter) == PN_SYMBOL) { + pn_bytes_t d = pn_data_get_symbol(filter); + qpid::amqp::CharSequence c; + c.data = d.start; + c.size = d.size; + descriptor = qpid::amqp::Descriptor(c); + } else { + QPID_LOG(notice, "Ignoring filter with descriptor with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter)); + continue; + } + QPID_LOG(debug, "Got filter with descriptor " << descriptor); + pn_data_next(filter); + } else { + QPID_LOG(debug, "Got undescribed filter of type " << pn_data_type(filter)); + } + if (pn_data_type(filter) == PN_STRING) { + pn_bytes_t value = pn_data_get_string(filter); + pn_data_next(filter); + exchange->bind(queue, std::string(value.start, value.size), 0); + pn_data_put_symbol(echo, fname); + if (isDescribed) { + pn_data_put_described(echo); + pn_data_enter(echo); + pn_bytes_t symbol; + switch (descriptor.type) { + case qpid::amqp::Descriptor::NUMERIC: + pn_data_put_ulong(echo, descriptor.value.code); + break; + case qpid::amqp::Descriptor::SYMBOLIC: + symbol.start = const_cast<char*>(descriptor.value.symbol.data); + symbol.size = descriptor.value.symbol.size; + pn_data_put_symbol(echo, symbol); + break; + } + } + pn_data_put_string(echo, value); + if (isDescribed) pn_data_exit(echo); + + QPID_LOG(debug, "Binding using filter " << std::string(fname.start, fname.size) << ":" << std::string(value.start, value.size)); + } else { + //TODO: handle headers exchange filters + QPID_LOG(warning, "Ignoring unsupported filter type with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter)); + } + } + pn_data_exit(echo); + } else { + QPID_LOG(warning, "Filter should be map, got type: " << pn_data_type(filter)); + } + } else if (exchange->getType() == FanOutExchange::typeName) { exchange->bind(queue, std::string(), 0); } else if (exchange->getType() == TopicExchange::typeName) { exchange->bind(queue, "#", 0); diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 499b1ae35d..2f1f39cb68 100644 --- a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -262,10 +262,9 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) { - pn_terminus_t* source = pn_link_source((pn_link_t*) lnk->receiver); - pn_terminus_set_address(source, lnk->getSource().c_str()); - attach(ssn->session, (pn_link_t*) lnk->receiver, lnk->capacity); - if (!pn_link_remote_source((pn_link_t*) lnk->receiver)) { + lnk->configure(); + attach(ssn->session, lnk->receiver, lnk->capacity); + if (!pn_link_remote_source(lnk->receiver)) { std::string msg("No such source : "); msg += lnk->getSource(); throw qpid::messaging::NotFound(msg); diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 7e41e727cc..33ddabfc75 100644 --- a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -29,10 +29,10 @@ namespace qpid { namespace messaging { namespace amqp { //TODO: proper conversion to wide string for address -ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const std::string& s) +ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) : name(n), - source(s), - receiver(pn_receiver(session, source.c_str())), + address(a), + receiver(pn_receiver(session, name.c_str())), capacity(0) {} ReceiverContext::~ReceiverContext() { @@ -84,7 +84,36 @@ const std::string& ReceiverContext::getName() const const std::string& ReceiverContext::getSource() const { - return source; + return address.getName(); +} +namespace { +pn_bytes_t convert(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast<char*>(s.data()); + result.size = s.size(); + return result; +} +} + +void ReceiverContext::configure() const +{ + configure(pn_link_source(receiver)); +} +void ReceiverContext::configure(pn_terminus_t* source) const +{ + pn_terminus_set_address(source, address.getName().c_str()); + + pn_data_t* filter = pn_terminus_filter(source); + pn_data_put_map(filter); + pn_data_enter(filter); + pn_data_put_symbol(filter, convert("subject")); + pn_data_put_described(filter); + pn_data_enter(filter); + pn_data_put_ulong(filter, 0x0000468C00000001/*LEGACY_TOPIC_FILTER*/); + pn_data_put_string(filter, convert(address.getSubject())); + pn_data_exit(filter); + pn_data_exit(filter); } bool ReceiverContext::isClosed() const diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/cpp/src/qpid/messaging/amqp/ReceiverContext.h index 0a6f363228..34ecdda6be 100644 --- a/cpp/src/qpid/messaging/amqp/ReceiverContext.h +++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -21,11 +21,13 @@ * under the License. * */ +#include "qpid/messaging/Address.h" #include <string> #include "qpid/sys/IntegerTypes.h" struct pn_link_t; struct pn_session_t; +struct pn_terminus_t; namespace qpid { namespace messaging { @@ -41,22 +43,25 @@ namespace amqp { class ReceiverContext { public: - ReceiverContext(pn_session_t* session, const std::string& name, const std::string& source); + ReceiverContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& source); ~ReceiverContext(); void setCapacity(uint32_t); uint32_t getCapacity(); uint32_t getAvailable(); uint32_t getUnsettled(); + void attach(); void close(); const std::string& getName() const; const std::string& getSource() const; bool isClosed() const; + void configure() const; private: friend class ConnectionContext; const std::string name; - const std::string source; + const Address address; pn_link_t* receiver; uint32_t capacity; + void configure(pn_terminus_t*) const; }; }}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 1306a314be..02f7bdec1c 100644 --- a/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -36,10 +36,10 @@ namespace qpid { namespace messaging { namespace amqp { //TODO: proper conversion to wide string for address -SenderContext::SenderContext(pn_session_t* session, const std::string& n, const std::string& t) +SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) : name(n), - target(t), - sender(pn_sender(session, target.c_str())), capacity(1000) {} + address(a), + sender(pn_sender(session, n.c_str())), capacity(1000) {} SenderContext::~SenderContext() { @@ -74,7 +74,7 @@ const std::string& SenderContext::getName() const const std::string& SenderContext::getTarget() const { - return target; + return address.getName(); } SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message) @@ -82,7 +82,7 @@ SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& mes if (processUnsettled() < capacity) { deliveries.push_back(Delivery(nextId++)); Delivery& delivery = deliveries.back(); - delivery.encode(MessageImplAccess::get(message)); + delivery.encode(MessageImplAccess::get(message), address); delivery.send(sender); return &delivery; } else { @@ -135,7 +135,7 @@ const std::string EMPTY; class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties { public: - PropertiesAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl) {} + PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), subject(s) {} bool hasMessageId() const { return getMessageId().size(); @@ -167,12 +167,12 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties bool hasSubject() const { - return getSubject().size(); + return subject.size() || getSubject().size(); } std::string getSubject() const { - return msg.getSubject(); + return subject.size() ? subject : msg.getSubject(); } bool hasReplyTo() const @@ -266,16 +266,23 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties } private: const qpid::messaging::MessageImpl& msg; + const std::string subject; }; + +bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) +{ + return address.getSubject().size() && address.getSubject() != msg.getSubject(); +} + } SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {} -void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg) +void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) { boost::shared_ptr<const EncodedMessage> original = msg.getEncoded(); - if (original) { //still have the content as received, send at least the bare message unaltered + if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received? if (original->hasHeaderChanged(msg)) { //since as yet have no annotations, just write the revised header then the rest of the message as received @@ -293,7 +300,7 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg) } } else { HeaderAdapter header(msg); - PropertiesAdapter properties(msg); + PropertiesAdapter properties(msg, address.getSubject()); //compute size: encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, msg.getHeaders(), msg.getBytes())); QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes") diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.h b/cpp/src/qpid/messaging/amqp/SenderContext.h index 82c5e6dab9..bc73d199e7 100644 --- a/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -25,6 +25,7 @@ #include <string> #include <vector> #include "qpid/sys/IntegerTypes.h" +#include "qpid/messaging/Address.h" #include "qpid/messaging/amqp/EncodedMessage.h" struct pn_delivery_t; @@ -48,7 +49,7 @@ class SenderContext { public: Delivery(int32_t id); - void encode(const qpid::messaging::MessageImpl& message); + void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&); void send(pn_link_t*); bool accepted(); private: @@ -57,7 +58,7 @@ class SenderContext EncodedMessage encoded; }; - SenderContext(pn_session_t* session, const std::string& name, const std::string& target); + SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target); ~SenderContext(); void close(); void setCapacity(uint32_t); @@ -71,7 +72,7 @@ class SenderContext typedef std::deque<Delivery> Deliveries; const std::string name; - const std::string target; + const qpid::messaging::Address address; pn_link_t* sender; int32_t nextId; Deliveries deliveries; diff --git a/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 9908b16443..8b3feb129a 100644 --- a/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -49,7 +49,7 @@ boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messag for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) { name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); } - boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address.str())); + boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address)); senders[name] = s; return s; } @@ -62,7 +62,7 @@ boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::me for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) { name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); } - boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address.str())); + boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address)); receivers[name] = r; return r; } |