diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker')
33 files changed, 519 insertions, 383 deletions
diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h index 536fa21b2b..2f4f7eaacc 100644 --- a/qpid/cpp/src/qpid/broker/AclModule.h +++ b/qpid/cpp/src/qpid/broker/AclModule.h @@ -40,7 +40,8 @@ enum Action {ACT_CONSUME, ACT_PUBLISH, ACT_CREATE, ACT_ACCESS, ACT_BIND, enum Property {PROP_NAME, PROP_DURABLE, PROP_OWNER, PROP_ROUTINGKEY, PROP_PASSIVE, PROP_AUTODELETE, PROP_EXCLUSIVE, PROP_TYPE, PROP_ALTERNATE, PROP_QUEUENAME, PROP_SCHEMAPACKAGE, - PROP_SCHEMACLASS}; + PROP_SCHEMACLASS, PROP_POLICYTYPE, PROP_MAXQUEUESIZE, + PROP_MAXQUEUECOUNT}; enum AclResult {ALLOW, ALLOWLOG, DENY, DENYLOG}; } // namespace acl @@ -132,6 +133,9 @@ class AclHelper { if (str.compare("queuename") == 0) return PROP_QUEUENAME; if (str.compare("schemapackage") == 0) return PROP_SCHEMAPACKAGE; if (str.compare("schemaclass") == 0) return PROP_SCHEMACLASS; + if (str.compare("policytype") == 0) return PROP_POLICYTYPE; + if (str.compare("maxqueuesize") == 0) return PROP_MAXQUEUESIZE; + if (str.compare("maxqueuecount") == 0) return PROP_MAXQUEUECOUNT; throw str; } static inline std::string getPropertyStr(const Property p) { @@ -148,6 +152,9 @@ class AclHelper { case PROP_QUEUENAME: return "queuename"; case PROP_SCHEMAPACKAGE: return "schemapackage"; case PROP_SCHEMACLASS: return "schemaclass"; + case PROP_POLICYTYPE: return "policytype"; + case PROP_MAXQUEUESIZE: return "maxqueuesize"; + case PROP_MAXQUEUECOUNT: return "maxqueuecount"; default: assert(false); // should never get here } return ""; @@ -217,11 +224,14 @@ class AclHelper { // == Queues == propSetPtr p4(new propSet); - p3->insert(PROP_ALTERNATE); - p3->insert(PROP_PASSIVE); - p3->insert(PROP_DURABLE); - p3->insert(PROP_EXCLUSIVE); - p3->insert(PROP_AUTODELETE); + p4->insert(PROP_ALTERNATE); + p4->insert(PROP_PASSIVE); + p4->insert(PROP_DURABLE); + p4->insert(PROP_EXCLUSIVE); + p4->insert(PROP_AUTODELETE); + p4->insert(PROP_POLICYTYPE); + p4->insert(PROP_MAXQUEUESIZE); + p4->insert(PROP_MAXQUEUECOUNT); actionMapPtr a1(new actionMap); a1->insert(actionPair(ACT_ACCESS, p0)); diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 13cf88fb11..4259bb2f31 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -91,7 +91,8 @@ Broker::Options::Options(const std::string& name) : queueLimit(100*1048576/*100M default limit*/), tcpNoDelay(false), requireEncrypted(false), - maxSessionRate(0) + maxSessionRate(0), + asyncQueueEvents(true) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -121,7 +122,8 @@ Broker::Options::Options(const std::string& name) : ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections") ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") - ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)"); + ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)") + ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication"); } const std::string empty; @@ -150,7 +152,7 @@ Broker::Broker(const Broker::Options& conf) : *this), managementAgent(conf.enableMgmt ? new ManagementAgent() : 0), queueCleaner(queues, timer), - queueEvents(poller), + queueEvents(poller,!conf.asyncQueueEvents), recovery(true), expiryPolicy(new ExpiryPolicy), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) @@ -208,8 +210,10 @@ Broker::Broker(const Broker::Options& conf) : (*i)->earlyInitialize(*this); // If no plugin store module registered itself, set up the null store. - if (store.get() == 0) - setStore (new NullMessageStore()); + if (store.get() == 0) { + boost::shared_ptr<MessageStore> p(new NullMessageStore()); + setStore (p); + } exchanges.declare(empty, DirectExchange::typeName); // Default exchange. @@ -296,7 +300,7 @@ boost::intrusive_ptr<Broker> Broker::create(const Options& opts) return boost::intrusive_ptr<Broker>(new Broker(opts)); } -void Broker::setStore (MessageStore* _store) +void Broker::setStore (boost::shared_ptr<MessageStore>& _store) { store.reset(new MessageStoreModule (_store)); queues.setStore (store.get()); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 0517ceca95..5ca01e0867 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -111,6 +111,7 @@ public: bool requireEncrypted; std::string knownHosts; uint32_t maxSessionRate; + bool asyncQueueEvents; private: std::string getHome(); @@ -171,7 +172,7 @@ public: /** Shut down the broker */ virtual void shutdown(); - QPID_BROKER_EXTERN void setStore (MessageStore*); + QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store); MessageStore& getStore() { return *store; } void setAcl (AclModule* _acl) {acl = _acl;} AclModule* getAcl() { return acl; } diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index b9f24dee5f..29fe47beac 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -145,39 +145,12 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) { PreRoute pr(msg, this); - Queues::ConstPtr p; + ConstBindingList b; { Mutex::ScopedLock l(lock); - p = bindings[routingKey].queues.snapshot(); - } - int count(0); - - if (p) { - for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); - } - } - - if(!count){ - QPID_LOG(info, "DirectExchange " << getName() << " could not route message with key " << routingKey - << "; no matching binding found"); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgDrops(); - mgmtExchange->inc_byteDrops(msg.contentSize()); - } - } else { - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes(count); - mgmtExchange->inc_byteRoutes(count * msg.contentSize()); - } - } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives(); - mgmtExchange->inc_byteReceives(msg.contentSize()); + b = bindings[routingKey].queues.snapshot(); } + doRoute(msg, b); } diff --git a/qpid/cpp/src/qpid/broker/DtxAck.cpp b/qpid/cpp/src/qpid/broker/DtxAck.cpp index b189ef4cdb..bca3f90bbe 100644 --- a/qpid/cpp/src/qpid/broker/DtxAck.cpp +++ b/qpid/cpp/src/qpid/broker/DtxAck.cpp @@ -48,12 +48,26 @@ bool DtxAck::prepare(TransactionContext* ctxt) throw() void DtxAck::commit() throw() { - for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::committed)); - pending.clear(); + try { + for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::committed)); + pending.clear(); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to commit: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to commit (unknown error)"); + } + } void DtxAck::rollback() throw() { - for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::requeue)); - pending.clear(); + try { + for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::requeue)); + pending.clear(); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to complete rollback: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to complete rollback (unknown error)"); + } + } diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 90d81b81c6..757127eef2 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -76,6 +76,40 @@ Exchange::PreRoute::~PreRoute(){ } } +void Exchange::doRoute(Deliverable& msg, ConstBindingList b) +{ + int count = 0; + + if (b.get()) { + // Block the content release if the message is transient AND there is more than one binding + if (!msg.getMessage().isPersistent() && b->size() > 1) + msg.getMessage().blockContentRelease(); + + for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); + } + } + + if (mgmtExchange != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } +} + void Exchange::routeIVE(){ if (ive && lastMsg.get()){ DeliverableMessage dmsg(lastMsg); diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index c1e878200f..9bea376c28 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -79,6 +79,9 @@ protected: Exchange* parent; }; + typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList; + typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList; + void doRoute(Deliverable& msg, ConstBindingList b); void routeIVE(); diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index e9007ba682..b7d46a33fe 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -106,36 +106,12 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons return true; } -void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ +void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/) +{ PreRoute pr(msg, this); - uint32_t count(0); - - BindingsArray::ConstPtr p = bindings.snapshot(); - if (p.get()){ - for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){ - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); - } - } - - if (mgmtExchange != 0) - { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } + doRoute(msg, bindings.snapshot()); } - + bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const) { BindingsArray::ConstPtr ptr = bindings.snapshot(); diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index c628c44909..a7c90156e1 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -104,7 +104,8 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, } -void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ +void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args) +{ if (!args) { //can't match if there were no headers passed in if (mgmtExchange != 0) { @@ -118,31 +119,17 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons PreRoute pr(msg, this); - uint32_t count(0); - - Bindings::ConstPtr p = bindings.snapshot(); - if (p.get()){ + ConstBindingList p = bindings.snapshot(); + BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); + if (p.get()) + { for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { if (match((*i)->args, *args)) { - msg.deliverTo((*i)->queue); - count++; - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); + b->push_back(*i); } } } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives(); - mgmtExchange->inc_byteReceives(msg.contentSize()); - if (count == 0) { - mgmtExchange->inc_msgDrops(); - mgmtExchange->inc_byteDrops(msg.contentSize()); - } else { - mgmtExchange->inc_msgRoutes(count); - mgmtExchange->inc_byteRoutes(count * msg.contentSize()); - } - } + doRoute(msg, b); } diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 7360010192..e2799b0bff 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -49,7 +49,7 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {} + expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {} Message::~Message() { @@ -98,7 +98,7 @@ const FieldTable* Message::getApplicationHeaders() const return getAdapter().getApplicationHeaders(frames); } -bool Message::isPersistent() +bool Message::isPersistent() const { return (getAdapter().isPersistent(frames) || forcePersistentPolicy); } @@ -108,12 +108,16 @@ bool Message::requiresAccept() return getAdapter().requiresAccept(frames); } -uint32_t Message::getRequiredCredit() const +uint32_t Message::getRequiredCredit() { - //add up payload for all header and content frames in the frameset - SumBodySize sum; - frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>()); - return sum.getSize(); + sys::Mutex::ScopedLock l(lock); + if (!requiredCredit) { + //add up payload for all header and content frames in the frameset + SumBodySize sum; + frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>()); + requiredCredit = sum.getSize(); + } + return requiredCredit; } void Message::encode(framing::Buffer& buffer) const @@ -181,17 +185,31 @@ void Message::decodeContent(framing::Buffer& buffer) loaded = true; } -void Message::releaseContent(MessageStore* _store) +void Message::tryReleaseContent() { - if (!store) { - store = _store; + if (checkContentReleasable()) { + releaseContent(); } +} + +void Message::releaseContent(MessageStore* s) +{ + //deprecated, use setStore(store); releaseContent(); instead + if (!store) setStore(s); + releaseContent(); +} + +void Message::releaseContent() +{ + sys::Mutex::ScopedLock l(lock); if (store) { if (!getPersistenceId()) { intrusive_ptr<PersistableMessage> pmsg(this); store->stage(pmsg); staged = true; } + //ensure required credit is cached before content frames are released + getRequiredCredit(); //remove any content frames from the frameset frames.remove(TypeFilter<CONTENT_BODY>()); setContentReleased(); @@ -211,31 +229,29 @@ void Message::destroy() bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const { - if (isContentReleased()) { - intrusive_ptr<const PersistableMessage> pmsg(this); - - bool done = false; - string& data = frame.castBody<AMQContentBody>()->getData(); - store->loadContent(queue, pmsg, data, offset, maxContentSize); - done = data.size() < maxContentSize; - frame.setBof(false); - frame.setEof(true); - QPID_LOG(debug, "loaded frame" << frame); - if (offset > 0) { - frame.setBos(false); - } - if (!done) { - frame.setEos(false); - } else return false; - return true; + intrusive_ptr<const PersistableMessage> pmsg(this); + + bool done = false; + string& data = frame.castBody<AMQContentBody>()->getData(); + store->loadContent(queue, pmsg, data, offset, maxContentSize); + done = data.size() < maxContentSize; + frame.setBof(false); + frame.setEof(true); + QPID_LOG(debug, "loaded frame" << frame); + if (offset > 0) { + frame.setBos(false); } - else return false; + if (!done) { + frame.setEos(false); + } else return false; + return true; } void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const { + sys::Mutex::ScopedLock l(lock); if (isContentReleased() && !frames.isComplete()) { - + sys::Mutex::ScopedUnlock u(lock); uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) @@ -373,28 +389,36 @@ void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Que } void Message::allEnqueuesComplete() { - MessageCallback* cb = 0; - { - sys::Mutex::ScopedLock l(lock); - std::swap(cb, enqueueCallback); - } + sys::Mutex::ScopedLock l(callbackLock); + MessageCallback* cb = enqueueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } void Message::allDequeuesComplete() { - MessageCallback* cb = 0; - { - sys::Mutex::ScopedLock l(lock); - std::swap(cb, dequeueCallback); - } + sys::Mutex::ScopedLock l(callbackLock); + MessageCallback* cb = dequeueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } -void Message::setEnqueueCompleteCallback(MessageCallback& cb) { enqueueCallback = &cb; } -void Message::resetEnqueueCompleteCallback() { enqueueCallback = 0; } +void Message::setEnqueueCompleteCallback(MessageCallback& cb) { + sys::Mutex::ScopedLock l(callbackLock); + enqueueCallback = &cb; +} + +void Message::resetEnqueueCompleteCallback() { + sys::Mutex::ScopedLock l(callbackLock); + enqueueCallback = 0; +} -void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; } -void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; } +void Message::setDequeueCompleteCallback(MessageCallback& cb) { + sys::Mutex::ScopedLock l(callbackLock); + dequeueCallback = &cb; +} + +void Message::resetDequeueCompleteCallback() { + sys::Mutex::ScopedLock l(callbackLock); + dequeueCallback = 0; +} framing::FieldTable& Message::getOrInsertHeaders() { diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index e4d09b1042..3894960c95 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -74,7 +74,7 @@ public: bool isImmediate() const; QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const; framing::FieldTable& getOrInsertHeaders(); - QPID_BROKER_EXTERN bool isPersistent(); + QPID_BROKER_EXTERN bool isPersistent() const; bool requiresAccept(); QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e); @@ -108,7 +108,7 @@ public: return frames.isA<T>(); } - uint32_t getRequiredCredit() const; + uint32_t getRequiredCredit(); void encode(framing::Buffer& buffer) const; void encodeContent(framing::Buffer& buffer) const; @@ -129,12 +129,9 @@ public: QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer); QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer); - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - void releaseContent(MessageStore* store); + void QPID_BROKER_EXTERN tryReleaseContent(); + void releaseContent(); + void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead void destroy(); bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const; @@ -187,8 +184,12 @@ public: mutable Replacement replacement; mutable boost::intrusive_ptr<Message> empty; + + sys::Mutex callbackLock; MessageCallback* enqueueCallback; MessageCallback* dequeueCallback; + + uint32_t requiredCredit; static std::string updateDestination; }; diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp index 14b233fd6c..b1a2b77b05 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp @@ -80,7 +80,7 @@ void MessageBuilder::handle(AMQFrame& frame) && !NullMessageStore::isNullStore(store) && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */) { - message->releaseContent(store); + message->releaseContent(); staging = true; } } @@ -96,6 +96,7 @@ void MessageBuilder::end() void MessageBuilder::start(const SequenceNumber& id) { message = intrusive_ptr<Message>(new Message(id)); + message->setStore(store); state = METHOD; staging = false; } diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp index 0b8a5db1c7..5f7cceebd3 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -32,11 +32,11 @@ using qpid::framing::FieldTable; namespace qpid { namespace broker { -MessageStoreModule::MessageStoreModule(MessageStore* _store) : store(_store) {} +MessageStoreModule::MessageStoreModule(boost::shared_ptr<MessageStore>& _store) + : store(_store) {} MessageStoreModule::~MessageStoreModule() { - delete store; } bool MessageStoreModule::init(const Options*) { return true; } @@ -173,7 +173,7 @@ void MessageStoreModule::collectPreparedXids(std::set<std::string>& xids) bool MessageStoreModule::isNull() const { - return NullMessageStore::isNullStore(store); + return NullMessageStore::isNullStore(store.get()); } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h index 02cbd13cf1..56b5a3c1ae 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h @@ -26,6 +26,7 @@ #include "qpid/broker/RecoveryManager.h" #include <boost/intrusive_ptr.hpp> +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { @@ -35,9 +36,9 @@ namespace broker { */ class MessageStoreModule : public MessageStore { - MessageStore* store; + boost::shared_ptr<MessageStore> store; public: - MessageStoreModule(MessageStore* store); + MessageStoreModule(boost::shared_ptr<MessageStore>& store); bool init(const Options* options); void truncateInit(const bool pushDownStoreFiles = false); diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp index 2ef223aa81..303a0501f4 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp @@ -36,7 +36,6 @@ PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : asyncEnqueueCounter(0), asyncDequeueCounter(0), - contentReleased(false), store(0) {} @@ -59,9 +58,15 @@ void PersistableMessage::flush() } } -void PersistableMessage::setContentReleased() {contentReleased = true; } +void PersistableMessage::setContentReleased() +{ + contentReleaseState.released = true; +} -bool PersistableMessage::isContentReleased()const { return contentReleased; } +bool PersistableMessage::isContentReleased() const +{ + return contentReleaseState.released; +} bool PersistableMessage::isEnqueueComplete() { sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); @@ -153,6 +158,26 @@ void PersistableMessage::dequeueAsync() { asyncDequeueCounter++; } +PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {} + +void PersistableMessage::setStore(MessageStore* s) +{ + store = s; +} + +void PersistableMessage::requestContentRelease() +{ + contentReleaseState.requested = true; +} +void PersistableMessage::blockContentRelease() +{ + contentReleaseState.blocked = true; +} +bool PersistableMessage::checkContentReleasable() +{ + return contentReleaseState.requested && !contentReleaseState.blocked; +} + }} diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index 0274b41375..2576e266d2 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -68,8 +68,16 @@ class PersistableMessage : public Persistable void enqueueAsync(); void dequeueAsync(); - bool contentReleased; syncList synclist; + struct ContentReleaseState + { + bool blocked; + bool requested; + bool released; + + ContentReleaseState(); + }; + ContentReleaseState contentReleaseState; protected: /** Called when all enqueues are complete for this message. */ @@ -96,8 +104,15 @@ class PersistableMessage : public Persistable void flush(); - bool isContentReleased() const; - + bool QPID_BROKER_EXTERN isContentReleased() const; + + void QPID_BROKER_EXTERN setStore(MessageStore*); + void requestContentRelease(); + void blockContentRelease(); + bool checkContentReleasable(); + + virtual QPID_BROKER_EXTERN bool isPersistent() const = 0; + QPID_BROKER_EXTERN bool isEnqueueComplete(); QPID_BROKER_EXTERN void enqueueComplete(); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index b2a8e223c5..86de96468d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -181,6 +181,8 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ + if (policy.get()) policy->recoverEnqueued(msg); + push(msg, true); if (store){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure @@ -206,11 +208,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } void Queue::requeue(const QueuedMessage& msg){ - if (!isEnqueued(msg)) return; - QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); + if (!isEnqueued(msg)) return; msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); listeners.populate(copy); @@ -563,16 +564,10 @@ void Queue::popMsg(QueuedMessage& qmsg) } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ - Messages dequeues; QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); - if (policy.get()) { - policy->tryEnqueue(qm); - //depending on policy, may have some dequeues - if (!isRecovery) pendingDequeues.swap(dequeues); - } if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; @@ -606,12 +601,11 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (eventMgr) eventMgr->enqueued(qm); else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); } + if (policy.get()) { + policy->enqueued(qm); + } } copy.notify(); - if (!dequeues.empty()) { - //depending on policy, may have some dequeues - for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); - } } QueuedMessage Queue::getFront() @@ -697,8 +691,19 @@ void Queue::setLastNodeFailure() } // return true if store exists, -bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) +bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck) { + if (policy.get() && !suppressPolicyCheck) { + Messages dequeues; + { + Mutex::ScopedLock locker(messageLock); + policy->tryEnqueue(msg); + policy->getPendingDequeues(dequeues); + } + //depending on policy, may have some dequeues that need to performed without holding the lock + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + } + if (inLastNodeFailure && persistLastNode){ msg->forcePersistent(); } @@ -707,15 +712,27 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) msg->addTraceId(traceId); } - if (msg->isPersistent() && store) { + if ((msg->isPersistent() || msg->checkContentReleasable()) && store) { msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); return true; } + if (!store) { + //Messages enqueued on a transient queue should be prevented + //from having their content released as it may not be + //recoverable by these queue for delivery + msg->blockContentRelease(); + } return false; } +void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) +{ + Mutex::ScopedLock locker(messageLock); + if (policy.get()) policy->enqueueAborted(msg); +} + // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { @@ -726,7 +743,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) dequeued(msg); } } - if (msg.payload->isPersistent() && store) { + if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) { msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); store->dequeue(ctxt, pmsg, *this); @@ -781,22 +798,37 @@ void Queue::create(const FieldTable& _settings) void Queue::configure(const FieldTable& _settings, bool recovering) { - setPolicy(QueuePolicy::createQueuePolicy(_settings)); + + eventMode = _settings.getAsInt(qpidQueueEventGeneration); + + if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && + (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) { + if ( NullMessageStore::isNullStore(store)) { + QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); + } else if (eventMgr && !eventMgr->isSync() ) { + QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName()); + } + FieldTable copy(_settings); + copy.erase(QueuePolicy::typeKey); + setPolicy(QueuePolicy::createQueuePolicy(getName(), copy)); + } else { + setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings)); + } //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); - QPID_LOG(debug, "Configured queue with no-local=" << noLocal); + QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); lastValueQueue= _settings.get(qpidLastValueQueue); - if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue"); + if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName()); lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse); if (lastValueQueueNoBrowse){ - QPID_LOG(debug, "Configured queue as Last Value Queue No Browse"); + QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName()); lastValueQueue = lastValueQueueNoBrowse; } persistLastNode= _settings.get(qpidPersistLastNode); - if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); + if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName()); traceId = _settings.getAsString(qpidTraceIdentity); std::string excludeList = _settings.getAsString(qpidTraceExclude); @@ -806,8 +838,6 @@ void Queue::configure(const FieldTable& _settings, bool recovering) QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); - eventMode = _settings.getAsInt(qpidQueueEventGeneration); - FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); @@ -975,19 +1005,6 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } -bool Queue::releaseMessageContent(const QueuedMessage& m) -{ - if (store && !NullMessageStore::isNullStore(store)) { - QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory"); - m.payload->releaseContent(store); - return true; - } else { - QPID_LOG(warning, "Message " << m.position << " on " << name - << " cannot be released from memory as the queue is not durable"); - return false; - } -} - ManagementObject* Queue::GetManagementObject (void) const { return (ManagementObject*) mgmtObject; @@ -1044,11 +1061,12 @@ void Queue::insertSequenceNumbers(const std::string& key) void Queue::enqueued(const QueuedMessage& m) { if (m.payload) { - if (policy.get()) policy->tryEnqueue(m); - mgntEnqStats(m.payload); - if (m.payload->isPersistent()) { - enqueue ( 0, m.payload ); + if (policy.get()) { + policy->recoverEnqueued(m.payload); + policy->enqueued(m); } + mgntEnqStats(m.payload); + enqueue ( 0, m.payload, true ); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1059,10 +1077,4 @@ bool Queue::isEnqueued(const QueuedMessage& msg) return !policy.get() || policy->isEnqueued(msg); } -void Queue::addPendingDequeue(const QueuedMessage& msg) -{ - //assumes lock is held - true at present but rather nasty as this is a public method - pendingDequeues.push_back(msg); -} - QueueListeners& Queue::getListeners() { return listeners; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 77799fd967..661e46f619 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -239,7 +239,8 @@ namespace qpid { QPID_BROKER_EXTERN void setLastNodeFailure(); QPID_BROKER_EXTERN void clearLastNodeFailure(); - bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg); + bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck = false); + void enqueueAborted(boost::intrusive_ptr<Message> msg); /** * dequeue from store (only done once messages is acknowledged) */ @@ -315,8 +316,6 @@ namespace qpid { bindings.eachBinding(f); } - bool releaseMessageContent(const QueuedMessage&); - void popMsg(QueuedMessage& qmsg); /** Set the position sequence number for the next message on the queue. @@ -335,18 +334,6 @@ namespace qpid { */ void recoveryComplete(); - /** - * This is a hack to avoid deadlocks in durable ring - * queues. It is used for dequeueing messages in response - * to an enqueue while avoid holding lock over call to - * store. - * - * Assumes messageLock is held - true for curent use case - * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public - * method - **/ - void addPendingDequeue(const QueuedMessage &msg); - // For cluster update QueueListeners& getListeners(); }; diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.cpp b/qpid/cpp/src/qpid/broker/QueueEvents.cpp index 6df869673d..bba054b0b8 100644 --- a/qpid/cpp/src/qpid/broker/QueueEvents.cpp +++ b/qpid/cpp/src/qpid/broker/QueueEvents.cpp @@ -25,25 +25,41 @@ namespace qpid { namespace broker { -QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) : - eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true) +QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync) : + eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync) { - eventQueue.start(); + if (!sync) eventQueue.start(); } QueueEvents::~QueueEvents() { - eventQueue.stop(); + if (!sync) eventQueue.stop(); } void QueueEvents::enqueued(const QueuedMessage& m) { - if (enabled) eventQueue.push(Event(ENQUEUE, m)); + if (enabled) { + Event enq(ENQUEUE, m); + if (sync) { + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) + j->second(enq); + } else { + eventQueue.push(enq); + } + } } void QueueEvents::dequeued(const QueuedMessage& m) { - if (enabled) eventQueue.push(Event(DEQUEUE, m)); + if (enabled) { + Event deq(DEQUEUE, m); + if (sync) { + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) + j->second(deq); + } else { + eventQueue.push(Event(DEQUEUE, m)); + } + } } void QueueEvents::registerListener(const std::string& id, const EventListener& listener) @@ -70,15 +86,16 @@ QueueEvents::EventQueue::Batch::const_iterator QueueEvents::handle(const EventQueue::Batch& events) { qpid::sys::Mutex::ScopedLock l(lock); for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) { - for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) - j->second(*i); + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) { + j->second(*i); + } } return events.end(); } void QueueEvents::shutdown() { - if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown(); + if (!sync && !eventQueue.empty() && !listeners.empty()) eventQueue.shutdown(); } void QueueEvents::enable() @@ -93,6 +110,12 @@ void QueueEvents::disable() QPID_LOG(debug, "Queue events disabled"); } +bool QueueEvents::isSync() +{ + return sync; +} + + QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {} diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.h b/qpid/cpp/src/qpid/broker/QueueEvents.h index 6826c6e79a..c42752133e 100644 --- a/qpid/cpp/src/qpid/broker/QueueEvents.h +++ b/qpid/cpp/src/qpid/broker/QueueEvents.h @@ -54,7 +54,7 @@ class QueueEvents typedef boost::function<void (Event)> EventListener; - QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller); + QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync = false); QPID_BROKER_EXTERN ~QueueEvents(); QPID_BROKER_EXTERN void enqueued(const QueuedMessage&); QPID_BROKER_EXTERN void dequeued(const QueuedMessage&); @@ -65,6 +65,7 @@ class QueueEvents void disable(); //process all outstanding events QPID_BROKER_EXTERN void shutdown(); + QPID_BROKER_EXTERN bool isSync(); private: typedef qpid::sys::PollableQueue<Event> EventQueue; typedef std::map<std::string, EventListener> Listeners; @@ -73,6 +74,7 @@ class QueueEvents Listeners listeners; volatile bool enabled; qpid::sys::Mutex lock;//protect listeners from concurrent access + bool sync; EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e); diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index 39afe90134..a8aa674c53 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -28,8 +28,8 @@ using namespace qpid::broker; using namespace qpid::framing; -QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {} +QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {} void QueuePolicy::enqueued(uint64_t _size) { @@ -39,18 +39,15 @@ void QueuePolicy::enqueued(uint64_t _size) void QueuePolicy::dequeued(uint64_t _size) { - //Note: underflow detection is not reliable in the face of - //concurrent updates (at present locking in Queue.cpp prevents - //these anyway); updates are atomic and are safe regardless. if (maxCount) { - if (count.get() > 0) { + if (count > 0) { --count; } else { throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this)); } } if (maxSize) { - if (_size > size.get()) { + if (_size > size) { throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this)); } else { size -= _size; @@ -58,47 +55,47 @@ void QueuePolicy::dequeued(uint64_t _size) } } -bool QueuePolicy::checkLimit(const QueuedMessage& m) +bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) { - bool sizeExceeded = maxSize && (size.get() + m.payload->contentSize()) > maxSize; - bool countExceeded = maxCount && (count.get() + 1) > maxCount; + bool sizeExceeded = maxSize && (size + m->contentSize()) > maxSize; + bool countExceeded = maxCount && (count + 1) > maxCount; bool exceeded = sizeExceeded || countExceeded; if (exceeded) { if (!policyExceeded) { - policyExceeded = true; - if (m.queue) { - if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << m.queue->getName()); - if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << m.queue->getName()); - } + policyExceeded = true; + if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << name); + if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << name); } } else { if (policyExceeded) { policyExceeded = false; - if (m.queue) { - QPID_LOG(info, "Queue cumulative message size and message count within policy for " << m.queue->getName()); - } + QPID_LOG(info, "Queue cumulative message size and message count within policy for " << name); } } return !exceeded; } -void QueuePolicy::tryEnqueue(const QueuedMessage& m) +void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m) { if (checkLimit(m)) { - enqueued(m); + enqueued(m->contentSize()); } else { - std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue"); - throw ResourceLimitExceededException( - QPID_MSG("Policy exceeded on " << queue << " by message " << m.position - << " of size " << m.payload->contentSize() << " , policy: " << *this)); + throw ResourceLimitExceededException(QPID_MSG("Policy exceeded on " << name << ", policy: " << *this)); } } -void QueuePolicy::enqueued(const QueuedMessage& m) +void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m) { - enqueued(m.payload->contentSize()); + enqueued(m->contentSize()); } +void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m) +{ + dequeued(m->contentSize()); +} + +void QueuePolicy::enqueued(const QueuedMessage&) {} + void QueuePolicy::dequeued(const QueuedMessage& m) { dequeued(m.payload->contentSize()); @@ -132,7 +129,7 @@ std::string QueuePolicy::getType(const FieldTable& settings) std::transform(t.begin(), t.end(), t.begin(), tolower); if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t; } - return FLOW_TO_DISK; + return REJECT; } void QueuePolicy::setDefaultMaxSize(uint64_t s) @@ -140,6 +137,7 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s) defaultMaxSize = s; } +void QueuePolicy::getPendingDequeues(Messages&) {} @@ -148,8 +146,8 @@ void QueuePolicy::encode(Buffer& buffer) const { buffer.putLong(maxCount); buffer.putLongLong(maxSize); - buffer.putLong(count.get()); - buffer.putLongLong(size.get()); + buffer.putLong(count); + buffer.putLongLong(size); } void QueuePolicy::decode ( Buffer& buffer ) @@ -179,16 +177,18 @@ const std::string QueuePolicy::RING("ring"); const std::string QueuePolicy::RING_STRICT("ring_strict"); uint64_t QueuePolicy::defaultMaxSize(0); -FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) : - QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {} +FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) : + QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {} -bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m) +bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) { - return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m); + if (!QueuePolicy::checkLimit(m)) m->requestContentRelease(); + return true; } -RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} +RingQueuePolicy::RingQueuePolicy(const std::string& _name, + uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} bool before(const QueuedMessage& a, const QueuedMessage& b) { @@ -197,15 +197,12 @@ bool before(const QueuedMessage& a, const QueuedMessage& b) void RingQueuePolicy::enqueued(const QueuedMessage& m) { - QueuePolicy::enqueued(m); - qpid::sys::Mutex::ScopedLock l(lock); //need to insert in correct location based on position queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m); } void RingQueuePolicy::dequeued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //find and remove m from queue if (find(m, pendingDequeues, true) || find(m, queue, true)) { //now update count and size @@ -215,49 +212,32 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m) bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //for non-strict ring policy, a message can be replaced (and //therefore dequeued) before it is accepted or released by //subscriber; need to detect this return find(m, pendingDequeues, false) || find(m, queue, false); } -bool RingQueuePolicy::checkLimit(const QueuedMessage& m) +bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) { if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept QueuedMessage oldest; - { - qpid::sys::Mutex::ScopedLock l(lock); - if (queue.empty()) { - QPID_LOG(debug, "Message too large for ring queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) - << " [" << *this << "] " - << ": message size = " << m.payload->contentSize() << " bytes"); - return false; - } - oldest = queue.front(); + if (queue.empty()) { + QPID_LOG(debug, "Message too large for ring queue " << name + << " [" << *this << "] " + << ": message size = " << m->contentSize() << " bytes"); + return false; } + oldest = queue.front(); if (oldest.queue->acquire(oldest) || !strict) { - { - //TODO: fix this! In the current code, this method is - //only ever called with the Queue lock already taken. This - //should not be relied upon going forward however and - //clearly the locking in this class is insufficient as - //there is no guarantee that the message previously atthe - //front is still there. - qpid::sys::Mutex::ScopedLock l(lock); - queue.pop_front(); - pendingDequeues.push_back(oldest); - } - oldest.queue->addPendingDequeue(oldest); - QPID_LOG(debug, "Ring policy triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) - << ": removed message " << oldest.position << " to make way for " << m.position); + queue.pop_front(); + pendingDequeues.push_back(oldest); + QPID_LOG(debug, "Ring policy triggered in " << name + << ": removed message " << oldest.position << " to make way for new message"); return true; } else { - QPID_LOG(debug, "Ring policy could not be triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) + QPID_LOG(debug, "Ring policy could not be triggered in " << name << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued"); //in strict mode, if oldest message has been delivered (hence //cannot be acquired) but not yet acked, it should not be @@ -266,6 +246,11 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m) } } +void RingQueuePolicy::getPendingDequeues(Messages& result) +{ + result = pendingDequeues; +} + bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) { for (Messages::iterator i = q.begin(); i != q.end(); i++) { @@ -277,25 +262,36 @@ bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) return false; } +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type) +{ + return createQueuePolicy("<unspecified>", maxCount, maxSize, type); +} + std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings) { + return createQueuePolicy("<unspecified>", settings); +} + +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings) +{ uint32_t maxCount = getInt(settings, maxCountKey, 0); uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize); if (maxCount || maxSize) { - return createQueuePolicy(maxCount, maxSize, getType(settings)); + return createQueuePolicy(name, maxCount, maxSize, getType(settings)); } else { return std::auto_ptr<QueuePolicy>(); } } -std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type) +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, + uint32_t maxCount, uint64_t maxSize, const std::string& type) { if (type == RING || type == RING_STRICT) { - return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type)); + return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type)); } else if (type == FLOW_TO_DISK) { - return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize)); + return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize)); } else { - return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type)); + return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type)); } } @@ -305,10 +301,10 @@ namespace qpid { std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) { - if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get(); + if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size; else out << "size: unlimited"; out << "; "; - if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get(); + if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count; else out << "count: unlimited"; out << "; type=" << p.type; return out; diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h index 54745876d5..b2937e94c7 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.h +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h @@ -40,14 +40,14 @@ class QueuePolicy uint32_t maxCount; uint64_t maxSize; const std::string type; - qpid::sys::AtomicValue<uint32_t> count; - qpid::sys::AtomicValue<uint64_t> size; + uint32_t count; + uint64_t size; bool policyExceeded; static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); - static std::string getType(const qpid::framing::FieldTable& settings); public: + typedef std::deque<QueuedMessage> Messages; static QPID_BROKER_EXTERN const std::string maxCountKey; static QPID_BROKER_EXTERN const std::string maxSizeKey; static QPID_BROKER_EXTERN const std::string typeKey; @@ -57,27 +57,34 @@ class QueuePolicy static QPID_BROKER_EXTERN const std::string RING_STRICT; virtual ~QueuePolicy() {} - QPID_BROKER_EXTERN void tryEnqueue(const QueuedMessage&); + QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg); + QPID_BROKER_EXTERN void recoverEnqueued(boost::intrusive_ptr<Message> msg); + QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg); + virtual void enqueued(const QueuedMessage&); virtual void dequeued(const QueuedMessage&); virtual bool isEnqueued(const QueuedMessage&); - virtual bool checkLimit(const QueuedMessage&); QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings); uint32_t getMaxCount() const { return maxCount; } uint64_t getMaxSize() const { return maxSize; } void encode(framing::Buffer& buffer) const; void decode ( framing::Buffer& buffer ); uint32_t encodedSize() const; + virtual void getPendingDequeues(Messages& result); - + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings); + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + static std::string getType(const qpid::framing::FieldTable& settings); static void setDefaultMaxSize(uint64_t); friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueuePolicy&); protected: - QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + const std::string name; - virtual void enqueued(const QueuedMessage&); + QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + + virtual bool checkLimit(boost::intrusive_ptr<Message> msg); void enqueued(uint64_t size); void dequeued(uint64_t size); }; @@ -86,21 +93,20 @@ class QueuePolicy class FlowToDiskPolicy : public QueuePolicy { public: - FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize); - bool checkLimit(const QueuedMessage&); + FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize); + bool checkLimit(boost::intrusive_ptr<Message> msg); }; class RingQueuePolicy : public QueuePolicy { public: - RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING); + RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING); void enqueued(const QueuedMessage&); void dequeued(const QueuedMessage&); bool isEnqueued(const QueuedMessage&); - bool checkLimit(const QueuedMessage&); + bool checkLimit(boost::intrusive_ptr<Message> msg); + void getPendingDequeues(Messages& result); private: - typedef std::deque<QueuedMessage> Messages; - qpid::sys::Mutex lock; Messages pendingDequeues; Messages queue; const bool strict; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index bdd5f33601..7e3090bf17 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -65,7 +65,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) tagGenerator("sgen"), dtxSelected(false), authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), - userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))) + userID(getSession().getConnection().getUserId()) { acl = getSession().getBroker().getAcl(); } @@ -302,6 +302,18 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) return !blocked; } +namespace { +struct ConsumerName { + const SemanticState::ConsumerImpl& consumer; + ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {} +}; + +ostream& operator<<(ostream& o, const ConsumerName& pc) { + return o << pc.consumer.getName() << " on " + << pc.consumer.getParent().getSession().getSessionId(); +} +} + void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { uint32_t originalMsgCredit = msgCredit; @@ -312,7 +324,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) if (byteCredit != 0xFFFFFFFF) { byteCredit -= msg->getRequiredCredit(); } - QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent + QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit << " now bytes: " << byteCredit << " msgs: " << msgCredit); @@ -320,15 +332,13 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) { - if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { - QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent - << ", bytes: " << byteCredit << " msgs: " << msgCredit); - return false; - } else { - QPID_LOG(debug, "Credit available for '" << name << "' on " << parent - << " bytes: " << byteCredit << " msgs: " << msgCredit); - return true; - } + bool enoughCredit = msgCredit > 0 && + (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit()); + QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ") + << ConsumerName(*this) + << ", have bytes: " << byteCredit << " msgs: " << msgCredit + << ", need " << msg->getRequiredCredit() << " bytes"); + return enoughCredit; } SemanticState::ConsumerImpl::~ConsumerImpl() {} @@ -356,6 +366,9 @@ void SemanticState::handle(intrusive_ptr<Message> msg) { } else { DeliverableMessage deliverable(msg); route(msg, deliverable); + if (msg->checkContentReleasable()) { + msg->releaseContent(); + } } } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index da8383fc12..89fe7b83dd 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -129,6 +129,7 @@ class SemanticState : private boost::noncopyable { const framing::FieldTable& getArguments() const { return arguments; } SemanticState& getParent() { return *parent; } + const SemanticState& getParent() const { return *parent; } }; private: @@ -163,6 +164,7 @@ class SemanticState : private boost::noncopyable { ~SemanticState(); SessionContext& getSession() { return session; } + const SessionContext& getSession() const { return session; } ConsumerImpl& find(const std::string& destination); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index a1ad5a0a30..2ac6d66e62 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -337,6 +337,10 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE))); params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE))); params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE))); + params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); + params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); + params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); + if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) throw NotAllowedException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId())); } @@ -472,8 +476,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, AclModule* acl = getBroker().getAcl(); if (acl) - { - // add flags as needed + { if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) ) throw NotAllowedException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId())); } diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index cfdbd100c3..afbbb2cc22 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -28,7 +28,7 @@ #include "qpid/sys/OutputControl.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/OwnershipToken.h" - +#include "qpid/SessionId.h" #include <boost/noncopyable.hpp> @@ -45,6 +45,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl virtual framing::AMQP_ClientProxy& getProxy() = 0; virtual Broker& getBroker() = 0; virtual uint16_t getChannel() const = 0; + virtual const SessionId& getSessionId() const = 0; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 67fd4f4f38..eade93ddaa 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -118,6 +118,8 @@ class SessionState : public qpid::SessionState, bool processSendCredit(uint32_t msgs); + const SessionId& getSessionId() const { return getId(); } + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); diff --git a/qpid/cpp/src/qpid/broker/SignalHandler.cpp b/qpid/cpp/src/qpid/broker/SignalHandler.cpp index f4a3822554..b565cfd419 100644 --- a/qpid/cpp/src/qpid/broker/SignalHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SignalHandler.cpp @@ -38,6 +38,8 @@ void SignalHandler::setBroker(const boost::intrusive_ptr<Broker>& b) { signal(SIGCHLD,SIG_IGN); } +void SignalHandler::shutdown() { shutdownHandler(0); } + void SignalHandler::shutdownHandler(int) { if (broker.get()) { broker->shutdown(); diff --git a/qpid/cpp/src/qpid/broker/SignalHandler.h b/qpid/cpp/src/qpid/broker/SignalHandler.h index d2cdfae07c..bbe831b61d 100644 --- a/qpid/cpp/src/qpid/broker/SignalHandler.h +++ b/qpid/cpp/src/qpid/broker/SignalHandler.h @@ -38,6 +38,9 @@ class SignalHandler /** Set the broker to be shutdown on signals */ static void setBroker(const boost::intrusive_ptr<Broker>& broker); + /** Initiate shut-down of broker */ + static void shutdown(); + private: static void shutdownHandler(int); static boost::intrusive_ptr<Broker> broker; diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index 6bf0b104ea..cb04742677 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -293,44 +293,23 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern) return q != qv.end(); } -void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ +void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) +{ Binding::vector mb; + BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); PreRoute pr(msg, this); - uint32_t count(0); - { - RWlock::ScopedRlock l(lock); - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, routingKey)) { - Binding::vector& qv(i->second.bindingVector); - for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ - mb.push_back(*j); + RWlock::ScopedRlock l(lock); + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (match(i->first, routingKey)) { + Binding::vector& qv(i->second.bindingVector); + for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){ + b->push_back(*j); + } } } } - } - - for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) { - msg.deliverTo((*j)->queue); - if ((*j)->mgmtBinding != 0) - (*j)->mgmtBinding->inc_msgMatched (); - } - - if (mgmtExchange != 0) - { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } + doRoute(msg, b); } bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) diff --git a/qpid/cpp/src/qpid/broker/TxAccept.cpp b/qpid/cpp/src/qpid/broker/TxAccept.cpp index e47ac84990..928ac12c10 100644 --- a/qpid/cpp/src/qpid/broker/TxAccept.cpp +++ b/qpid/cpp/src/qpid/broker/TxAccept.cpp @@ -88,7 +88,13 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw() void TxAccept::commit() throw() { - ops.commit(); + try { + ops.commit(); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to commit: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to commit (unknown error)"); + } } void TxAccept::rollback() throw() {} diff --git a/qpid/cpp/src/qpid/broker/TxPublish.cpp b/qpid/cpp/src/qpid/broker/TxPublish.cpp index 17b99fd883..4b083033ea 100644 --- a/qpid/cpp/src/qpid/broker/TxPublish.cpp +++ b/qpid/cpp/src/qpid/broker/TxPublish.cpp @@ -26,9 +26,14 @@ using namespace qpid::broker; TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {} -bool TxPublish::prepare(TransactionContext* ctxt) throw(){ +bool TxPublish::prepare(TransactionContext* ctxt) throw() +{ try{ - for_each(queues.begin(), queues.end(), Prepare(ctxt, msg)); + while (!queues.empty()) { + prepare(ctxt, queues.front()); + prepared.push_back(queues.front()); + queues.pop_front(); + } return true; }catch(const std::exception& e){ QPID_LOG(error, "Failed to prepare: " << e.what()); @@ -38,11 +43,30 @@ bool TxPublish::prepare(TransactionContext* ctxt) throw(){ return false; } -void TxPublish::commit() throw(){ - for_each(queues.begin(), queues.end(), Commit(msg)); +void TxPublish::commit() throw() +{ + try { + for_each(prepared.begin(), prepared.end(), Commit(msg)); + if (msg->checkContentReleasable()) { + msg->releaseContent(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to commit: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to commit (unknown error)"); + } } -void TxPublish::rollback() throw(){ +void TxPublish::rollback() throw() +{ + try { + for_each(prepared.begin(), prepared.end(), Rollback(msg)); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to complete rollback: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to complete rollback (unknown error)"); + } + } void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){ @@ -54,16 +78,14 @@ void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){ } } -TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg) - : ctxt(_ctxt), msg(_msg){} - -void TxPublish::Prepare::operator()(const boost::shared_ptr<Queue>& queue){ +void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue) +{ if (!queue->enqueue(ctxt, msg)){ /** - * if not store then mark message for ack and deleivery once - * commit happens, as async IO will never set it when no store - * exists - */ + * if not store then mark message for ack and deleivery once + * commit happens, as async IO will never set it when no store + * exists + */ msg->enqueueComplete(); } } @@ -74,6 +96,12 @@ void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){ queue->process(msg); } +TxPublish::Rollback::Rollback(intrusive_ptr<Message>& _msg) : msg(_msg){} + +void TxPublish::Rollback::operator()(const boost::shared_ptr<Queue>& queue){ + queue->enqueueAborted(msg); +} + uint64_t TxPublish::contentSize () { return msg->contentSize (); diff --git a/qpid/cpp/src/qpid/broker/TxPublish.h b/qpid/cpp/src/qpid/broker/TxPublish.h index d5cf5639c4..b6ab9767ab 100644 --- a/qpid/cpp/src/qpid/broker/TxPublish.h +++ b/qpid/cpp/src/qpid/broker/TxPublish.h @@ -47,23 +47,25 @@ namespace qpid { * dispatch or to be added to the in-memory queue. */ class TxPublish : public TxOp, public Deliverable{ - class Prepare{ - TransactionContext* ctxt; + + class Commit{ boost::intrusive_ptr<Message>& msg; public: - Prepare(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg); + Commit(boost::intrusive_ptr<Message>& msg); void operator()(const boost::shared_ptr<Queue>& queue); }; - - class Commit{ + class Rollback{ boost::intrusive_ptr<Message>& msg; public: - Commit(boost::intrusive_ptr<Message>& msg); + Rollback(boost::intrusive_ptr<Message>& msg); void operator()(const boost::shared_ptr<Queue>& queue); }; boost::intrusive_ptr<Message> msg; std::list<Queue::shared_ptr> queues; + std::list<Queue::shared_ptr> prepared; + + void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>); public: QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg); |