diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.cpp | 83 |
1 files changed, 12 insertions, 71 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index d68845062d..d143471559 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -19,18 +19,16 @@ * */ -#include "qpid/broker/Broker.h" -#include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/FedOps.h" +#include "qpid/broker/Broker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/broker/Queue.h" +#include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" #include "qpid/framing/reply_exceptions.h" -#include "qpid/log/Statement.h" -#include "qpid/management/ManagementAgent.h" -#include "qpid/sys/ExceptionHolder.h" -#include <stdexcept> +#include "qpid/broker/DeliverableMessage.h" using namespace qpid::broker; using namespace qpid::framing; @@ -58,7 +56,7 @@ Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) { if (parent->sequence){ parent->sequenceNo++; - msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo); + msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); } if (parent->ive) { parent->lastMsg = &( msg.getMessage()); @@ -72,36 +70,6 @@ Exchange::PreRoute::~PreRoute(){ } } -namespace { -/** Store information about an exception to be thrown later. - * If multiple exceptions are stored, save the first of the "most severe" - * exceptions, SESSION is les sever than CONNECTION etc. - */ -class ExInfo { - public: - enum Type { NONE, SESSION, CONNECTION, OTHER }; - - ExInfo(string exchange) : type(NONE), exchange(exchange) {} - void store(Type type_, const qpid::sys::ExceptionHolder& exception_, const boost::shared_ptr<Queue>& queue) { - QPID_LOG(warning, "Exchange " << exchange << " cannot deliver to queue " - << queue->getName() << ": " << exception_.what()); - if (type < type_) { // Replace less severe exception - type = type_; - exception = exception_; - } - } - - void raise() { - exception.raise(); - } - - private: - Type type; - string exchange; - qpid::sys::ExceptionHolder exception; -}; -} - void Exchange::doRoute(Deliverable& msg, ConstBindingList b) { int count = 0; @@ -112,25 +80,11 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b) msg.getMessage().blockContentRelease(); } - - ExInfo error(getName()); // Save exception to throw at the end. for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) { - try { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); - } - catch (const SessionException& e) { - error.store(ExInfo::SESSION, framing::createSessionException(e.code, e.what()),(*i)->queue); - } - catch (const ConnectionException& e) { - error.store(ExInfo::CONNECTION, framing::createConnectionException(e.code, e.what()), (*i)->queue); - } - catch (const std::exception& e) { - error.store(ExInfo::OTHER, qpid::sys::ExceptionHolder(new Exception(e.what())), (*i)->queue); - } + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); } - error.raise(); } if (mgmtExchange != 0) @@ -161,7 +115,7 @@ void Exchange::routeIVE(){ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false) + sequenceNo(0), ive(false), mgmtExchange(0), broker(b) { if (parent != 0 && broker != 0) { @@ -179,7 +133,7 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent, Broker* b) : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), - args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false) + args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b) { if (parent != 0 && broker != 0) { @@ -201,11 +155,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel } ive = _args.get(qpidIVE); - if (ive) { - if (broker && broker->isInCluster()) - throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster"); - QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value"); - } + if (ive) QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value"); } Exchange::~Exchange () @@ -390,14 +340,5 @@ bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b) } void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) { - msg->setExchange(getName()); -} - -bool Exchange::routeWithAlternate(Deliverable& msg) -{ - route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); - if (!msg.delivered && alternate) { - alternate->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); - } - return msg.delivered; + msg->getProperties<DeliveryProperties>()->setExchange(getName()); } |