diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.cpp | 403 |
1 files changed, 403 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp new file mode 100644 index 0000000000..622cc81002 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -0,0 +1,403 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Broker.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/ExchangeRegistry.h" +#include "qpid/broker/FedOps.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/sys/ExceptionHolder.h" +#include <stdexcept> + +using namespace qpid::broker; +using namespace qpid::framing; +using qpid::framing::Buffer; +using qpid::framing::FieldTable; +using qpid::sys::Mutex; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +namespace _qmf = qmf::org::apache::qpid::broker; + +namespace +{ + const std::string qpidMsgSequence("qpid.msg_sequence"); + const std::string qpidSequenceCounter("qpid.sequence_counter"); + const std::string qpidIVE("qpid.ive"); + const std::string QPID_MANAGEMENT("qpid.management"); +} + + +Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) { + if (parent){ + if (parent->sequence || parent->ive) parent->sequenceLock.lock(); + + if (parent->sequence){ + parent->sequenceNo++; + msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + } + if (parent->ive) { + parent->lastMsg = &( msg.getMessage()); + } + } +} + +Exchange::PreRoute::~PreRoute(){ + if (parent && (parent->sequence || parent->ive)){ + parent->sequenceLock.unlock(); + } +} + +namespace { +/** Store information about an exception to be thrown later. + * If multiple exceptions are stored, save the first of the "most severe" + * exceptions, SESSION is les sever than CONNECTION etc. + */ +class ExInfo { + public: + enum Type { NONE, SESSION, CONNECTION, OTHER }; + + ExInfo(string exchange) : type(NONE), exchange(exchange) {} + void store(Type type_, const qpid::sys::ExceptionHolder& exception_, const boost::shared_ptr<Queue>& queue) { + QPID_LOG(warning, "Exchange " << exchange << " cannot deliver to queue " + << queue->getName() << ": " << exception_.what()); + if (type < type_) { // Replace less severe exception + type = type_; + exception = exception_; + } + } + + void raise() { + exception.raise(); + } + + private: + Type type; + string exchange; + qpid::sys::ExceptionHolder exception; +}; +} + +void Exchange::doRoute(Deliverable& msg, ConstBindingList b) +{ + int count = 0; + + if (b.get()) { + // Block the content release if the message is transient AND there is more than one binding + if (!msg.getMessage().isPersistent() && b->size() > 1) { + msg.getMessage().blockContentRelease(); + } + + + ExInfo error(getName()); // Save exception to throw at the end. + for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) { + try { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); + } + catch (const SessionException& e) { + error.store(ExInfo::SESSION, framing::createSessionException(e.code, e.what()),(*i)->queue); + } + catch (const ConnectionException& e) { + error.store(ExInfo::CONNECTION, framing::createConnectionException(e.code, e.what()), (*i)->queue); + } + catch (const std::exception& e) { + error.store(ExInfo::OTHER, qpid::sys::ExceptionHolder(new Exception(e.what())), (*i)->queue); + } + } + error.raise(); + } + + if (mgmtExchange != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } +} + +void Exchange::routeIVE(){ + if (ive && lastMsg.get()){ + DeliverableMessage dmsg(lastMsg); + route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders()); + } +} + + +Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : + name(_name), durable(false), persistenceId(0), sequence(false), + sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false) +{ + if (parent != 0 && broker != 0) + { + ManagementAgent* agent = broker->getManagementAgent(); + if (agent != 0) + { + mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); + mgmtExchange->set_durable(durable); + mgmtExchange->set_autoDelete(false); + agent->addObject(mgmtExchange, 0, durable); + } + } +} + +Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, + Manageable* parent, Broker* b) + : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), + args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false) +{ + if (parent != 0 && broker != 0) + { + ManagementAgent* agent = broker->getManagementAgent(); + if (agent != 0) + { + mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); + mgmtExchange->set_durable(durable); + mgmtExchange->set_autoDelete(false); + mgmtExchange->set_arguments(ManagementAgent::toMap(args)); + agent->addObject(mgmtExchange, 0, durable); + } + } + + sequence = _args.get(qpidMsgSequence); + if (sequence) { + QPID_LOG(debug, "Configured exchange " << _name << " with Msg sequencing"); + args.setInt64(std::string(qpidSequenceCounter), sequenceNo); + } + + ive = _args.get(qpidIVE); + if (ive) { + if (broker && broker->isInCluster()) + throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster"); + QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value"); + } +} + +Exchange::~Exchange () +{ + if (mgmtExchange != 0) + mgmtExchange->resourceDestroy (); +} + +void Exchange::setAlternate(Exchange::shared_ptr _alternate) +{ + alternate = _alternate; + if (mgmtExchange != 0) { + if (alternate.get() != 0) + mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId()); + else + mgmtExchange->clr_altExchange(); + } +} + +void Exchange::setPersistenceId(uint64_t id) const +{ + persistenceId = id; +} + +Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer) +{ + string name; + string type; + string altName; + FieldTable args; + + buffer.getShortString(name); + bool durable(buffer.getOctet()); + buffer.getShortString(type); + buffer.get(args); + // For backwards compatibility on restoring exchanges from before the alt-exchange update, perform check + if (buffer.available()) + buffer.getShortString(altName); + + try { + Exchange::shared_ptr exch = exchanges.declare(name, type, durable, args).first; + exch->sequenceNo = args.getAsInt64(qpidSequenceCounter); + exch->alternateName.assign(altName); + return exch; + } catch (const UnknownExchangeTypeException&) { + QPID_LOG(warning, "Could not create exchange " << name << "; type " << type << " is not recognised"); + return Exchange::shared_ptr(); + } +} + +void Exchange::encode(Buffer& buffer) const +{ + buffer.putShortString(name); + buffer.putOctet(durable); + buffer.putShortString(getType()); + if (args.isSet(qpidSequenceCounter)) + args.setInt64(std::string(qpidSequenceCounter),sequenceNo); + buffer.put(args); + buffer.putShortString(alternate.get() ? alternate->getName() : string("")); +} + +uint32_t Exchange::encodedSize() const +{ + return name.size() + 1/*short string size*/ + + 1 /*durable*/ + + getType().size() + 1/*short string size*/ + + (alternate.get() ? alternate->getName().size() : 0) + 1/*short string size*/ + + args.encodedSize(); +} + +void Exchange::recoveryComplete(ExchangeRegistry& exchanges) +{ + if (!alternateName.empty()) { + try { + Exchange::shared_ptr ae = exchanges.get(alternateName); + setAlternate(ae); + } catch (const NotFoundException&) { + QPID_LOG(warning, "Could not set alternate exchange \"" << alternateName << "\": does not exist."); + } + } +} + +ManagementObject* Exchange::GetManagementObject (void) const +{ + return (ManagementObject*) mgmtExchange; +} + +void Exchange::registerDynamicBridge(DynamicBridge* db) +{ + if (!supportsDynamicBinding()) + throw Exception("Exchange type does not support dynamic binding"); + + { + Mutex::ScopedLock l(bridgeLock); + for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); + iter != bridgeVector.end(); iter++) + (*iter)->sendReorigin(); + + bridgeVector.push_back(db); + } + + FieldTable args; + args.setString(qpidFedOp, fedOpReorigin); + bind(Queue::shared_ptr(), string(), &args); +} + +void Exchange::removeDynamicBridge(DynamicBridge* db) +{ + Mutex::ScopedLock l(bridgeLock); + for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); + iter != bridgeVector.end(); iter++) + if (*iter == db) { + bridgeVector.erase(iter); + break; + } +} + +void Exchange::handleHelloRequest() +{ +} + +void Exchange::propagateFedOp(const string& routingKey, const string& tags, const string& op, const string& origin, qpid::framing::FieldTable* extra_args) +{ + Mutex::ScopedLock l(bridgeLock); + string myOp(op.empty() ? fedOpBind : op); + + for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); + iter != bridgeVector.end(); iter++) + (*iter)->propagateBinding(routingKey, tags, op, origin, extra_args); +} + +Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent, + FieldTable _args, const string& _origin) + : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0) +{ +} + +Exchange::Binding::~Binding () +{ + if (mgmtBinding != 0) { + ManagementObject* mo = queue->GetManagementObject(); + if (mo != 0) + static_cast<_qmf::Queue*>(mo)->dec_bindingCount(); + mgmtBinding->resourceDestroy (); + } +} + +void Exchange::Binding::startManagement() +{ + if (parent != 0) + { + Broker* broker = parent->getBroker(); + if (broker != 0) { + ManagementAgent* agent = broker->getManagementAgent(); + if (agent != 0) { + ManagementObject* mo = queue->GetManagementObject(); + if (mo != 0) { + management::ObjectId queueId = mo->getObjectId(); + + mgmtBinding = new _qmf::Binding + (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args)); + if (!origin.empty()) + mgmtBinding->set_origin(origin); + agent->addObject(mgmtBinding); + static_cast<_qmf::Queue*>(mo)->inc_bindingCount(); + } + } + } + } +} + +ManagementObject* Exchange::Binding::GetManagementObject () const +{ + return (ManagementObject*) mgmtBinding; +} + +Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {} + +bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b) +{ + return b->queue == queue; +} + +void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) { + msg->getProperties<DeliveryProperties>()->setExchange(getName()); +} + +bool Exchange::routeWithAlternate(Deliverable& msg) +{ + route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); + if (!msg.delivered && alternate) { + alternate->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); + } + return msg.delivered; +} |