diff options
30 files changed, 631 insertions, 435 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 08d2e7b1d9..6f36b6cb0c 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -132,6 +132,7 @@ libqpidcommon_la_SOURCES = \ qpid/Exception.cpp \ qpid/Plugin.cpp \ qpid/Url.cpp \ + qpid/sys/AggregateOutput.cpp \ qpid/sys/AsynchIOAcceptor.cpp \ qpid/sys/Dispatcher.cpp \ qpid/sys/Runnable.cpp \ @@ -408,6 +409,8 @@ nobase_include_HEADERS = \ qpid/sys/Module.h \ qpid/sys/Monitor.h \ qpid/sys/Mutex.h \ + qpid/sys/OutputControl.h \ + qpid/sys/OutputTask.h \ qpid/sys/Poller.h \ qpid/sys/Runnable.h \ qpid/sys/RefCountedMap.h \ diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp index 82378f938b..526b58cb14 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -302,9 +302,6 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, if(!nowait) getProxy().getBasic().consumeOk(newTag); - - //allow messages to be dispatched if required as there is now a consumer: - queue->requestDispatch(); } void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){ diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 26146e80d4..6a13c05242 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -44,6 +44,7 @@ namespace broker { Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : broker(broker_), + outputTasks(*out_), out(out_), framemax(65536), heartbeat(0), @@ -96,6 +97,11 @@ void Connection::closed(){ // Physically closed, suspend open sessions. } } +bool Connection::doOutput() +{ + return outputTasks.doOutput(); +} + void Connection::closeChannel(uint16_t id) { ChannelMap::iterator i = channels.find(id); if (i != channels.end()) channels.erase(i); diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 776634e04e..395aa7b0bd 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -29,6 +29,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/sys/AggregateOutput.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/TimeoutHandler.h" @@ -70,6 +71,9 @@ class Connection : public sys::ConnectionInputHandler, Broker& broker; std::vector<Queue::shared_ptr> exclusiveQueues; + + //contained output tasks + sys::AggregateOutput outputTasks; // ConnectionInputHandler methods void received(framing::AMQFrame& frame); @@ -77,6 +81,7 @@ class Connection : public sys::ConnectionInputHandler, void idleOut(); void idleIn(); void closed(); + bool doOutput(); void closeChannel(framing::ChannelId channel); diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index 5e09a00113..ed4bb176f6 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -53,7 +53,9 @@ namespace qpid { Consumer(bool preAcquires = true) : acquires(preAcquires) {} bool preAcquires() const { return acquires; } virtual bool deliver(QueuedMessage& msg) = 0; + virtual void notify() = 0; virtual bool filter(intrusive_ptr<Message>) { return true; } + virtual bool accept(intrusive_ptr<Message>) { return true; } virtual ~Consumer(){} }; } diff --git a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 834ce0a203..69cccf0ff0 100644 --- a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -139,8 +139,6 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, string tag = destination; state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); - // Dispatch messages as there is now a consumer. - queue->requestDispatch(); } void diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp index c68cfcb52f..a00a623988 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp @@ -28,11 +28,11 @@ using namespace qpid::broker; void PersistableMessage::flush() { - sys::ScopedLock<sys::Mutex> l(storeLock); - if (store) { - for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { - store->flush(*(*i)); - } + sys::ScopedLock<sys::Mutex> l(storeLock); + if (store) { + for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { + store->flush(*(*i)); + } } } diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index 1a4ac6b891..299e22e2ba 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -43,7 +43,7 @@ class PersistableMessage : public Persistable { sys::Monitor asyncEnqueueLock; sys::Monitor asyncDequeueLock; - sys::Mutex storeLock; + sys::Mutex storeLock; /** * Tracks the number of outstanding asynchronous enqueue @@ -84,12 +84,12 @@ public: asyncEnqueueCounter(0), asyncDequeueCounter(0), store(0), - contentReleased(false) - {} + contentReleased(false) + {} void flush(); - inline bool isContentReleased()const {return contentReleased; } + inline bool isContentReleased()const {return contentReleased; } inline void waitForEnqueueComplete() { sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); @@ -104,27 +104,35 @@ public: } inline void enqueueComplete() { - sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); - if (asyncEnqueueCounter > 0) { - if (--asyncEnqueueCounter == 0) { - asyncEnqueueLock.notify(); - if (store) { - for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { - (*i)->notifyDurableIOComplete(); - } + bool notify = false; + { + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + if (asyncEnqueueCounter > 0) { + if (--asyncEnqueueCounter == 0) { + asyncEnqueueLock.notify(); + notify = true; } } } + if (notify) { + sys::ScopedLock<sys::Mutex> l(storeLock); + if (store) { + for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { + (*i)->notifyDurableIOComplete(); + } + synclist.clear(); + } + } } inline void enqueueAsync(PersistableQueue* queue, MessageStore* _store) { - if (_store){ - sys::ScopedLock<sys::Mutex> l(storeLock); - store = _store; - synclist.push_back(queue); - } - enqueueAsync(); - } + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + store = _store; + synclist.push_back(queue); + } + enqueueAsync(); + } inline void enqueueAsync() { sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); @@ -154,13 +162,13 @@ public: } inline void dequeueAsync(PersistableQueue* queue, MessageStore* _store) { - if (_store){ + if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; - synclist.push_back(queue); - } - dequeueAsync(); - } + synclist.push_back(queue); + } + dequeueAsync(); + } inline void dequeueAsync() { sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index c43ab8c231..4dba60cd0d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -31,7 +31,8 @@ #include <iostream> #include <boost/bind.hpp> #include "QueueRegistry.h" - +#include <algorithm> +#include <functional> using namespace qpid::broker; using namespace qpid::sys; @@ -40,6 +41,8 @@ using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; +using std::for_each; +using std::mem_fun; Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, @@ -50,10 +53,9 @@ Queue::Queue(const string& _name, bool _autodelete, autodelete(_autodelete), store(_store), owner(_owner), - next(0), - persistenceId(0), - serializer(false), - dispatchCallback(*this) + consumerCount(0), + exclusive(false), + persistenceId(0) { if (parent != 0) { @@ -73,9 +75,8 @@ Queue::~Queue() void Queue::notifyDurableIOComplete() { - // signal SemanticHander to ack completed dequeues - // then dispatch to ack... - serializer.execute(dispatchCallback); + Mutex::ScopedLock locker(messageLock); + notify(); } @@ -110,7 +111,6 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); - serializer.execute(dispatchCallback); } } @@ -148,17 +148,13 @@ void Queue::process(intrusive_ptr<Message>& msg){ mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); } } - serializer.execute(dispatchCallback); - } void Queue::requeue(const QueuedMessage& msg){ - { - Mutex::ScopedLock locker(messageLock); - msg.payload->enqueueComplete(); // mark the message as enqueued - messages.push_front(msg); - } - serializer.execute(dispatchCallback); + Mutex::ScopedLock locker(messageLock); + msg.payload->enqueueComplete(); // mark the message as enqueued + messages.push_front(msg); + notify(); } bool Queue::acquire(const QueuedMessage& msg) { @@ -172,186 +168,170 @@ bool Queue::acquire(const QueuedMessage& msg) { return false; } -void Queue::requestDispatch(Consumer::ptr c){ - if (!c || c->preAcquires()) { - serializer.execute(dispatchCallback); - } else { - DispatchFunctor f(*this, c); - serializer.execute(f); - } -} - -void Queue::flush(DispatchCompletion& completion) -{ - DispatchFunctor f(*this, &completion); - serializer.execute(f); -} - /** * Return true if the message can be excluded. This is currently the - * case if the queue has an exclusive consumer that will never want - * the message, or if the queue is exclusive to a single connection - * and has a single consumer (covers the JMS topic case). + * case if the queue is exclusive and has an exclusive consumer that + * doesn't want the message or has a single consumer that doesn't want + * the message (covers the JMS topic case). */ -bool Queue::exclude(intrusive_ptr<Message> msg) +bool Queue::canExcludeUnwanted() +{ + Mutex::ScopedLock locker(consumerLock); + return hasExclusiveOwner() && (exclusive || consumerCount == 1); +} + + +bool Queue::getNextMessage(QueuedMessage& m, Consumer& c) { - RWlock::ScopedWlock locker(consumerLock); - if (exclusive) { - return !exclusive->filter(msg); - } else if (hasExclusiveOwner() && acquirers.size() == 1) { - return !acquirers[0]->filter(msg); + if (c.preAcquires()) { + return consumeNextMessage(m, c); } else { - return false; + return browseNextMessage(m, c); } } -Consumer::ptr Queue::allocate() +bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) { - RWlock::ScopedWlock locker(consumerLock); - - if (acquirers.empty()) { - return Consumer::ptr(); - } else if (exclusive){ - return exclusive; - } else { - next = next % acquirers.size(); - return acquirers[next++]; + while (true) { + Mutex::ScopedLock locker(messageLock); + if (messages.empty()) { + QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + addListener(c); + return false; + } else { + QueuedMessage msg = messages.front(); + if (!msg.payload->isEnqueueComplete()) { + QPID_LOG(debug, "Messages not ready to dispatch on queue '" << name << "'"); + addListener(c); + return false; + } + + if (c.filter(msg.payload)) { + if (c.accept(msg.payload)) { + m = msg; + pop(); + return true; + } else { + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + return false; + } + } else { + //consumer will never want this message + if (canExcludeUnwanted()) { + //hack for no-local on JMS topics; get rid of this message + QPID_LOG(debug, "Excluding message from '" << name << "'"); + pop(); + } else { + //leave it for another consumer + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + return false; + } + } + } } } -bool Queue::dispatch(QueuedMessage& msg) + +bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c) { - QPID_LOG(info, "Dispatch message " << msg.position << " from queue " << name); - //additions to the acquirers will result in a separate dispatch - //request, so won't result in anyone being missed - uint counter = getAcquirerCount(); - Consumer::ptr c = allocate(); - while (c && counter--){ - if (c->deliver(msg)) { - return true; + QueuedMessage msg(this); + while (seek(msg, c)) { + if (c.filter(msg.payload)) { + if (c.accept(msg.payload)) { + //consumer wants the message + c.position = msg.position; + m = msg; + return true; + } else { + //consumer hasn't got enough credit for the message + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + return false; + } } else { - c = allocate(); + //consumer will never want this message, continue seeking + c.position = msg.position; + QPID_LOG(debug, "Browser skipping message from '" << name << "'"); } } return false; } -bool Queue::getNextMessage(QueuedMessage& msg) +void Queue::notify() { - Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { - QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); - return false; - } else { - msg = messages.front(); - return true; - } + //notify listeners that there may be messages to process + for_each(listeners.begin(), listeners.end(), mem_fun(&Consumer::notify)); + listeners.clear(); } -void Queue::dispatch() +void Queue::removeListener(Consumer& c) { - QueuedMessage msg(this); - while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){ - if (dispatch(msg)) { - pop(); - } else if (exclude(msg.payload)) { - pop(); - dequeue(0, msg.payload); - QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]"); - } else { - break; - } - } - serviceAllBrowsers(); -} - -void Queue::serviceAllBrowsers() + Mutex::ScopedLock locker(messageLock); + listeners.erase(&c); +} + +void Queue::addListener(Consumer& c) { - Consumers copy; - { - RWlock::ScopedRlock locker(consumerLock); - if (browsers.empty()) return;//shortcut - copy = browsers; - } - for (Consumers::iterator i = copy.begin(); i != copy.end(); i++) { - serviceBrowser(*i); - } -} - -void Queue::serviceBrowser(Consumer::ptr browser) + listeners.insert(&c); +} + +bool Queue::dispatch(Consumer& c) { QueuedMessage msg(this); - while (seek(msg, browser->position) && browser->deliver(msg)) { - browser->position = msg.position; + if (getNextMessage(msg, c)) { + c.deliver(msg); + return true; + } else { + return false; } } -bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) { +bool Queue::seek(QueuedMessage& msg, Consumer& c) { Mutex::ScopedLock locker(messageLock); - if (!messages.empty() && messages.back().position > position) { - if (position < messages.front().position) { + if (!messages.empty() && messages.back().position > c.position) { + if (c.position < messages.front().position) { msg = messages.front(); return true; } else { - uint index = (position - messages.front().position) + 1; + uint index = (c.position - messages.front().position) + 1; if (index < messages.size()) { msg = messages[index]; return true; } } } + addListener(c); return false; } -void Queue::consume(Consumer::ptr c, bool requestExclusive){ - RWlock::ScopedWlock locker(consumerLock); +void Queue::consume(Consumer&, bool requestExclusive){ + Mutex::ScopedLock locker(consumerLock); if(exclusive) { throw AccessRefusedException( QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } - if(requestExclusive) { - if(acquirers.empty() && browsers.empty()) { - exclusive = c; - } else { + } else if(requestExclusive) { + if(consumerCount) { throw AccessRefusedException( QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } - } - if (c->preAcquires()) { - acquirers.push_back(c); - } else { - Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { - c->position = SequenceNumber(sequence.getValue() - 1); } else { - c->position = SequenceNumber(messages.front().position.getValue() - 1); + exclusive = true; } - browsers.push_back(c); } + consumerCount++; if (mgmtObject != 0){ mgmtObject->inc_consumers (); } } -void Queue::cancel(Consumer::ptr c){ - RWlock::ScopedWlock locker(consumerLock); - if (c->preAcquires()) { - cancel(c, acquirers); - } else { - cancel(c, browsers); - } +void Queue::cancel(Consumer& c){ + removeListener(c); + Mutex::ScopedLock locker(consumerLock); + consumerCount--; + if(exclusive) exclusive = false; if (mgmtObject != 0){ mgmtObject->dec_consumers (); } - if(exclusive == c) exclusive.reset(); -} - -void Queue::cancel(Consumer::ptr c, Consumers& consumers) -{ - Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); - if (i != consumers.end()) - consumers.erase(i); } QueuedMessage Queue::dequeue(){ @@ -382,14 +362,16 @@ uint32_t Queue::purge(){ return count; } +/** + * Assumes messageLock is held + */ void Queue::pop(){ - Mutex::ScopedLock locker(messageLock); if (policy.get()) policy->dequeued(messages.front().payload->contentSize()); messages.pop_front(); } void Queue::push(intrusive_ptr<Message>& msg){ - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock locker(messageLock); messages.push_back(QueuedMessage(this, msg, ++sequence)); if (policy.get()) { policy->enqueued(msg->contentSize()); @@ -397,6 +379,7 @@ void Queue::push(intrusive_ptr<Message>& msg){ msg->releaseContent(store); } } + notify(); } /** function only provided for unit tests, or code not in critical message path */ @@ -412,18 +395,13 @@ uint32_t Queue::getMessageCount() const{ } uint32_t Queue::getConsumerCount() const{ - RWlock::ScopedRlock locker(consumerLock); - return acquirers.size() + browsers.size(); -} - -uint32_t Queue::getAcquirerCount() const{ - RWlock::ScopedRlock locker(consumerLock); - return acquirers.size(); + Mutex::ScopedLock locker(consumerLock); + return consumerCount; } bool Queue::canAutoDelete() const{ - RWlock::ScopedRlock locker(consumerLock); - return autodelete && acquirers.empty() && browsers.empty(); + Mutex::ScopedLock locker(consumerLock); + return autodelete && !consumerCount; } // return true if store exists, @@ -601,21 +579,6 @@ bool Queue::hasExclusiveConsumer() const return exclusive; } -void Queue::DispatchFunctor::operator()() -{ - try { - if (consumer && !consumer->preAcquires()) { - queue.serviceBrowser(consumer); - }else{ - queue.dispatch(); - } - } catch (const std::exception& e) { - QPID_LOG(error, "Exception on dispatch: " << e.what()); - } - - if (sync) sync->completed(); -} - ManagementObject::shared_ptr Queue::GetManagementObject (void) const { return dynamic_pointer_cast<ManagementObject> (mgmtObject); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 1e56f1b6e9..4018f91367 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -24,6 +24,7 @@ #include <vector> #include <memory> #include <deque> +#include <set> #include <boost/shared_ptr.hpp> #include "qpid/framing/amqp_types.h" #include "ConnectionToken.h" @@ -48,12 +49,6 @@ namespace qpid { using std::string; - struct DispatchCompletion - { - virtual ~DispatchCompletion() {} - virtual void completed() = 0; - }; - /** * The brokers representation of an amqp queue. Messages are * delivered to a queue from where they can be dispatched to @@ -61,59 +56,40 @@ namespace qpid { * or more consumers registers. */ class Queue : public PersistableQueue, public management::Manageable { - typedef std::vector<Consumer::ptr> Consumers; + typedef std::set<Consumer*> Listeners; typedef std::deque<QueuedMessage> Messages; - - struct DispatchFunctor - { - Queue& queue; - Consumer::ptr consumer; - DispatchCompletion* sync; - - DispatchFunctor(Queue& q, DispatchCompletion* s = 0) : queue(q), sync(s) {} - DispatchFunctor(Queue& q, Consumer::ptr c, DispatchCompletion* s = 0) : queue(q), consumer(c), sync(s) {} - void operator()(); - }; const string name; const bool autodelete; MessageStore* const store; const ConnectionToken* owner; - Consumers acquirers; - Consumers browsers; + uint32_t consumerCount; + bool exclusive; + Listeners listeners; Messages messages; - int next; - mutable qpid::sys::RWlock consumerLock; + mutable qpid::sys::Mutex consumerLock; mutable qpid::sys::Mutex messageLock; mutable qpid::sys::Mutex ownershipLock; - Consumer::ptr exclusive; mutable uint64_t persistenceId; framing::FieldTable settings; std::auto_ptr<QueuePolicy> policy; QueueBindings bindings; boost::shared_ptr<Exchange> alternateExchange; - qpid::sys::Serializer<DispatchFunctor> serializer; - DispatchFunctor dispatchCallback; framing::SequenceNumber sequence; management::Queue::shared_ptr mgmtObject; void pop(); void push(intrusive_ptr<Message>& msg); - bool dispatch(QueuedMessage& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); - /** - * only called by serilizer - */ - void dispatch(); - void cancel(Consumer::ptr c, Consumers& set); - void serviceAllBrowsers(); - void serviceBrowser(Consumer::ptr c); - Consumer::ptr allocate(); - bool seek(QueuedMessage& msg, const framing::SequenceNumber& position); - uint32_t getAcquirerCount() const; - bool getNextMessage(QueuedMessage& msg); - bool exclude(intrusive_ptr<Message> msg); - + bool seek(QueuedMessage& msg, Consumer& position); + bool getNextMessage(QueuedMessage& msg, Consumer& c); + bool consumeNextMessage(QueuedMessage& msg, Consumer& c); + bool browseNextMessage(QueuedMessage& msg, Consumer& c); + bool canExcludeUnwanted(); + + void notify(); + void removeListener(Consumer&); + void addListener(Consumer&); public: virtual void notifyDurableIOComplete(); @@ -127,6 +103,8 @@ namespace qpid { Manageable* parent = 0); ~Queue(); + bool dispatch(Consumer&); + void create(const qpid::framing::FieldTable& settings); void configure(const qpid::framing::FieldTable& settings); void destroy(); @@ -156,16 +134,10 @@ namespace qpid { * Used during recovery to add stored messages back to the queue */ void recover(intrusive_ptr<Message>& msg); - /** - * Request dispatch any queued messages providing there are - * consumers for them. Only one thread can be dispatching - * at any time, so this call schedules the despatch based on - * the serilizer policy. - */ - void requestDispatch(Consumer::ptr c = Consumer::ptr()); - void flush(DispatchCompletion& callback); - void consume(Consumer::ptr c, bool exclusive = false); - void cancel(Consumer::ptr c); + + void consume(Consumer& c, bool exclusive = false); + void cancel(Consumer& c); + uint32_t purge(); uint32_t getMessageCount() const; uint32_t getConsumerCount() const; diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp index ba43b5ecba..768ea9ea08 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp @@ -93,7 +93,6 @@ void SemanticHandler::sendCompletion() { SequenceNumber mark = incoming.getMark(); SequenceNumberSet range = incoming.getRange(); - Mutex::ScopedLock l(outLock); session.getProxy().getExecution().complete(mark.getValue(), range); } @@ -128,7 +127,6 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) if (!invoker.wasHandled()) { throw NotImplementedException("Not implemented"); } else if (invoker.hasResult()) { - Mutex::ScopedLock l(outLock); session.getProxy().getExecution().result(id.getValue(), invoker.getResult()); } if (method->isSync()) { @@ -166,7 +164,6 @@ void SemanticHandler::handleContent(AMQFrame& frame) DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { - Mutex::ScopedLock l(outLock); SessionHandler* handler = session.getHandler(); if (handler) { uint32_t maxFrameSize = handler->getConnection().getFrameMax(); diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.h b/qpid/cpp/src/qpid/broker/SemanticHandler.h index 1afcdaab76..52dfa4dcf9 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.h +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.h @@ -61,7 +61,6 @@ class SemanticHandler : public DeliveryAdapter, // state? IncomingExecutionContext incoming; framing::Window outgoing; - sys::Mutex outLock; MessageBuilder msgBuilder; RangedOperation ackOp; @@ -93,6 +92,9 @@ public: void noop(); void result(uint32_t command, const std::string& data); void sync(); + + + SemanticState& getSemanticState() { return state; } }; }} diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index e790e087f0..76775d03d5 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -62,7 +62,8 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss) tagGenerator("sgen"), dtxSelected(false), accumulatedAck(0), - flowActive(true) + flowActive(true), + outputTasks(ss) { outstanding.reset(); } @@ -70,7 +71,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss) SemanticState::~SemanticState() { //cancel all consumers for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - cancel(i->second); + cancel(*i); } if (dtxBuffer.get()) { @@ -89,19 +90,19 @@ void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire)); - queue->consume(c, exclusive);//may throw exception - consumers[tagInOut] = c; + std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire)); + queue->consume(*c, exclusive);//may throw exception + outputTasks.addOutputTask(c.get()); + consumers.insert(tagInOut, c.release()); } void SemanticState::cancel(const string& tag){ ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { - cancel(i->second); + cancel(*i); consumers.erase(i); //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery - Mutex::ScopedLock locker(deliveryLock); for_each(unacked.begin(), unacked.end(), boost::bind(mem_fun_ref(&DeliveryRecord::cancel), _1, tag)); } @@ -232,7 +233,6 @@ void SemanticState::record(const DeliveryRecord& delivery) bool SemanticState::checkPrefetch(intrusive_ptr<Message>& msg) { - Mutex::ScopedLock locker(deliveryLock); bool countOk = !prefetchCount || prefetchCount > unacked.size(); bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; @@ -254,37 +254,27 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, ackExpected(ack), nolocal(_nolocal), acquire(_acquire), - blocked(false), + blocked(true), windowing(true), msgCredit(0), byteCredit(0) {} bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { - if (!parent->getSession().isAttached()) { - return false; - } - - if (nolocal && - &parent->getSession().getConnection() == msg.payload->getPublisher()) { - return false; - } else { - if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) { - blocked = true; - } else { - blocked = false; - Mutex::ScopedLock locker(parent->deliveryLock); - - DeliveryId deliveryTag = - parent->deliveryAdapter.deliver(msg, token); - if (windowing || ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); - } - if (acquire && !ackExpected) { - queue->dequeue(0, msg.payload); - } + if (parent->getSession().isAttached() && accept(msg.payload)) { + allocateCredit(msg.payload); + DeliveryId deliveryTag = + parent->deliveryAdapter.deliver(msg, token); + if (windowing || ackExpected) { + parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); + } + if (acquire && !ackExpected) { + queue->dequeue(0, msg.payload); } - return !blocked; + return true; + } else { + QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent); + return false; } } @@ -294,35 +284,48 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg) &parent->getSession().getConnection() == msg->getPublisher()); } +bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) +{ + //TODO: remove the now redundant checks (channel.flow & basic|message.qos removed): + blocked = !(filter(msg) && checkCredit(msg) && parent->flowActive && (!ackExpected || parent->checkPrefetch(msg))); + return !blocked; +} + +void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) +{ + uint32_t originalMsgCredit = msgCredit; + uint32_t originalByteCredit = byteCredit; + if (msgCredit != 0xFFFFFFFF) { + msgCredit--; + } + if (byteCredit != 0xFFFFFFFF) { + byteCredit -= msg->getRequiredCredit(); + } + QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent + << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit + << " now bytes: " << byteCredit << " msgs: " << msgCredit); + +} + bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) { - Mutex::ScopedLock l(lock); if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent << ", bytes: " << byteCredit << " msgs: " << msgCredit); return false; } else { - uint32_t originalMsgCredit = msgCredit; - uint32_t originalByteCredit = byteCredit; - - if (msgCredit != 0xFFFFFFFF) { - msgCredit--; - } - if (byteCredit != 0xFFFFFFFF) { - byteCredit -= msg->getRequiredCredit(); - } QPID_LOG(debug, "Credit available for '" << name << "' on " << parent - << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit - << " now bytes: " << byteCredit << " msgs: " << msgCredit); + << " bytes: " << byteCredit << " msgs: " << msgCredit); return true; } } SemanticState::ConsumerImpl::~ConsumerImpl() {} -void SemanticState::cancel(ConsumerImpl::shared_ptr c) +void SemanticState::cancel(ConsumerImpl& c) { - Queue::shared_ptr queue = c->getQueue(); + outputTasks.removeOutputTask(&c); + Queue::shared_ptr queue = c.getQueue(); if(queue) { queue->cancel(c); if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { @@ -374,8 +377,6 @@ void SemanticState::ackRange(DeliveryId first, DeliveryId last) void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) { { - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - ack_iterator start = cumulative ? unacked.begin() : find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); ack_iterator end = start; @@ -417,14 +418,14 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) void SemanticState::requestDispatch() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - requestDispatch(i->second); + requestDispatch(*i); } } -void SemanticState::requestDispatch(ConsumerImpl::shared_ptr c) +void SemanticState::requestDispatch(ConsumerImpl& c) { - if(c->isBlocked()) { - c->getQueue()->requestDispatch(c); + if(c.isBlocked()) { + c.doOutput(); } } @@ -433,14 +434,13 @@ void SemanticState::acknowledged(const DeliveryRecord& delivery) delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - i->second->acknowledged(delivery); + i->acknowledged(delivery); } } void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) { if (windowing) { - Mutex::ScopedLock l(lock); if (msgCredit != 0xFFFFFFFF) msgCredit++; if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit); } @@ -448,8 +448,6 @@ void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) void SemanticState::recover(bool requeue) { - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - if(requeue){ outstanding.reset(); //take copy and clear unacked as requeue may result in redelivery to this session @@ -470,7 +468,6 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue { QueuedMessage msg = queue->dequeue(); if(msg.payload){ - Mutex::ScopedLock locker(deliveryLock); DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); @@ -483,13 +480,11 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { - Mutex::ScopedLock locker(deliveryLock); return deliveryAdapter.deliver(msg, token); } void SemanticState::flow(bool active) { - Mutex::ScopedLock locker(deliveryLock); bool requestDelivery(!flowActive && active); flowActive = active; if (requestDelivery) { @@ -499,50 +494,50 @@ void SemanticState::flow(bool active) } -SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) +SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) { ConsumerImplMap::iterator i = consumers.find(destination); if (i == consumers.end()) { throw NotFoundException(QPID_MSG("Unknown destination " << destination)); } else { - return i->second; + return *i; } } void SemanticState::setWindowMode(const std::string& destination) { - find(destination)->setWindowMode(); + find(destination).setWindowMode(); } void SemanticState::setCreditMode(const std::string& destination) { - find(destination)->setCreditMode(); + find(destination).setCreditMode(); } void SemanticState::addByteCredit(const std::string& destination, uint32_t value) { - ConsumerImpl::shared_ptr c = find(destination); - c->addByteCredit(value); + ConsumerImpl& c = find(destination); + c.addByteCredit(value); requestDispatch(c); } void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) { - ConsumerImpl::shared_ptr c = find(destination); - c->addMessageCredit(value); + ConsumerImpl& c = find(destination); + c.addMessageCredit(value); requestDispatch(c); } void SemanticState::flush(const std::string& destination) { - find(destination)->flush(); + find(destination).flush(); } void SemanticState::stop(const std::string& destination) { - find(destination)->stop(); + find(destination).stop(); } void SemanticState::ConsumerImpl::setWindowMode() @@ -557,7 +552,6 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { - Mutex::ScopedLock l(lock); if (byteCredit != 0xFFFFFFFF) { byteCredit += value; } @@ -565,7 +559,6 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { - Mutex::ScopedLock l(lock); if (msgCredit != 0xFFFFFFFF) { msgCredit += value; } @@ -573,16 +566,12 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) void SemanticState::ConsumerImpl::flush() { - //need to prevent delivery after requestDispatch returns but - //before credit is reduced to zero - FlushCompletion completion(*this); - queue->flush(completion); - completion.wait(); + while(queue->dispatch(*this)); + stop(); } void SemanticState::ConsumerImpl::stop() { - Mutex::ScopedLock l(lock); msgCredit = 0; byteCredit = 0; } @@ -618,14 +607,12 @@ AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired) { - Mutex::ScopedLock locker(deliveryLock); AckRange range = findRange(first, last); for_each(range.start, range.end, AcquireFunctor(acquired)); } void SemanticState::release(DeliveryId first, DeliveryId last) { - Mutex::ScopedLock locker(deliveryLock); AckRange range = findRange(first, last); //release results in the message being added to the head so want //to release in reverse order to keep the original transfer order @@ -636,26 +623,22 @@ void SemanticState::release(DeliveryId first, DeliveryId last) void SemanticState::reject(DeliveryId first, DeliveryId last) { - Mutex::ScopedLock locker(deliveryLock); AckRange range = findRange(first, last); for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject)); //need to remove the delivery records as well unacked.erase(range.start, range.end); } - -void SemanticState::FlushCompletion::wait() +bool SemanticState::ConsumerImpl::doOutput() { - Monitor::ScopedLock locker(lock); - while (!complete) lock.wait(); + //TODO: think through properly + return queue->dispatch(*this); } -void SemanticState::FlushCompletion::completed() +void SemanticState::ConsumerImpl::notify() { - Monitor::ScopedLock locker(lock); - consumer.stop(); - complete = true; - lock.notifyAll(); + //TODO: think through properly + parent->outputTasks.activateOutput(); } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 8e039d554b..7fc6e4167c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -35,6 +35,7 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/AccumulatedAck.h" #include "qpid/framing/Uuid.h" +#include "qpid/sys/AggregateOutput.h" #include "qpid/shared_ptr.h" #include <list> @@ -51,11 +52,11 @@ class SessionState; * attached to a channel or suspended. */ class SemanticState : public framing::FrameHandler::Chains, + public sys::OutputTask, private boost::noncopyable { - class ConsumerImpl : public Consumer + class ConsumerImpl : public Consumer, public sys::OutputTask { - sys::Mutex lock; SemanticState* const parent; const DeliveryToken::shared_ptr token; const string name; @@ -69,16 +70,17 @@ class SemanticState : public framing::FrameHandler::Chains, uint32_t byteCredit; bool checkCredit(intrusive_ptr<Message>& msg); + void allocateCredit(intrusive_ptr<Message>& msg); public: - typedef shared_ptr<ConsumerImpl> shared_ptr; - ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, const string& name, Queue::shared_ptr queue, bool ack, bool nolocal, bool acquire); ~ConsumerImpl(); bool deliver(QueuedMessage& msg); bool filter(intrusive_ptr<Message> msg); + bool accept(intrusive_ptr<Message> msg); + void notify(); void setWindowMode(); void setCreditMode(); @@ -89,20 +91,11 @@ class SemanticState : public framing::FrameHandler::Chains, void acknowledged(const DeliveryRecord&); Queue::shared_ptr getQueue() { return queue; } bool isBlocked() const { return blocked; } - }; - struct FlushCompletion : DispatchCompletion - { - sys::Monitor lock; - ConsumerImpl& consumer; - bool complete; - - FlushCompletion(ConsumerImpl& c) : consumer(c), complete(false) {} - void wait(); - void completed(); + bool doOutput(); }; - typedef std::map<std::string,ConsumerImpl::shared_ptr> ConsumerImplMap; + typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; SessionState& session; @@ -114,27 +107,26 @@ class SemanticState : public framing::FrameHandler::Chains, Prefetch outstanding; NameGenerator tagGenerator; std::list<DeliveryRecord> unacked; - sys::Mutex deliveryLock; TxBuffer::shared_ptr txBuffer; DtxBuffer::shared_ptr dtxBuffer; bool dtxSelected; DtxBufferMap suspendedXids; framing::AccumulatedAck accumulatedAck; bool flowActive; - boost::shared_ptr<Exchange> cacheExchange; + sys::AggregateOutput outputTasks; void route(intrusive_ptr<Message> msg, Deliverable& strategy); void record(const DeliveryRecord& delivery); bool checkPrefetch(intrusive_ptr<Message>& msg); void checkDtxTimeout(); - ConsumerImpl::shared_ptr find(const std::string& destination); + ConsumerImpl& find(const std::string& destination); void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); void acknowledged(const DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); - void requestDispatch(ConsumerImpl::shared_ptr); - void cancel(ConsumerImpl::shared_ptr); + void requestDispatch(ConsumerImpl&); + void cancel(ConsumerImpl&); public: SemanticState(DeliveryAdapter&, SessionState&); @@ -188,6 +180,8 @@ class SemanticState : public framing::FrameHandler::Chains, void release(DeliveryId first, DeliveryId last); void reject(DeliveryId first, DeliveryId last); void handle(intrusive_ptr<Message> msg); + + bool doOutput() { return outputTasks.doOutput(); } }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index a142af2e1a..bbdbccad7d 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -70,6 +70,7 @@ void SessionHandler::handleIn(AMQFrame& f) { QPID_MSG("Channel " << channel.get() << " is not open")); } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. + session->detach(); session.reset(); peerSession.closed(e.code, e.what()); }catch(const ConnectionException& e){ @@ -81,14 +82,9 @@ void SessionHandler::handleIn(AMQFrame& f) { } void SessionHandler::handleOut(AMQFrame& f) { - ConditionalScopedLock<Semaphore> s(suspension); - if (s.lockAcquired() && session.get() && session->isAttached()) { - channel.handle(f); // Send it. - if (session->sent(f)) - peerSession.solicitAck(); - } else { - QPID_LOG(error, "Dropping frame as session is no longer attached to a channel: " << f); - } + channel.handle(f); // Send it. + if (session->sent(f)) + peerSession.solicitAck(); } void SessionHandler::assertAttached(const char* method) const { @@ -138,6 +134,7 @@ void SessionHandler::close() { assertAttached("close"); QPID_LOG(info, "Received session.close"); ignoring=false; + session->detach(); session.reset(); peerSession.closed(REPLY_SUCCESS, "ok"); assert(&connection.getChannel(channel.get()) == this); @@ -147,14 +144,15 @@ void SessionHandler::close() { void SessionHandler::closed(uint16_t replyCode, const string& replyText) { QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText); ignoring=false; + session->detach(); session.reset(); } void SessionHandler::localSuspend() { - ScopedLock<Semaphore> s(suspension); if (session.get() && session->isAttached()) { session->detach(); connection.broker.getSessionManager().suspend(session); + session.reset(); } } diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index 08584ecd47..9a68ddb46f 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -27,8 +27,6 @@ #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/ChannelHandler.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Semaphore.h" #include <boost/noncopyable.hpp> @@ -95,7 +93,6 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, framing::AMQP_ClientProxy::Session peerSession; bool ignoring; std::auto_ptr<SessionState> session; - sys::Semaphore suspension; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 27658f2c84..bea1eaedcf 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -30,6 +30,7 @@ namespace qpid { namespace broker { using namespace framing; +using sys::Mutex; void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); } @@ -48,7 +49,8 @@ SessionState::SessionState( { // TODO aconway 2007-09-20: SessionManager may add plugin // handlers to the chain. - } + getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); +} SessionState::~SessionState() { // Remove ID from active session list. @@ -70,11 +72,28 @@ Connection& SessionState::getConnection() { } void SessionState::detach() { + getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); + Mutex::ScopedLock l(lock); handler = 0; } void SessionState::attach(SessionHandler& h) { - handler = &h; + { + Mutex::ScopedLock l(lock); + handler = &h; + } + h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); +} + +void SessionState::activateOutput() +{ + Mutex::ScopedLock l(lock); + if (isAttached()) { + getConnection().outputTasks.activateOutput(); + } } + //This class could be used as the callback for queue notifications + //if not attached, it can simply ignore the callback, else pass it + //on to the connection }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index d710079cd4..ac2a33442a 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -26,6 +26,8 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SessionState.h" #include "qpid/framing/ProtocolVersion.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/OutputControl.h" #include "qpid/sys/Time.h" #include <boost/noncopyable.hpp> @@ -54,7 +56,8 @@ class Connection; * themselves have state. */ class SessionState : public framing::SessionState, - public framing::FrameHandler::InOutHandler + public framing::FrameHandler::InOutHandler, + public sys::OutputControl { public: ~SessionState(); @@ -76,6 +79,9 @@ class SessionState : public framing::SessionState, Broker& getBroker() { return broker; } framing::ProtocolVersion getVersion() const { return version; } + /** OutputControl **/ + void activateOutput(); + protected: void handleIn(framing::AMQFrame&); void handleOut(framing::AMQFrame&); @@ -94,7 +100,7 @@ class SessionState : public framing::SessionState, sys::AbsTime expiry; // Used by SessionManager. Broker& broker; framing::ProtocolVersion version; - + sys::Mutex lock; boost::scoped_ptr<SemanticHandler> semanticHandler; friend class SessionManager; diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index 497288bc3f..f32d0470ec 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -106,7 +106,7 @@ OutputHandler* Connector::getOutputHandler(){ void Connector::send(AMQFrame& frame){ Mutex::ScopedLock l(writeLock); writeFrameQueue.push(frame); - aio->queueWrite(); + aio->notifyPendingWrite(); QPID_LOG(trace, "SENT [" << this << "]: " << frame); } diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp new file mode 100644 index 0000000000..74eea5ed08 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/AggregateOutput.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace sys { + +void AggregateOutput::activateOutput() +{ + control.activateOutput(); +} + +bool AggregateOutput::doOutput() +{ + bool result = false; + if (!tasks.empty()) { + if (next >= tasks.size()) next = next % tasks.size(); + + size_t start = next; + //loop until a task generated some output + while (!result) { + result = tasks[next++]->doOutput(); + if (next >= tasks.size()) next = next % tasks.size(); + if (start == next) break; + } + } + return result; +} + +void AggregateOutput::addOutputTask(OutputTask* t) +{ + tasks.push_back(t); +} + +void AggregateOutput::removeOutputTask(OutputTask* t) +{ + TaskList::iterator i = find(tasks.begin(), tasks.end(), t); + if (i != tasks.end()) tasks.erase(i); +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.h b/qpid/cpp/src/qpid/sys/AggregateOutput.h new file mode 100644 index 0000000000..a870fcb95a --- /dev/null +++ b/qpid/cpp/src/qpid/sys/AggregateOutput.h @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _AggregateOutput_ +#define _AggregateOutput_ + +#include <vector> +#include "Mutex.h" +#include "OutputControl.h" +#include "OutputTask.h" + +namespace qpid { +namespace sys { + + class AggregateOutput : public OutputTask, public OutputControl + { + typedef std::vector<OutputTask*> TaskList; + + TaskList tasks; + size_t next; + OutputControl& control; + + public: + AggregateOutput(OutputControl& c) : next(0), control(c) {}; + //this may be called on any thread + void activateOutput(); + //all the following will be called on the same thread + bool doOutput(); + void addOutputTask(OutputTask* t); + void removeOutputTask(OutputTask* t); + }; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h index 7cb56b30aa..ca34d82741 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIO.h +++ b/qpid/cpp/src/qpid/sys/AsynchIO.h @@ -97,6 +97,13 @@ private: std::deque<BufferBase*> bufferQueue; std::deque<BufferBase*> writeQueue; bool queuedClose; + /** + * This flag is used to detect and handle concurrency between + * calls to notifyPendingWrite() (which can be made from any thread) and + * the execution of the writeable() method (which is always on the + * thread processing this handle. + */ + volatile bool writePending; public: AsynchIO(const Socket& s, @@ -107,7 +114,8 @@ public: void start(Poller::shared_ptr poller); void queueReadBuffer(BufferBase* buff); void unread(BufferBase* buff); - void queueWrite(BufferBase* buff = 0); + void queueWrite(BufferBase* buff); + void notifyPendingWrite(); void queueWriteClose(); bool writeQueueEmpty() { return writeQueue.empty(); } BufferBase* getQueuedBuffer(); diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index bdf3e3b8d3..51ec7f718a 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -115,6 +115,7 @@ public: // Output side void send(framing::AMQFrame&); void close(); + void activateOutput(); // Input side void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); @@ -135,7 +136,7 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); async->init(aio, handler); @@ -195,7 +196,7 @@ void AsynchIOHandler::send(framing::AMQFrame& frame) { } // Activate aio for writing here - aio->queueWrite(); + aio->notifyPendingWrite(); } void AsynchIOHandler::close() { @@ -203,6 +204,10 @@ void AsynchIOHandler::close() { frameQueueClosed = true; } +void AsynchIOHandler::activateOutput() { + aio->notifyPendingWrite(); +} + // Input side void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if (readError) { @@ -272,9 +277,11 @@ void AsynchIOHandler::idle(AsynchIO&){ ScopedLock<Mutex> l(frameQueueLock); if (frameQueue.empty()) { - // At this point we know that we're write idling the connection - // so we could note that somewhere or do something special - return; + // At this point we know that we're write idling the connection + // so tell the input handler to queue any available output: + inputHandler->doOutput(); + //if still no frames, theres nothing to do: + if (frameQueue.empty()) return; } do { diff --git a/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h b/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h index 2bf3f66ec2..226096c5ef 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h +++ b/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h @@ -24,6 +24,7 @@ #include "qpid/framing/InputHandler.h" #include "qpid/framing/InitiationHandler.h" #include "qpid/framing/ProtocolInitiation.h" +#include "OutputTask.h" #include "TimeoutHandler.h" namespace qpid { @@ -32,7 +33,7 @@ namespace sys { class ConnectionInputHandler : public qpid::framing::InitiationHandler, public qpid::framing::InputHandler, - public TimeoutHandler + public TimeoutHandler, public OutputTask { public: virtual void closed() = 0; diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h index 8436bea599..5a60ae4998 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h +++ b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h @@ -22,6 +22,7 @@ #define _ConnectionOutputHandler_ #include "qpid/framing/OutputHandler.h" +#include "OutputControl.h" namespace qpid { namespace sys { @@ -29,7 +30,7 @@ namespace sys { /** * Provides the output handler associated with a connection. */ -class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler +class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl { public: virtual void close() = 0; diff --git a/qpid/cpp/src/qpid/sys/OutputControl.h b/qpid/cpp/src/qpid/sys/OutputControl.h new file mode 100644 index 0000000000..d922a0d85c --- /dev/null +++ b/qpid/cpp/src/qpid/sys/OutputControl.h @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _OutputControl_ +#define _OutputControl_ + +namespace qpid { +namespace sys { + + class OutputControl + { + public: + virtual ~OutputControl() {} + virtual void activateOutput() = 0; + }; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/sys/OutputTask.h b/qpid/cpp/src/qpid/sys/OutputTask.h new file mode 100644 index 0000000000..109765b8c3 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/OutputTask.h @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _OutputTask_ +#define _OutputTask_ + +namespace qpid { +namespace sys { + + class OutputTask + { + public: + virtual ~OutputTask() {} + virtual bool doOutput() = 0; + }; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index 4600960c6d..e73bbc03ca 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -97,7 +97,8 @@ AsynchIO::AsynchIO(const Socket& s, closedCallback(cCb), emptyCallback(eCb), idleCallback(iCb), - queuedClose(false) { + queuedClose(false), + writePending(false) { s.setNonblocking(); } @@ -139,20 +140,21 @@ void AsynchIO::unread(BufferBase* buff) { DispatchHandle::rewatchRead(); } -// Either queue for writing or announce that there is something to write -// and we should ask for it void AsynchIO::queueWrite(BufferBase* buff) { - // If no buffer then don't queue anything - // (but still wake up for writing) - if (buff) { - // If we've already closed the socket then throw the write away - if (queuedClose) { - bufferQueue.push_front(buff); - return; - } else { - writeQueue.push_front(buff); - } - } + assert(buff); + // If we've already closed the socket then throw the write away + if (queuedClose) { + bufferQueue.push_front(buff); + return; + } else { + writeQueue.push_front(buff); + } + writePending = false; + DispatchHandle::rewatchWrite(); +} + +void AsynchIO::notifyPendingWrite() { + writePending = true; DispatchHandle::rewatchWrite(); } @@ -269,18 +271,24 @@ void AsynchIO::writeable(DispatchHandle& h) { } } } else { - // If we're waiting to close the socket then can do it now as there is nothing to write - if (queuedClose) { - close(h); - return; - } + // If we're waiting to close the socket then can do it now as there is nothing to write + if (queuedClose) { + close(h); + return; + } // Fd is writable, but nothing to write if (idleCallback) { + writePending = false; idleCallback(*this); } // If we still have no buffers to write we can't do anything more - if (writeQueue.empty() && !queuedClose) { + if (writeQueue.empty() && !writePending && !queuedClose) { h.unwatchWrite(); + //the following handles the case where writePending is + //set to true after the test above; in this case its + //possible that the unwatchWrite overwrites the + //desired rewatchWrite so we correct that here + if (writePending) h.rewatchWrite(); return; } } @@ -304,7 +312,7 @@ void AsynchIO::close(DispatchHandle& h) { h.stopWatch(); h.getSocket().close(); if (closedCallback) { - closedCallback(*this, getSocket()); + closedCallback(*this, getSocket()); } } diff --git a/qpid/cpp/src/tests/InProcessBroker.h b/qpid/cpp/src/tests/InProcessBroker.h index c893e6906a..9fa0135502 100644 --- a/qpid/cpp/src/tests/InProcessBroker.h +++ b/qpid/cpp/src/tests/InProcessBroker.h @@ -36,6 +36,7 @@ namespace qpid { +using qpid::sys::ConnectionInputHandler; /** * A client::Connector that connects directly to an in-process broker. @@ -54,13 +55,21 @@ class InProcessConnector : enum Sender {CLIENT,BROKER}; + struct Task { + AMQFrame frame; + bool doOutput; + + Task() : doOutput(true) {} + Task(AMQFrame& f) : frame(f), doOutput(false) {} + }; + /** Simulate the network thread of a peer with a queue and a thread. * With setInputHandler(0) drops frames simulating network packet loss. */ class NetworkQueue : public sys::Runnable { public: - NetworkQueue(const char* r) : inputHandler(0), receiver(r) { + NetworkQueue(const char* r) : inputHandler(0), connectionHandler(0), receiver(r) { thread=sys::Thread(this); } @@ -70,17 +79,24 @@ class InProcessConnector : } void push(AMQFrame& f) { queue.push(f); } + void activateOutput() { queue.push(Task()); } void run() { try { while(true) { - AMQFrame f = queue.pop(); - if (inputHandler) { - QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f)); - inputHandler->handle(f); + Task t = queue.pop(); + if (t.doOutput) { + if (connectionHandler) { + while (connectionHandler->doOutput()); + } + } else { + if (inputHandler) { + QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << t.frame)); + inputHandler->handle(t.frame); + } + else + QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << t.frame)); } - else - QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f)); } } catch (const ClosedException&) { @@ -88,16 +104,24 @@ class InProcessConnector : } } + void setConnectionInputHandler(ConnectionInputHandler* h) { + Lock l(lock); + inputHandler = h; + connectionHandler = h; + } + void setInputHandler(FrameHandler* h) { Lock l(lock); inputHandler = h; + connectionHandler = 0; } private: sys::Mutex lock; - sys::BlockingQueue<AMQFrame> queue; + sys::BlockingQueue<Task> queue; sys::Thread thread; FrameHandler* inputHandler; + ConnectionInputHandler* connectionHandler; const char* const receiver; }; @@ -105,11 +129,13 @@ class InProcessConnector : Sender from; NetworkQueue queue; const char* const sender; + NetworkQueue* reverseQueue; InProcessHandler(Sender s) : from(s), queue(from==CLIENT? "BROKER" : "CLIENT"), - sender(from==BROKER? "BROKER" : "CLIENT") + sender(from==BROKER? "BROKER" : "CLIENT"), + reverseQueue(0) {} ~InProcessHandler() { } @@ -123,6 +149,10 @@ class InProcessConnector : // Do not shut down the queue here, we may be in // the queue's dispatch thread. } + + void activateOutput() { + if (reverseQueue) reverseQueue->activateOutput(); + } }; InProcessConnector(shared_ptr<broker::Broker> b, @@ -135,7 +165,9 @@ class InProcessConnector : clientOut(CLIENT), isClosed(false) { - clientOut.queue.setInputHandler(&brokerConnection); + clientOut.queue.setConnectionInputHandler(&brokerConnection); + brokerOut.reverseQueue = &clientOut.queue; + clientOut.reverseQueue = &brokerOut.queue; } ~InProcessConnector() { @@ -169,7 +201,7 @@ class InProcessConnector : /** Sliently discard frames sent by either party, lost network traffic. */ void discard() { brokerOut.queue.setInputHandler(0); - clientOut.queue.setInputHandler(0); + clientOut.queue.setConnectionInputHandler(0); } private: diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 4714a998f6..7e757cfad0 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -47,6 +47,7 @@ public: received = true; return true; }; + void notify() {} }; class FailOnDeliver : public Deliverable @@ -88,7 +89,7 @@ class QueueTest : public CppUnit::TestCase Queue::shared_ptr queue(new Queue("my_test_queue", true)); intrusive_ptr<Message> received; - TestConsumer::shared_ptr c1(new TestConsumer()); + TestConsumer c1; queue->consume(c1); @@ -98,7 +99,7 @@ class QueueTest : public CppUnit::TestCase queue->process(msg1); sleep(2); - CPPUNIT_ASSERT(!c1->received); + CPPUNIT_ASSERT(!c1.received); msg1->enqueueComplete(); received = queue->dequeue().payload; @@ -127,8 +128,8 @@ class QueueTest : public CppUnit::TestCase Queue::shared_ptr queue(new Queue("my_queue", true)); //Test adding consumers: - TestConsumer::shared_ptr c1(new TestConsumer()); - TestConsumer::shared_ptr c2(new TestConsumer()); + TestConsumer c1; + TestConsumer c2; queue->consume(c1); queue->consume(c2); @@ -140,20 +141,17 @@ class QueueTest : public CppUnit::TestCase intrusive_ptr<Message> msg3 = message("e", "C"); queue->deliver(msg1); - if (!c1->received) - sleep(2); - CPPUNIT_ASSERT_EQUAL(msg1.get(), c1->last.get()); + CPPUNIT_ASSERT(queue->dispatch(c1)); + CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); queue->deliver(msg2); - if (!c2->received) - sleep(2); - CPPUNIT_ASSERT_EQUAL(msg2.get(), c2->last.get()); + CPPUNIT_ASSERT(queue->dispatch(c2)); + CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get()); - c1->received = false; + c1.received = false; queue->deliver(msg3); - if (!c1->received) - sleep(2); - CPPUNIT_ASSERT_EQUAL(msg3.get(), c1->last.get()); + CPPUNIT_ASSERT(queue->dispatch(c1)); + CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get()); //Test cancellation: queue->cancel(c1); @@ -203,13 +201,13 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get()); CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount()); - TestConsumer::shared_ptr consumer(new TestConsumer()); + TestConsumer consumer; queue->consume(consumer); - queue->requestDispatch(); - if (!consumer->received) + queue->dispatch(consumer); + if (!consumer.received) sleep(2); - CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer->last.get()); + CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get()); CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount()); received = queue->dequeue().payload; |