summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Exchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp83
1 files changed, 12 insertions, 71 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index d68845062d..d143471559 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -19,18 +19,16 @@
*
*/
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/FedOps.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/broker/Queue.h"
+#include "qpid/log/Statement.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/log/Statement.h"
-#include "qpid/management/ManagementAgent.h"
-#include "qpid/sys/ExceptionHolder.h"
-#include <stdexcept>
+#include "qpid/broker/DeliverableMessage.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -58,7 +56,7 @@ Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
if (parent->sequence){
parent->sequenceNo++;
- msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo);
+ msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
}
if (parent->ive) {
parent->lastMsg = &( msg.getMessage());
@@ -72,36 +70,6 @@ Exchange::PreRoute::~PreRoute(){
}
}
-namespace {
-/** Store information about an exception to be thrown later.
- * If multiple exceptions are stored, save the first of the "most severe"
- * exceptions, SESSION is les sever than CONNECTION etc.
- */
-class ExInfo {
- public:
- enum Type { NONE, SESSION, CONNECTION, OTHER };
-
- ExInfo(string exchange) : type(NONE), exchange(exchange) {}
- void store(Type type_, const qpid::sys::ExceptionHolder& exception_, const boost::shared_ptr<Queue>& queue) {
- QPID_LOG(warning, "Exchange " << exchange << " cannot deliver to queue "
- << queue->getName() << ": " << exception_.what());
- if (type < type_) { // Replace less severe exception
- type = type_;
- exception = exception_;
- }
- }
-
- void raise() {
- exception.raise();
- }
-
- private:
- Type type;
- string exchange;
- qpid::sys::ExceptionHolder exception;
-};
-}
-
void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
{
int count = 0;
@@ -112,25 +80,11 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
msg.getMessage().blockContentRelease();
}
-
- ExInfo error(getName()); // Save exception to throw at the end.
for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
- try {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
- }
- catch (const SessionException& e) {
- error.store(ExInfo::SESSION, framing::createSessionException(e.code, e.what()),(*i)->queue);
- }
- catch (const ConnectionException& e) {
- error.store(ExInfo::CONNECTION, framing::createConnectionException(e.code, e.what()), (*i)->queue);
- }
- catch (const std::exception& e) {
- error.store(ExInfo::OTHER, qpid::sys::ExceptionHolder(new Exception(e.what())), (*i)->queue);
- }
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched();
}
- error.raise();
}
if (mgmtExchange != 0)
@@ -161,7 +115,7 @@ void Exchange::routeIVE(){
Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
name(_name), durable(false), persistenceId(0), sequence(false),
- sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false)
+ sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
{
if (parent != 0 && broker != 0)
{
@@ -179,7 +133,7 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent, Broker* b)
: name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
- args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false)
+ args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
{
if (parent != 0 && broker != 0)
{
@@ -201,11 +155,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
}
ive = _args.get(qpidIVE);
- if (ive) {
- if (broker && broker->isInCluster())
- throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster");
- QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value");
- }
+ if (ive) QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value");
}
Exchange::~Exchange ()
@@ -390,14 +340,5 @@ bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b)
}
void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
- msg->setExchange(getName());
-}
-
-bool Exchange::routeWithAlternate(Deliverable& msg)
-{
- route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
- if (!msg.delivered && alternate) {
- alternate->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
- }
- return msg.delivered;
+ msg->getProperties<DeliveryProperties>()->setExchange(getName());
}