summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/Makefile.am3
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerAdapter.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h5
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h2
-rw-r--r--qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.h56
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp291
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h70
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticHandler.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticHandler.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp161
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h34
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp16
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp23
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h10
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.cpp61
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.h54
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIO.h10
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp17
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionInputHandler.h3
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h3
-rw-r--r--qpid/cpp/src/qpid/sys/OutputControl.h38
-rw-r--r--qpid/cpp/src/qpid/sys/OutputTask.h38
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp50
-rw-r--r--qpid/cpp/src/tests/InProcessBroker.h54
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp34
30 files changed, 631 insertions, 435 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 08d2e7b1d9..6f36b6cb0c 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -132,6 +132,7 @@ libqpidcommon_la_SOURCES = \
qpid/Exception.cpp \
qpid/Plugin.cpp \
qpid/Url.cpp \
+ qpid/sys/AggregateOutput.cpp \
qpid/sys/AsynchIOAcceptor.cpp \
qpid/sys/Dispatcher.cpp \
qpid/sys/Runnable.cpp \
@@ -408,6 +409,8 @@ nobase_include_HEADERS = \
qpid/sys/Module.h \
qpid/sys/Monitor.h \
qpid/sys/Mutex.h \
+ qpid/sys/OutputControl.h \
+ qpid/sys/OutputTask.h \
qpid/sys/Poller.h \
qpid/sys/Runnable.h \
qpid/sys/RefCountedMap.h \
diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
index 82378f938b..526b58cb14 100644
--- a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -302,9 +302,6 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
if(!nowait)
getProxy().getBasic().consumeOk(newTag);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->requestDispatch();
}
void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 26146e80d4..6a13c05242 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -44,6 +44,7 @@ namespace broker {
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
broker(broker_),
+ outputTasks(*out_),
out(out_),
framemax(65536),
heartbeat(0),
@@ -96,6 +97,11 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
}
}
+bool Connection::doOutput()
+{
+ return outputTasks.doOutput();
+}
+
void Connection::closeChannel(uint16_t id) {
ChannelMap::iterator i = channels.find(id);
if (i != channels.end()) channels.erase(i);
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 776634e04e..395aa7b0bd 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -29,6 +29,7 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -70,6 +71,9 @@ class Connection : public sys::ConnectionInputHandler,
Broker& broker;
std::vector<Queue::shared_ptr> exclusiveQueues;
+
+ //contained output tasks
+ sys::AggregateOutput outputTasks;
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
@@ -77,6 +81,7 @@ class Connection : public sys::ConnectionInputHandler,
void idleOut();
void idleIn();
void closed();
+ bool doOutput();
void closeChannel(framing::ChannelId channel);
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index 5e09a00113..ed4bb176f6 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -53,7 +53,9 @@ namespace qpid {
Consumer(bool preAcquires = true) : acquires(preAcquires) {}
bool preAcquires() const { return acquires; }
virtual bool deliver(QueuedMessage& msg) = 0;
+ virtual void notify() = 0;
virtual bool filter(intrusive_ptr<Message>) { return true; }
+ virtual bool accept(intrusive_ptr<Message>) { return true; }
virtual ~Consumer(){}
};
}
diff --git a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 834ce0a203..69cccf0ff0 100644
--- a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -139,8 +139,6 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
string tag = destination;
state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
- // Dispatch messages as there is now a consumer.
- queue->requestDispatch();
}
void
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
index c68cfcb52f..a00a623988 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -28,11 +28,11 @@ using namespace qpid::broker;
void PersistableMessage::flush()
{
- sys::ScopedLock<sys::Mutex> l(storeLock);
- if (store) {
- for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
- store->flush(*(*i));
- }
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ if (store) {
+ for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
+ store->flush(*(*i));
+ }
}
}
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h
index 1a4ac6b891..299e22e2ba 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.h
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h
@@ -43,7 +43,7 @@ class PersistableMessage : public Persistable
{
sys::Monitor asyncEnqueueLock;
sys::Monitor asyncDequeueLock;
- sys::Mutex storeLock;
+ sys::Mutex storeLock;
/**
* Tracks the number of outstanding asynchronous enqueue
@@ -84,12 +84,12 @@ public:
asyncEnqueueCounter(0),
asyncDequeueCounter(0),
store(0),
- contentReleased(false)
- {}
+ contentReleased(false)
+ {}
void flush();
- inline bool isContentReleased()const {return contentReleased; }
+ inline bool isContentReleased()const {return contentReleased; }
inline void waitForEnqueueComplete() {
sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
@@ -104,27 +104,35 @@ public:
}
inline void enqueueComplete() {
- sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
- if (asyncEnqueueCounter > 0) {
- if (--asyncEnqueueCounter == 0) {
- asyncEnqueueLock.notify();
- if (store) {
- for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
- (*i)->notifyDurableIOComplete();
- }
+ bool notify = false;
+ {
+ sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ if (asyncEnqueueCounter > 0) {
+ if (--asyncEnqueueCounter == 0) {
+ asyncEnqueueLock.notify();
+ notify = true;
}
}
}
+ if (notify) {
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ if (store) {
+ for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
+ (*i)->notifyDurableIOComplete();
+ }
+ synclist.clear();
+ }
+ }
}
inline void enqueueAsync(PersistableQueue* queue, MessageStore* _store) {
- if (_store){
- sys::ScopedLock<sys::Mutex> l(storeLock);
- store = _store;
- synclist.push_back(queue);
- }
- enqueueAsync();
- }
+ if (_store){
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ store = _store;
+ synclist.push_back(queue);
+ }
+ enqueueAsync();
+ }
inline void enqueueAsync() {
sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
@@ -154,13 +162,13 @@ public:
}
inline void dequeueAsync(PersistableQueue* queue, MessageStore* _store) {
- if (_store){
+ if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
- synclist.push_back(queue);
- }
- dequeueAsync();
- }
+ synclist.push_back(queue);
+ }
+ dequeueAsync();
+ }
inline void dequeueAsync() {
sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index c43ab8c231..4dba60cd0d 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -31,7 +31,8 @@
#include <iostream>
#include <boost/bind.hpp>
#include "QueueRegistry.h"
-
+#include <algorithm>
+#include <functional>
using namespace qpid::broker;
using namespace qpid::sys;
@@ -40,6 +41,8 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
+using std::for_each;
+using std::mem_fun;
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
@@ -50,10 +53,9 @@ Queue::Queue(const string& _name, bool _autodelete,
autodelete(_autodelete),
store(_store),
owner(_owner),
- next(0),
- persistenceId(0),
- serializer(false),
- dispatchCallback(*this)
+ consumerCount(0),
+ exclusive(false),
+ persistenceId(0)
{
if (parent != 0)
{
@@ -73,9 +75,8 @@ Queue::~Queue()
void Queue::notifyDurableIOComplete()
{
- // signal SemanticHander to ack completed dequeues
- // then dispatch to ack...
- serializer.execute(dispatchCallback);
+ Mutex::ScopedLock locker(messageLock);
+ notify();
}
@@ -110,7 +111,6 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
- serializer.execute(dispatchCallback);
}
}
@@ -148,17 +148,13 @@ void Queue::process(intrusive_ptr<Message>& msg){
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
}
}
- serializer.execute(dispatchCallback);
-
}
void Queue::requeue(const QueuedMessage& msg){
- {
- Mutex::ScopedLock locker(messageLock);
- msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.push_front(msg);
- }
- serializer.execute(dispatchCallback);
+ Mutex::ScopedLock locker(messageLock);
+ msg.payload->enqueueComplete(); // mark the message as enqueued
+ messages.push_front(msg);
+ notify();
}
bool Queue::acquire(const QueuedMessage& msg) {
@@ -172,186 +168,170 @@ bool Queue::acquire(const QueuedMessage& msg) {
return false;
}
-void Queue::requestDispatch(Consumer::ptr c){
- if (!c || c->preAcquires()) {
- serializer.execute(dispatchCallback);
- } else {
- DispatchFunctor f(*this, c);
- serializer.execute(f);
- }
-}
-
-void Queue::flush(DispatchCompletion& completion)
-{
- DispatchFunctor f(*this, &completion);
- serializer.execute(f);
-}
-
/**
* Return true if the message can be excluded. This is currently the
- * case if the queue has an exclusive consumer that will never want
- * the message, or if the queue is exclusive to a single connection
- * and has a single consumer (covers the JMS topic case).
+ * case if the queue is exclusive and has an exclusive consumer that
+ * doesn't want the message or has a single consumer that doesn't want
+ * the message (covers the JMS topic case).
*/
-bool Queue::exclude(intrusive_ptr<Message> msg)
+bool Queue::canExcludeUnwanted()
+{
+ Mutex::ScopedLock locker(consumerLock);
+ return hasExclusiveOwner() && (exclusive || consumerCount == 1);
+}
+
+
+bool Queue::getNextMessage(QueuedMessage& m, Consumer& c)
{
- RWlock::ScopedWlock locker(consumerLock);
- if (exclusive) {
- return !exclusive->filter(msg);
- } else if (hasExclusiveOwner() && acquirers.size() == 1) {
- return !acquirers[0]->filter(msg);
+ if (c.preAcquires()) {
+ return consumeNextMessage(m, c);
} else {
- return false;
+ return browseNextMessage(m, c);
}
}
-Consumer::ptr Queue::allocate()
+bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
{
- RWlock::ScopedWlock locker(consumerLock);
-
- if (acquirers.empty()) {
- return Consumer::ptr();
- } else if (exclusive){
- return exclusive;
- } else {
- next = next % acquirers.size();
- return acquirers[next++];
+ while (true) {
+ Mutex::ScopedLock locker(messageLock);
+ if (messages.empty()) {
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ addListener(c);
+ return false;
+ } else {
+ QueuedMessage msg = messages.front();
+ if (!msg.payload->isEnqueueComplete()) {
+ QPID_LOG(debug, "Messages not ready to dispatch on queue '" << name << "'");
+ addListener(c);
+ return false;
+ }
+
+ if (c.filter(msg.payload)) {
+ if (c.accept(msg.payload)) {
+ m = msg;
+ pop();
+ return true;
+ } else {
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ return false;
+ }
+ } else {
+ //consumer will never want this message
+ if (canExcludeUnwanted()) {
+ //hack for no-local on JMS topics; get rid of this message
+ QPID_LOG(debug, "Excluding message from '" << name << "'");
+ pop();
+ } else {
+ //leave it for another consumer
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ return false;
+ }
+ }
+ }
}
}
-bool Queue::dispatch(QueuedMessage& msg)
+
+bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c)
{
- QPID_LOG(info, "Dispatch message " << msg.position << " from queue " << name);
- //additions to the acquirers will result in a separate dispatch
- //request, so won't result in anyone being missed
- uint counter = getAcquirerCount();
- Consumer::ptr c = allocate();
- while (c && counter--){
- if (c->deliver(msg)) {
- return true;
+ QueuedMessage msg(this);
+ while (seek(msg, c)) {
+ if (c.filter(msg.payload)) {
+ if (c.accept(msg.payload)) {
+ //consumer wants the message
+ c.position = msg.position;
+ m = msg;
+ return true;
+ } else {
+ //consumer hasn't got enough credit for the message
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ return false;
+ }
} else {
- c = allocate();
+ //consumer will never want this message, continue seeking
+ c.position = msg.position;
+ QPID_LOG(debug, "Browser skipping message from '" << name << "'");
}
}
return false;
}
-bool Queue::getNextMessage(QueuedMessage& msg)
+void Queue::notify()
{
- Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
- QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
- return false;
- } else {
- msg = messages.front();
- return true;
- }
+ //notify listeners that there may be messages to process
+ for_each(listeners.begin(), listeners.end(), mem_fun(&Consumer::notify));
+ listeners.clear();
}
-void Queue::dispatch()
+void Queue::removeListener(Consumer& c)
{
- QueuedMessage msg(this);
- while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
- if (dispatch(msg)) {
- pop();
- } else if (exclude(msg.payload)) {
- pop();
- dequeue(0, msg.payload);
- QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]");
- } else {
- break;
- }
- }
- serviceAllBrowsers();
-}
-
-void Queue::serviceAllBrowsers()
+ Mutex::ScopedLock locker(messageLock);
+ listeners.erase(&c);
+}
+
+void Queue::addListener(Consumer& c)
{
- Consumers copy;
- {
- RWlock::ScopedRlock locker(consumerLock);
- if (browsers.empty()) return;//shortcut
- copy = browsers;
- }
- for (Consumers::iterator i = copy.begin(); i != copy.end(); i++) {
- serviceBrowser(*i);
- }
-}
-
-void Queue::serviceBrowser(Consumer::ptr browser)
+ listeners.insert(&c);
+}
+
+bool Queue::dispatch(Consumer& c)
{
QueuedMessage msg(this);
- while (seek(msg, browser->position) && browser->deliver(msg)) {
- browser->position = msg.position;
+ if (getNextMessage(msg, c)) {
+ c.deliver(msg);
+ return true;
+ } else {
+ return false;
}
}
-bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) {
+bool Queue::seek(QueuedMessage& msg, Consumer& c) {
Mutex::ScopedLock locker(messageLock);
- if (!messages.empty() && messages.back().position > position) {
- if (position < messages.front().position) {
+ if (!messages.empty() && messages.back().position > c.position) {
+ if (c.position < messages.front().position) {
msg = messages.front();
return true;
} else {
- uint index = (position - messages.front().position) + 1;
+ uint index = (c.position - messages.front().position) + 1;
if (index < messages.size()) {
msg = messages[index];
return true;
}
}
}
+ addListener(c);
return false;
}
-void Queue::consume(Consumer::ptr c, bool requestExclusive){
- RWlock::ScopedWlock locker(consumerLock);
+void Queue::consume(Consumer&, bool requestExclusive){
+ Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
throw AccessRefusedException(
QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- }
- if(requestExclusive) {
- if(acquirers.empty() && browsers.empty()) {
- exclusive = c;
- } else {
+ } else if(requestExclusive) {
+ if(consumerCount) {
throw AccessRefusedException(
QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- }
- }
- if (c->preAcquires()) {
- acquirers.push_back(c);
- } else {
- Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
- c->position = SequenceNumber(sequence.getValue() - 1);
} else {
- c->position = SequenceNumber(messages.front().position.getValue() - 1);
+ exclusive = true;
}
- browsers.push_back(c);
}
+ consumerCount++;
if (mgmtObject != 0){
mgmtObject->inc_consumers ();
}
}
-void Queue::cancel(Consumer::ptr c){
- RWlock::ScopedWlock locker(consumerLock);
- if (c->preAcquires()) {
- cancel(c, acquirers);
- } else {
- cancel(c, browsers);
- }
+void Queue::cancel(Consumer& c){
+ removeListener(c);
+ Mutex::ScopedLock locker(consumerLock);
+ consumerCount--;
+ if(exclusive) exclusive = false;
if (mgmtObject != 0){
mgmtObject->dec_consumers ();
}
- if(exclusive == c) exclusive.reset();
-}
-
-void Queue::cancel(Consumer::ptr c, Consumers& consumers)
-{
- Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
- if (i != consumers.end())
- consumers.erase(i);
}
QueuedMessage Queue::dequeue(){
@@ -382,14 +362,16 @@ uint32_t Queue::purge(){
return count;
}
+/**
+ * Assumes messageLock is held
+ */
void Queue::pop(){
- Mutex::ScopedLock locker(messageLock);
if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
messages.pop_front();
}
void Queue::push(intrusive_ptr<Message>& msg){
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
messages.push_back(QueuedMessage(this, msg, ++sequence));
if (policy.get()) {
policy->enqueued(msg->contentSize());
@@ -397,6 +379,7 @@ void Queue::push(intrusive_ptr<Message>& msg){
msg->releaseContent(store);
}
}
+ notify();
}
/** function only provided for unit tests, or code not in critical message path */
@@ -412,18 +395,13 @@ uint32_t Queue::getMessageCount() const{
}
uint32_t Queue::getConsumerCount() const{
- RWlock::ScopedRlock locker(consumerLock);
- return acquirers.size() + browsers.size();
-}
-
-uint32_t Queue::getAcquirerCount() const{
- RWlock::ScopedRlock locker(consumerLock);
- return acquirers.size();
+ Mutex::ScopedLock locker(consumerLock);
+ return consumerCount;
}
bool Queue::canAutoDelete() const{
- RWlock::ScopedRlock locker(consumerLock);
- return autodelete && acquirers.empty() && browsers.empty();
+ Mutex::ScopedLock locker(consumerLock);
+ return autodelete && !consumerCount;
}
// return true if store exists,
@@ -601,21 +579,6 @@ bool Queue::hasExclusiveConsumer() const
return exclusive;
}
-void Queue::DispatchFunctor::operator()()
-{
- try {
- if (consumer && !consumer->preAcquires()) {
- queue.serviceBrowser(consumer);
- }else{
- queue.dispatch();
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "Exception on dispatch: " << e.what());
- }
-
- if (sync) sync->completed();
-}
-
ManagementObject::shared_ptr Queue::GetManagementObject (void) const
{
return dynamic_pointer_cast<ManagementObject> (mgmtObject);
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 1e56f1b6e9..4018f91367 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -24,6 +24,7 @@
#include <vector>
#include <memory>
#include <deque>
+#include <set>
#include <boost/shared_ptr.hpp>
#include "qpid/framing/amqp_types.h"
#include "ConnectionToken.h"
@@ -48,12 +49,6 @@ namespace qpid {
using std::string;
- struct DispatchCompletion
- {
- virtual ~DispatchCompletion() {}
- virtual void completed() = 0;
- };
-
/**
* The brokers representation of an amqp queue. Messages are
* delivered to a queue from where they can be dispatched to
@@ -61,59 +56,40 @@ namespace qpid {
* or more consumers registers.
*/
class Queue : public PersistableQueue, public management::Manageable {
- typedef std::vector<Consumer::ptr> Consumers;
+ typedef std::set<Consumer*> Listeners;
typedef std::deque<QueuedMessage> Messages;
-
- struct DispatchFunctor
- {
- Queue& queue;
- Consumer::ptr consumer;
- DispatchCompletion* sync;
-
- DispatchFunctor(Queue& q, DispatchCompletion* s = 0) : queue(q), sync(s) {}
- DispatchFunctor(Queue& q, Consumer::ptr c, DispatchCompletion* s = 0) : queue(q), consumer(c), sync(s) {}
- void operator()();
- };
const string name;
const bool autodelete;
MessageStore* const store;
const ConnectionToken* owner;
- Consumers acquirers;
- Consumers browsers;
+ uint32_t consumerCount;
+ bool exclusive;
+ Listeners listeners;
Messages messages;
- int next;
- mutable qpid::sys::RWlock consumerLock;
+ mutable qpid::sys::Mutex consumerLock;
mutable qpid::sys::Mutex messageLock;
mutable qpid::sys::Mutex ownershipLock;
- Consumer::ptr exclusive;
mutable uint64_t persistenceId;
framing::FieldTable settings;
std::auto_ptr<QueuePolicy> policy;
QueueBindings bindings;
boost::shared_ptr<Exchange> alternateExchange;
- qpid::sys::Serializer<DispatchFunctor> serializer;
- DispatchFunctor dispatchCallback;
framing::SequenceNumber sequence;
management::Queue::shared_ptr mgmtObject;
void pop();
void push(intrusive_ptr<Message>& msg);
- bool dispatch(QueuedMessage& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
- /**
- * only called by serilizer
- */
- void dispatch();
- void cancel(Consumer::ptr c, Consumers& set);
- void serviceAllBrowsers();
- void serviceBrowser(Consumer::ptr c);
- Consumer::ptr allocate();
- bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
- uint32_t getAcquirerCount() const;
- bool getNextMessage(QueuedMessage& msg);
- bool exclude(intrusive_ptr<Message> msg);
-
+ bool seek(QueuedMessage& msg, Consumer& position);
+ bool getNextMessage(QueuedMessage& msg, Consumer& c);
+ bool consumeNextMessage(QueuedMessage& msg, Consumer& c);
+ bool browseNextMessage(QueuedMessage& msg, Consumer& c);
+ bool canExcludeUnwanted();
+
+ void notify();
+ void removeListener(Consumer&);
+ void addListener(Consumer&);
public:
virtual void notifyDurableIOComplete();
@@ -127,6 +103,8 @@ namespace qpid {
Manageable* parent = 0);
~Queue();
+ bool dispatch(Consumer&);
+
void create(const qpid::framing::FieldTable& settings);
void configure(const qpid::framing::FieldTable& settings);
void destroy();
@@ -156,16 +134,10 @@ namespace qpid {
* Used during recovery to add stored messages back to the queue
*/
void recover(intrusive_ptr<Message>& msg);
- /**
- * Request dispatch any queued messages providing there are
- * consumers for them. Only one thread can be dispatching
- * at any time, so this call schedules the despatch based on
- * the serilizer policy.
- */
- void requestDispatch(Consumer::ptr c = Consumer::ptr());
- void flush(DispatchCompletion& callback);
- void consume(Consumer::ptr c, bool exclusive = false);
- void cancel(Consumer::ptr c);
+
+ void consume(Consumer& c, bool exclusive = false);
+ void cancel(Consumer& c);
+
uint32_t purge();
uint32_t getMessageCount() const;
uint32_t getConsumerCount() const;
diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
index ba43b5ecba..768ea9ea08 100644
--- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -93,7 +93,6 @@ void SemanticHandler::sendCompletion()
{
SequenceNumber mark = incoming.getMark();
SequenceNumberSet range = incoming.getRange();
- Mutex::ScopedLock l(outLock);
session.getProxy().getExecution().complete(mark.getValue(), range);
}
@@ -128,7 +127,6 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
if (!invoker.wasHandled()) {
throw NotImplementedException("Not implemented");
} else if (invoker.hasResult()) {
- Mutex::ScopedLock l(outLock);
session.getProxy().getExecution().result(id.getValue(), invoker.getResult());
}
if (method->isSync()) {
@@ -166,7 +164,6 @@ void SemanticHandler::handleContent(AMQFrame& frame)
DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
- Mutex::ScopedLock l(outLock);
SessionHandler* handler = session.getHandler();
if (handler) {
uint32_t maxFrameSize = handler->getConnection().getFrameMax();
diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.h b/qpid/cpp/src/qpid/broker/SemanticHandler.h
index 1afcdaab76..52dfa4dcf9 100644
--- a/qpid/cpp/src/qpid/broker/SemanticHandler.h
+++ b/qpid/cpp/src/qpid/broker/SemanticHandler.h
@@ -61,7 +61,6 @@ class SemanticHandler : public DeliveryAdapter,
// state?
IncomingExecutionContext incoming;
framing::Window outgoing;
- sys::Mutex outLock;
MessageBuilder msgBuilder;
RangedOperation ackOp;
@@ -93,6 +92,9 @@ public:
void noop();
void result(uint32_t command, const std::string& data);
void sync();
+
+
+ SemanticState& getSemanticState() { return state; }
};
}}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index e790e087f0..76775d03d5 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -62,7 +62,8 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
tagGenerator("sgen"),
dtxSelected(false),
accumulatedAck(0),
- flowActive(true)
+ flowActive(true),
+ outputTasks(ss)
{
outstanding.reset();
}
@@ -70,7 +71,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
SemanticState::~SemanticState() {
//cancel all consumers
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- cancel(i->second);
+ cancel(*i);
}
if (dtxBuffer.get()) {
@@ -89,19 +90,19 @@ void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut,
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
- queue->consume(c, exclusive);//may throw exception
- consumers[tagInOut] = c;
+ std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
+ queue->consume(*c, exclusive);//may throw exception
+ outputTasks.addOutputTask(c.get());
+ consumers.insert(tagInOut, c.release());
}
void SemanticState::cancel(const string& tag){
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
- cancel(i->second);
+ cancel(*i);
consumers.erase(i);
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
- Mutex::ScopedLock locker(deliveryLock);
for_each(unacked.begin(), unacked.end(), boost::bind(mem_fun_ref(&DeliveryRecord::cancel), _1, tag));
}
@@ -232,7 +233,6 @@ void SemanticState::record(const DeliveryRecord& delivery)
bool SemanticState::checkPrefetch(intrusive_ptr<Message>& msg)
{
- Mutex::ScopedLock locker(deliveryLock);
bool countOk = !prefetchCount || prefetchCount > unacked.size();
bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
return countOk && sizeOk;
@@ -254,37 +254,27 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
ackExpected(ack),
nolocal(_nolocal),
acquire(_acquire),
- blocked(false),
+ blocked(true),
windowing(true),
msgCredit(0),
byteCredit(0) {}
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
- if (!parent->getSession().isAttached()) {
- return false;
- }
-
- if (nolocal &&
- &parent->getSession().getConnection() == msg.payload->getPublisher()) {
- return false;
- } else {
- if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
- blocked = true;
- } else {
- blocked = false;
- Mutex::ScopedLock locker(parent->deliveryLock);
-
- DeliveryId deliveryTag =
- parent->deliveryAdapter.deliver(msg, token);
- if (windowing || ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
- }
- if (acquire && !ackExpected) {
- queue->dequeue(0, msg.payload);
- }
+ if (parent->getSession().isAttached() && accept(msg.payload)) {
+ allocateCredit(msg.payload);
+ DeliveryId deliveryTag =
+ parent->deliveryAdapter.deliver(msg, token);
+ if (windowing || ackExpected) {
+ parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
+ }
+ if (acquire && !ackExpected) {
+ queue->dequeue(0, msg.payload);
}
- return !blocked;
+ return true;
+ } else {
+ QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent);
+ return false;
}
}
@@ -294,35 +284,48 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg)
&parent->getSession().getConnection() == msg->getPublisher());
}
+bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
+{
+ //TODO: remove the now redundant checks (channel.flow & basic|message.qos removed):
+ blocked = !(filter(msg) && checkCredit(msg) && parent->flowActive && (!ackExpected || parent->checkPrefetch(msg)));
+ return !blocked;
+}
+
+void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
+{
+ uint32_t originalMsgCredit = msgCredit;
+ uint32_t originalByteCredit = byteCredit;
+ if (msgCredit != 0xFFFFFFFF) {
+ msgCredit--;
+ }
+ if (byteCredit != 0xFFFFFFFF) {
+ byteCredit -= msg->getRequiredCredit();
+ }
+ QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
+ << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
+ << " now bytes: " << byteCredit << " msgs: " << msgCredit);
+
+}
+
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
- Mutex::ScopedLock l(lock);
if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent
<< ", bytes: " << byteCredit << " msgs: " << msgCredit);
return false;
} else {
- uint32_t originalMsgCredit = msgCredit;
- uint32_t originalByteCredit = byteCredit;
-
- if (msgCredit != 0xFFFFFFFF) {
- msgCredit--;
- }
- if (byteCredit != 0xFFFFFFFF) {
- byteCredit -= msg->getRequiredCredit();
- }
QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
- << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
- << " now bytes: " << byteCredit << " msgs: " << msgCredit);
+ << " bytes: " << byteCredit << " msgs: " << msgCredit);
return true;
}
}
SemanticState::ConsumerImpl::~ConsumerImpl() {}
-void SemanticState::cancel(ConsumerImpl::shared_ptr c)
+void SemanticState::cancel(ConsumerImpl& c)
{
- Queue::shared_ptr queue = c->getQueue();
+ outputTasks.removeOutputTask(&c);
+ Queue::shared_ptr queue = c.getQueue();
if(queue) {
queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
@@ -374,8 +377,6 @@ void SemanticState::ackRange(DeliveryId first, DeliveryId last)
void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
{
{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
ack_iterator start = cumulative ? unacked.begin() :
find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
ack_iterator end = start;
@@ -417,14 +418,14 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
void SemanticState::requestDispatch()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- requestDispatch(i->second);
+ requestDispatch(*i);
}
}
-void SemanticState::requestDispatch(ConsumerImpl::shared_ptr c)
+void SemanticState::requestDispatch(ConsumerImpl& c)
{
- if(c->isBlocked()) {
- c->getQueue()->requestDispatch(c);
+ if(c.isBlocked()) {
+ c.doOutput();
}
}
@@ -433,14 +434,13 @@ void SemanticState::acknowledged(const DeliveryRecord& delivery)
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- i->second->acknowledged(delivery);
+ i->acknowledged(delivery);
}
}
void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
{
if (windowing) {
- Mutex::ScopedLock l(lock);
if (msgCredit != 0xFFFFFFFF) msgCredit++;
if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
}
@@ -448,8 +448,6 @@ void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
void SemanticState::recover(bool requeue)
{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
if(requeue){
outstanding.reset();
//take copy and clear unacked as requeue may result in redelivery to this session
@@ -470,7 +468,6 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue
{
QueuedMessage msg = queue->dequeue();
if(msg.payload){
- Mutex::ScopedLock locker(deliveryLock);
DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
@@ -483,13 +480,11 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue
DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
- Mutex::ScopedLock locker(deliveryLock);
return deliveryAdapter.deliver(msg, token);
}
void SemanticState::flow(bool active)
{
- Mutex::ScopedLock locker(deliveryLock);
bool requestDelivery(!flowActive && active);
flowActive = active;
if (requestDelivery) {
@@ -499,50 +494,50 @@ void SemanticState::flow(bool active)
}
-SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination)
+SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
{
ConsumerImplMap::iterator i = consumers.find(destination);
if (i == consumers.end()) {
throw NotFoundException(QPID_MSG("Unknown destination " << destination));
} else {
- return i->second;
+ return *i;
}
}
void SemanticState::setWindowMode(const std::string& destination)
{
- find(destination)->setWindowMode();
+ find(destination).setWindowMode();
}
void SemanticState::setCreditMode(const std::string& destination)
{
- find(destination)->setCreditMode();
+ find(destination).setCreditMode();
}
void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl::shared_ptr c = find(destination);
- c->addByteCredit(value);
+ ConsumerImpl& c = find(destination);
+ c.addByteCredit(value);
requestDispatch(c);
}
void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl::shared_ptr c = find(destination);
- c->addMessageCredit(value);
+ ConsumerImpl& c = find(destination);
+ c.addMessageCredit(value);
requestDispatch(c);
}
void SemanticState::flush(const std::string& destination)
{
- find(destination)->flush();
+ find(destination).flush();
}
void SemanticState::stop(const std::string& destination)
{
- find(destination)->stop();
+ find(destination).stop();
}
void SemanticState::ConsumerImpl::setWindowMode()
@@ -557,7 +552,6 @@ void SemanticState::ConsumerImpl::setCreditMode()
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
- Mutex::ScopedLock l(lock);
if (byteCredit != 0xFFFFFFFF) {
byteCredit += value;
}
@@ -565,7 +559,6 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
- Mutex::ScopedLock l(lock);
if (msgCredit != 0xFFFFFFFF) {
msgCredit += value;
}
@@ -573,16 +566,12 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
void SemanticState::ConsumerImpl::flush()
{
- //need to prevent delivery after requestDispatch returns but
- //before credit is reduced to zero
- FlushCompletion completion(*this);
- queue->flush(completion);
- completion.wait();
+ while(queue->dispatch(*this));
+ stop();
}
void SemanticState::ConsumerImpl::stop()
{
- Mutex::ScopedLock l(lock);
msgCredit = 0;
byteCredit = 0;
}
@@ -618,14 +607,12 @@ AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired)
{
- Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
for_each(range.start, range.end, AcquireFunctor(acquired));
}
void SemanticState::release(DeliveryId first, DeliveryId last)
{
- Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
//release results in the message being added to the head so want
//to release in reverse order to keep the original transfer order
@@ -636,26 +623,22 @@ void SemanticState::release(DeliveryId first, DeliveryId last)
void SemanticState::reject(DeliveryId first, DeliveryId last)
{
- Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
//need to remove the delivery records as well
unacked.erase(range.start, range.end);
}
-
-void SemanticState::FlushCompletion::wait()
+bool SemanticState::ConsumerImpl::doOutput()
{
- Monitor::ScopedLock locker(lock);
- while (!complete) lock.wait();
+ //TODO: think through properly
+ return queue->dispatch(*this);
}
-void SemanticState::FlushCompletion::completed()
+void SemanticState::ConsumerImpl::notify()
{
- Monitor::ScopedLock locker(lock);
- consumer.stop();
- complete = true;
- lock.notifyAll();
+ //TODO: think through properly
+ parent->outputTasks.activateOutput();
}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 8e039d554b..7fc6e4167c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -35,6 +35,7 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/AccumulatedAck.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/sys/AggregateOutput.h"
#include "qpid/shared_ptr.h"
#include <list>
@@ -51,11 +52,11 @@ class SessionState;
* attached to a channel or suspended.
*/
class SemanticState : public framing::FrameHandler::Chains,
+ public sys::OutputTask,
private boost::noncopyable
{
- class ConsumerImpl : public Consumer
+ class ConsumerImpl : public Consumer, public sys::OutputTask
{
- sys::Mutex lock;
SemanticState* const parent;
const DeliveryToken::shared_ptr token;
const string name;
@@ -69,16 +70,17 @@ class SemanticState : public framing::FrameHandler::Chains,
uint32_t byteCredit;
bool checkCredit(intrusive_ptr<Message>& msg);
+ void allocateCredit(intrusive_ptr<Message>& msg);
public:
- typedef shared_ptr<ConsumerImpl> shared_ptr;
-
ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token,
const string& name, Queue::shared_ptr queue,
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
bool deliver(QueuedMessage& msg);
bool filter(intrusive_ptr<Message> msg);
+ bool accept(intrusive_ptr<Message> msg);
+ void notify();
void setWindowMode();
void setCreditMode();
@@ -89,20 +91,11 @@ class SemanticState : public framing::FrameHandler::Chains,
void acknowledged(const DeliveryRecord&);
Queue::shared_ptr getQueue() { return queue; }
bool isBlocked() const { return blocked; }
- };
- struct FlushCompletion : DispatchCompletion
- {
- sys::Monitor lock;
- ConsumerImpl& consumer;
- bool complete;
-
- FlushCompletion(ConsumerImpl& c) : consumer(c), complete(false) {}
- void wait();
- void completed();
+ bool doOutput();
};
- typedef std::map<std::string,ConsumerImpl::shared_ptr> ConsumerImplMap;
+ typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap;
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
SessionState& session;
@@ -114,27 +107,26 @@ class SemanticState : public framing::FrameHandler::Chains,
Prefetch outstanding;
NameGenerator tagGenerator;
std::list<DeliveryRecord> unacked;
- sys::Mutex deliveryLock;
TxBuffer::shared_ptr txBuffer;
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
DtxBufferMap suspendedXids;
framing::AccumulatedAck accumulatedAck;
bool flowActive;
-
boost::shared_ptr<Exchange> cacheExchange;
+ sys::AggregateOutput outputTasks;
void route(intrusive_ptr<Message> msg, Deliverable& strategy);
void record(const DeliveryRecord& delivery);
bool checkPrefetch(intrusive_ptr<Message>& msg);
void checkDtxTimeout();
- ConsumerImpl::shared_ptr find(const std::string& destination);
+ ConsumerImpl& find(const std::string& destination);
void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
void acknowledged(const DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
- void requestDispatch(ConsumerImpl::shared_ptr);
- void cancel(ConsumerImpl::shared_ptr);
+ void requestDispatch(ConsumerImpl&);
+ void cancel(ConsumerImpl&);
public:
SemanticState(DeliveryAdapter&, SessionState&);
@@ -188,6 +180,8 @@ class SemanticState : public framing::FrameHandler::Chains,
void release(DeliveryId first, DeliveryId last);
void reject(DeliveryId first, DeliveryId last);
void handle(intrusive_ptr<Message> msg);
+
+ bool doOutput() { return outputTasks.doOutput(); }
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index a142af2e1a..bbdbccad7d 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -70,6 +70,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
QPID_MSG("Channel " << channel.get() << " is not open"));
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
+ session->detach();
session.reset();
peerSession.closed(e.code, e.what());
}catch(const ConnectionException& e){
@@ -81,14 +82,9 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
void SessionHandler::handleOut(AMQFrame& f) {
- ConditionalScopedLock<Semaphore> s(suspension);
- if (s.lockAcquired() && session.get() && session->isAttached()) {
- channel.handle(f); // Send it.
- if (session->sent(f))
- peerSession.solicitAck();
- } else {
- QPID_LOG(error, "Dropping frame as session is no longer attached to a channel: " << f);
- }
+ channel.handle(f); // Send it.
+ if (session->sent(f))
+ peerSession.solicitAck();
}
void SessionHandler::assertAttached(const char* method) const {
@@ -138,6 +134,7 @@ void SessionHandler::close() {
assertAttached("close");
QPID_LOG(info, "Received session.close");
ignoring=false;
+ session->detach();
session.reset();
peerSession.closed(REPLY_SUCCESS, "ok");
assert(&connection.getChannel(channel.get()) == this);
@@ -147,14 +144,15 @@ void SessionHandler::close() {
void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
ignoring=false;
+ session->detach();
session.reset();
}
void SessionHandler::localSuspend() {
- ScopedLock<Semaphore> s(suspension);
if (session.get() && session->isAttached()) {
session->detach();
connection.broker.getSessionManager().suspend(session);
+ session.reset();
}
}
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index 08584ecd47..9a68ddb46f 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -27,8 +27,6 @@
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelHandler.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Semaphore.h"
#include <boost/noncopyable.hpp>
@@ -95,7 +93,6 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
framing::AMQP_ClientProxy::Session peerSession;
bool ignoring;
std::auto_ptr<SessionState> session;
- sys::Semaphore suspension;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 27658f2c84..bea1eaedcf 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -30,6 +30,7 @@ namespace qpid {
namespace broker {
using namespace framing;
+using sys::Mutex;
void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
@@ -48,7 +49,8 @@ SessionState::SessionState(
{
// TODO aconway 2007-09-20: SessionManager may add plugin
// handlers to the chain.
- }
+ getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+}
SessionState::~SessionState() {
// Remove ID from active session list.
@@ -70,11 +72,28 @@ Connection& SessionState::getConnection() {
}
void SessionState::detach() {
+ getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
+ Mutex::ScopedLock l(lock);
handler = 0;
}
void SessionState::attach(SessionHandler& h) {
- handler = &h;
+ {
+ Mutex::ScopedLock l(lock);
+ handler = &h;
+ }
+ h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+}
+
+void SessionState::activateOutput()
+{
+ Mutex::ScopedLock l(lock);
+ if (isAttached()) {
+ getConnection().outputTasks.activateOutput();
+ }
}
+ //This class could be used as the callback for queue notifications
+ //if not attached, it can simply ignore the callback, else pass it
+ //on to the connection
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index d710079cd4..ac2a33442a 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -26,6 +26,8 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SessionState.h"
#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/OutputControl.h"
#include "qpid/sys/Time.h"
#include <boost/noncopyable.hpp>
@@ -54,7 +56,8 @@ class Connection;
* themselves have state.
*/
class SessionState : public framing::SessionState,
- public framing::FrameHandler::InOutHandler
+ public framing::FrameHandler::InOutHandler,
+ public sys::OutputControl
{
public:
~SessionState();
@@ -76,6 +79,9 @@ class SessionState : public framing::SessionState,
Broker& getBroker() { return broker; }
framing::ProtocolVersion getVersion() const { return version; }
+ /** OutputControl **/
+ void activateOutput();
+
protected:
void handleIn(framing::AMQFrame&);
void handleOut(framing::AMQFrame&);
@@ -94,7 +100,7 @@ class SessionState : public framing::SessionState,
sys::AbsTime expiry; // Used by SessionManager.
Broker& broker;
framing::ProtocolVersion version;
-
+ sys::Mutex lock;
boost::scoped_ptr<SemanticHandler> semanticHandler;
friend class SessionManager;
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
index 497288bc3f..f32d0470ec 100644
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ b/qpid/cpp/src/qpid/client/Connector.cpp
@@ -106,7 +106,7 @@ OutputHandler* Connector::getOutputHandler(){
void Connector::send(AMQFrame& frame){
Mutex::ScopedLock l(writeLock);
writeFrameQueue.push(frame);
- aio->queueWrite();
+ aio->notifyPendingWrite();
QPID_LOG(trace, "SENT [" << this << "]: " << frame);
}
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
new file mode 100644
index 0000000000..74eea5ed08
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace sys {
+
+void AggregateOutput::activateOutput()
+{
+ control.activateOutput();
+}
+
+bool AggregateOutput::doOutput()
+{
+ bool result = false;
+ if (!tasks.empty()) {
+ if (next >= tasks.size()) next = next % tasks.size();
+
+ size_t start = next;
+ //loop until a task generated some output
+ while (!result) {
+ result = tasks[next++]->doOutput();
+ if (next >= tasks.size()) next = next % tasks.size();
+ if (start == next) break;
+ }
+ }
+ return result;
+}
+
+void AggregateOutput::addOutputTask(OutputTask* t)
+{
+ tasks.push_back(t);
+}
+
+void AggregateOutput::removeOutputTask(OutputTask* t)
+{
+ TaskList::iterator i = find(tasks.begin(), tasks.end(), t);
+ if (i != tasks.end()) tasks.erase(i);
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.h b/qpid/cpp/src/qpid/sys/AggregateOutput.h
new file mode 100644
index 0000000000..a870fcb95a
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _AggregateOutput_
+#define _AggregateOutput_
+
+#include <vector>
+#include "Mutex.h"
+#include "OutputControl.h"
+#include "OutputTask.h"
+
+namespace qpid {
+namespace sys {
+
+ class AggregateOutput : public OutputTask, public OutputControl
+ {
+ typedef std::vector<OutputTask*> TaskList;
+
+ TaskList tasks;
+ size_t next;
+ OutputControl& control;
+
+ public:
+ AggregateOutput(OutputControl& c) : next(0), control(c) {};
+ //this may be called on any thread
+ void activateOutput();
+ //all the following will be called on the same thread
+ bool doOutput();
+ void addOutputTask(OutputTask* t);
+ void removeOutputTask(OutputTask* t);
+ };
+
+}
+}
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h
index 7cb56b30aa..ca34d82741 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIO.h
+++ b/qpid/cpp/src/qpid/sys/AsynchIO.h
@@ -97,6 +97,13 @@ private:
std::deque<BufferBase*> bufferQueue;
std::deque<BufferBase*> writeQueue;
bool queuedClose;
+ /**
+ * This flag is used to detect and handle concurrency between
+ * calls to notifyPendingWrite() (which can be made from any thread) and
+ * the execution of the writeable() method (which is always on the
+ * thread processing this handle.
+ */
+ volatile bool writePending;
public:
AsynchIO(const Socket& s,
@@ -107,7 +114,8 @@ public:
void start(Poller::shared_ptr poller);
void queueReadBuffer(BufferBase* buff);
void unread(BufferBase* buff);
- void queueWrite(BufferBase* buff = 0);
+ void queueWrite(BufferBase* buff);
+ void notifyPendingWrite();
void queueWriteClose();
bool writeQueueEmpty() { return writeQueue.empty(); }
BufferBase* getQueuedBuffer();
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index bdf3e3b8d3..51ec7f718a 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -115,6 +115,7 @@ public:
// Output side
void send(framing::AMQFrame&);
void close();
+ void activateOutput();
// Input side
void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
@@ -135,7 +136,7 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
async->init(aio, handler);
@@ -195,7 +196,7 @@ void AsynchIOHandler::send(framing::AMQFrame& frame) {
}
// Activate aio for writing here
- aio->queueWrite();
+ aio->notifyPendingWrite();
}
void AsynchIOHandler::close() {
@@ -203,6 +204,10 @@ void AsynchIOHandler::close() {
frameQueueClosed = true;
}
+void AsynchIOHandler::activateOutput() {
+ aio->notifyPendingWrite();
+}
+
// Input side
void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
if (readError) {
@@ -272,9 +277,11 @@ void AsynchIOHandler::idle(AsynchIO&){
ScopedLock<Mutex> l(frameQueueLock);
if (frameQueue.empty()) {
- // At this point we know that we're write idling the connection
- // so we could note that somewhere or do something special
- return;
+ // At this point we know that we're write idling the connection
+ // so tell the input handler to queue any available output:
+ inputHandler->doOutput();
+ //if still no frames, theres nothing to do:
+ if (frameQueue.empty()) return;
}
do {
diff --git a/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h b/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
index 2bf3f66ec2..226096c5ef 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
@@ -24,6 +24,7 @@
#include "qpid/framing/InputHandler.h"
#include "qpid/framing/InitiationHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
+#include "OutputTask.h"
#include "TimeoutHandler.h"
namespace qpid {
@@ -32,7 +33,7 @@ namespace sys {
class ConnectionInputHandler :
public qpid::framing::InitiationHandler,
public qpid::framing::InputHandler,
- public TimeoutHandler
+ public TimeoutHandler, public OutputTask
{
public:
virtual void closed() = 0;
diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
index 8436bea599..5a60ae4998 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
@@ -22,6 +22,7 @@
#define _ConnectionOutputHandler_
#include "qpid/framing/OutputHandler.h"
+#include "OutputControl.h"
namespace qpid {
namespace sys {
@@ -29,7 +30,7 @@ namespace sys {
/**
* Provides the output handler associated with a connection.
*/
-class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler
+class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl
{
public:
virtual void close() = 0;
diff --git a/qpid/cpp/src/qpid/sys/OutputControl.h b/qpid/cpp/src/qpid/sys/OutputControl.h
new file mode 100644
index 0000000000..d922a0d85c
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/OutputControl.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _OutputControl_
+#define _OutputControl_
+
+namespace qpid {
+namespace sys {
+
+ class OutputControl
+ {
+ public:
+ virtual ~OutputControl() {}
+ virtual void activateOutput() = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/sys/OutputTask.h b/qpid/cpp/src/qpid/sys/OutputTask.h
new file mode 100644
index 0000000000..109765b8c3
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/OutputTask.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _OutputTask_
+#define _OutputTask_
+
+namespace qpid {
+namespace sys {
+
+ class OutputTask
+ {
+ public:
+ virtual ~OutputTask() {}
+ virtual bool doOutput() = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 4600960c6d..e73bbc03ca 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -97,7 +97,8 @@ AsynchIO::AsynchIO(const Socket& s,
closedCallback(cCb),
emptyCallback(eCb),
idleCallback(iCb),
- queuedClose(false) {
+ queuedClose(false),
+ writePending(false) {
s.setNonblocking();
}
@@ -139,20 +140,21 @@ void AsynchIO::unread(BufferBase* buff) {
DispatchHandle::rewatchRead();
}
-// Either queue for writing or announce that there is something to write
-// and we should ask for it
void AsynchIO::queueWrite(BufferBase* buff) {
- // If no buffer then don't queue anything
- // (but still wake up for writing)
- if (buff) {
- // If we've already closed the socket then throw the write away
- if (queuedClose) {
- bufferQueue.push_front(buff);
- return;
- } else {
- writeQueue.push_front(buff);
- }
- }
+ assert(buff);
+ // If we've already closed the socket then throw the write away
+ if (queuedClose) {
+ bufferQueue.push_front(buff);
+ return;
+ } else {
+ writeQueue.push_front(buff);
+ }
+ writePending = false;
+ DispatchHandle::rewatchWrite();
+}
+
+void AsynchIO::notifyPendingWrite() {
+ writePending = true;
DispatchHandle::rewatchWrite();
}
@@ -269,18 +271,24 @@ void AsynchIO::writeable(DispatchHandle& h) {
}
}
} else {
- // If we're waiting to close the socket then can do it now as there is nothing to write
- if (queuedClose) {
- close(h);
- return;
- }
+ // If we're waiting to close the socket then can do it now as there is nothing to write
+ if (queuedClose) {
+ close(h);
+ return;
+ }
// Fd is writable, but nothing to write
if (idleCallback) {
+ writePending = false;
idleCallback(*this);
}
// If we still have no buffers to write we can't do anything more
- if (writeQueue.empty() && !queuedClose) {
+ if (writeQueue.empty() && !writePending && !queuedClose) {
h.unwatchWrite();
+ //the following handles the case where writePending is
+ //set to true after the test above; in this case its
+ //possible that the unwatchWrite overwrites the
+ //desired rewatchWrite so we correct that here
+ if (writePending) h.rewatchWrite();
return;
}
}
@@ -304,7 +312,7 @@ void AsynchIO::close(DispatchHandle& h) {
h.stopWatch();
h.getSocket().close();
if (closedCallback) {
- closedCallback(*this, getSocket());
+ closedCallback(*this, getSocket());
}
}
diff --git a/qpid/cpp/src/tests/InProcessBroker.h b/qpid/cpp/src/tests/InProcessBroker.h
index c893e6906a..9fa0135502 100644
--- a/qpid/cpp/src/tests/InProcessBroker.h
+++ b/qpid/cpp/src/tests/InProcessBroker.h
@@ -36,6 +36,7 @@
namespace qpid {
+using qpid::sys::ConnectionInputHandler;
/**
* A client::Connector that connects directly to an in-process broker.
@@ -54,13 +55,21 @@ class InProcessConnector :
enum Sender {CLIENT,BROKER};
+ struct Task {
+ AMQFrame frame;
+ bool doOutput;
+
+ Task() : doOutput(true) {}
+ Task(AMQFrame& f) : frame(f), doOutput(false) {}
+ };
+
/** Simulate the network thread of a peer with a queue and a thread.
* With setInputHandler(0) drops frames simulating network packet loss.
*/
class NetworkQueue : public sys::Runnable
{
public:
- NetworkQueue(const char* r) : inputHandler(0), receiver(r) {
+ NetworkQueue(const char* r) : inputHandler(0), connectionHandler(0), receiver(r) {
thread=sys::Thread(this);
}
@@ -70,17 +79,24 @@ class InProcessConnector :
}
void push(AMQFrame& f) { queue.push(f); }
+ void activateOutput() { queue.push(Task()); }
void run() {
try {
while(true) {
- AMQFrame f = queue.pop();
- if (inputHandler) {
- QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f));
- inputHandler->handle(f);
+ Task t = queue.pop();
+ if (t.doOutput) {
+ if (connectionHandler) {
+ while (connectionHandler->doOutput());
+ }
+ } else {
+ if (inputHandler) {
+ QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << t.frame));
+ inputHandler->handle(t.frame);
+ }
+ else
+ QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << t.frame));
}
- else
- QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
}
}
catch (const ClosedException&) {
@@ -88,16 +104,24 @@ class InProcessConnector :
}
}
+ void setConnectionInputHandler(ConnectionInputHandler* h) {
+ Lock l(lock);
+ inputHandler = h;
+ connectionHandler = h;
+ }
+
void setInputHandler(FrameHandler* h) {
Lock l(lock);
inputHandler = h;
+ connectionHandler = 0;
}
private:
sys::Mutex lock;
- sys::BlockingQueue<AMQFrame> queue;
+ sys::BlockingQueue<Task> queue;
sys::Thread thread;
FrameHandler* inputHandler;
+ ConnectionInputHandler* connectionHandler;
const char* const receiver;
};
@@ -105,11 +129,13 @@ class InProcessConnector :
Sender from;
NetworkQueue queue;
const char* const sender;
+ NetworkQueue* reverseQueue;
InProcessHandler(Sender s)
: from(s),
queue(from==CLIENT? "BROKER" : "CLIENT"),
- sender(from==BROKER? "BROKER" : "CLIENT")
+ sender(from==BROKER? "BROKER" : "CLIENT"),
+ reverseQueue(0)
{}
~InProcessHandler() { }
@@ -123,6 +149,10 @@ class InProcessConnector :
// Do not shut down the queue here, we may be in
// the queue's dispatch thread.
}
+
+ void activateOutput() {
+ if (reverseQueue) reverseQueue->activateOutput();
+ }
};
InProcessConnector(shared_ptr<broker::Broker> b,
@@ -135,7 +165,9 @@ class InProcessConnector :
clientOut(CLIENT),
isClosed(false)
{
- clientOut.queue.setInputHandler(&brokerConnection);
+ clientOut.queue.setConnectionInputHandler(&brokerConnection);
+ brokerOut.reverseQueue = &clientOut.queue;
+ clientOut.reverseQueue = &brokerOut.queue;
}
~InProcessConnector() {
@@ -169,7 +201,7 @@ class InProcessConnector :
/** Sliently discard frames sent by either party, lost network traffic. */
void discard() {
brokerOut.queue.setInputHandler(0);
- clientOut.queue.setInputHandler(0);
+ clientOut.queue.setConnectionInputHandler(0);
}
private:
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 4714a998f6..7e757cfad0 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -47,6 +47,7 @@ public:
received = true;
return true;
};
+ void notify() {}
};
class FailOnDeliver : public Deliverable
@@ -88,7 +89,7 @@ class QueueTest : public CppUnit::TestCase
Queue::shared_ptr queue(new Queue("my_test_queue", true));
intrusive_ptr<Message> received;
- TestConsumer::shared_ptr c1(new TestConsumer());
+ TestConsumer c1;
queue->consume(c1);
@@ -98,7 +99,7 @@ class QueueTest : public CppUnit::TestCase
queue->process(msg1);
sleep(2);
- CPPUNIT_ASSERT(!c1->received);
+ CPPUNIT_ASSERT(!c1.received);
msg1->enqueueComplete();
received = queue->dequeue().payload;
@@ -127,8 +128,8 @@ class QueueTest : public CppUnit::TestCase
Queue::shared_ptr queue(new Queue("my_queue", true));
//Test adding consumers:
- TestConsumer::shared_ptr c1(new TestConsumer());
- TestConsumer::shared_ptr c2(new TestConsumer());
+ TestConsumer c1;
+ TestConsumer c2;
queue->consume(c1);
queue->consume(c2);
@@ -140,20 +141,17 @@ class QueueTest : public CppUnit::TestCase
intrusive_ptr<Message> msg3 = message("e", "C");
queue->deliver(msg1);
- if (!c1->received)
- sleep(2);
- CPPUNIT_ASSERT_EQUAL(msg1.get(), c1->last.get());
+ CPPUNIT_ASSERT(queue->dispatch(c1));
+ CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
queue->deliver(msg2);
- if (!c2->received)
- sleep(2);
- CPPUNIT_ASSERT_EQUAL(msg2.get(), c2->last.get());
+ CPPUNIT_ASSERT(queue->dispatch(c2));
+ CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
- c1->received = false;
+ c1.received = false;
queue->deliver(msg3);
- if (!c1->received)
- sleep(2);
- CPPUNIT_ASSERT_EQUAL(msg3.get(), c1->last.get());
+ CPPUNIT_ASSERT(queue->dispatch(c1));
+ CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());
//Test cancellation:
queue->cancel(c1);
@@ -203,13 +201,13 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount());
- TestConsumer::shared_ptr consumer(new TestConsumer());
+ TestConsumer consumer;
queue->consume(consumer);
- queue->requestDispatch();
- if (!consumer->received)
+ queue->dispatch(consumer);
+ if (!consumer.received)
sleep(2);
- CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer->last.get());
+ CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
received = queue->dequeue().payload;