summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-06-16 21:21:27 +0000
committerAlan Conway <aconway@apache.org>2009-06-16 21:21:27 +0000
commit4f851534bb3aab0c6e36a27604d97065164241b2 (patch)
tree5091479fd7f6f5a658a1243be1b2222fc351c96c
parent81446d4fe1f90b6c59f3940c7eae0f0d60314ffe (diff)
downloadqpid-python-4f851534bb3aab0c6e36a27604d97065164241b2.tar.gz
Performance improvements in AggregateOutput and SemanticState.
Replaced AggregateOutput hierarchy with a flat list per connection holding only the OutputTasks that are potentially active. Tasks are droped from the list as soon as they return false, and added back when they may have output. Inlined frequently-used SequenceNumber functions. Replace std::list in QueueListeners with std::vector. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@785408 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h3
-rw-r--r--qpid/cpp/src/qpid/broker/QueueListeners.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/QueueListeners.h6
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp58
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h24
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp9
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp16
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp38
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h3
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceNumber.cpp60
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceNumber.h39
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.cpp68
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.h65
-rw-r--r--qpid/cpp/xml/cluster.xml12
22 files changed, 238 insertions, 205 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 8040d74a75..db18c48d82 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -134,6 +134,9 @@ class Connection : public sys::ConnectionInputHandler,
/** Called by cluster to mark shadow connections */
void setShadow() { shadow = true; }
+ // Used by cluster to update connection status
+ sys::AggregateOutput& getOutputTasks() { return outputTasks; }
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index e5bcf9ef57..c96b1af6f8 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1025,3 +1025,5 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
{
return !policy.get() || policy->isEnqueued(msg);
}
+
+QueueListeners& Queue::getListeners() { return listeners; }
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index a8b775cba7..7890e46b03 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -325,6 +325,9 @@ namespace qpid {
* Notify queue that recovery has completed.
*/
void recoveryComplete();
+
+ // For cluster update
+ QueueListeners& getListeners();
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.cpp b/qpid/cpp/src/qpid/broker/QueueListeners.cpp
index 7baca7d0f4..6b3d90906c 100644
--- a/qpid/cpp/src/qpid/broker/QueueListeners.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueListeners.cpp
@@ -46,9 +46,11 @@ void QueueListeners::populate(NotificationSet& set)
{
if (consumers.size()) {
set.consumer = consumers.front();
- consumers.pop_front();
+ consumers.erase(consumers.begin());
} else {
- browsers.swap(set.browsers);
+ // Don't swap the vectors, hang on to the memory allocated.
+ set.browsers = browsers;
+ browsers.clear();
}
}
@@ -70,4 +72,10 @@ void QueueListeners::NotificationSet::notify()
else for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify));
}
+bool QueueListeners::contains(Consumer::shared_ptr c) const {
+ return
+ find(browsers.begin(), browsers.end(), c) != browsers.end() ||
+ find(consumers.begin(), consumers.end(), c) != consumers.end();
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.h b/qpid/cpp/src/qpid/broker/QueueListeners.h
index 53ed6a17e4..32260dd736 100644
--- a/qpid/cpp/src/qpid/broker/QueueListeners.h
+++ b/qpid/cpp/src/qpid/broker/QueueListeners.h
@@ -22,7 +22,7 @@
*
*/
#include "Consumer.h"
-#include <list>
+#include <vector>
namespace qpid {
namespace broker {
@@ -40,7 +40,7 @@ namespace broker {
class QueueListeners
{
public:
- typedef std::list<Consumer::shared_ptr> Listeners;
+ typedef std::vector<Consumer::shared_ptr> Listeners;
class NotificationSet
{
@@ -55,6 +55,8 @@ class QueueListeners
void addListener(Consumer::shared_ptr);
void removeListener(Consumer::shared_ptr);
void populate(NotificationSet&);
+ bool contains(Consumer::shared_ptr c) const;
+
private:
Listeners consumers;
Listeners browsers;
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 8f918ff40f..40c9bf296e 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -61,7 +61,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
deliveryAdapter(da),
tagGenerator("sgen"),
dtxSelected(false),
- outputTasks(ss),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@')))
{
@@ -90,7 +89,6 @@ void SemanticState::consume(const string& tag,
{
ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
- outputTasks.addOutputTask(c.get());
consumers[tag] = c;
}
@@ -98,7 +96,7 @@ void SemanticState::cancel(const string& tag){
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
cancel(i->second);
- consumers.erase(i);
+ consumers.erase(i);
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
@@ -257,9 +255,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
msgCredit(0),
byteCredit(0),
notifyEnabled(true),
- queueHasMessages(1),
syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
- deliveryCount(0) {}
+ deliveryCount(0)
+{}
OwnershipToken* SemanticState::ConsumerImpl::getSession()
{
@@ -290,6 +288,11 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
+ // FIXME aconway 2009-06-08: if we have byte & message credit but
+ // checkCredit fails because the message is to big, we should
+ // remain on queue's listener list for possible smaller messages
+ // in future.
+ //
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
@@ -328,7 +331,8 @@ SemanticState::ConsumerImpl::~ConsumerImpl() {}
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
c->disableNotify();
- outputTasks.removeOutputTask(c.get());
+ if (session.isAttached())
+ session.getConnection().outputTasks.removeOutputTask(c.get());
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
@@ -397,16 +401,18 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
}
void SemanticState::requestDispatch()
-{
- for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- requestDispatch(*(i->second));
- }
+{
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++)
+ i->second->requestDispatch();
}
-void SemanticState::requestDispatch(ConsumerImpl& c)
-{
- if(c.isBlocked())
- outputTasks.activateOutput();
+void SemanticState::ConsumerImpl::requestDispatch()
+{
+ if (blocked) {
+ parent->session.getConnection().outputTasks.addOutputTask(this);
+ parent->session.getConnection().outputTasks.activateOutput();
+ blocked = false;
+ }
}
bool SemanticState::complete(DeliveryRecord& delivery)
@@ -475,7 +481,7 @@ void SemanticState::addByteCredit(const std::string& destination, uint32_t value
{
ConsumerImpl& c = find(destination);
c.addByteCredit(value);
- requestDispatch(c);
+ c.requestDispatch();
}
@@ -483,7 +489,7 @@ void SemanticState::addMessageCredit(const std::string& destination, uint32_t va
{
ConsumerImpl& c = find(destination);
c.addMessageCredit(value);
- requestDispatch(c);
+ c.requestDispatch();
}
void SemanticState::flush(const std::string& destination)
@@ -593,11 +599,7 @@ bool SemanticState::ConsumerImpl::hasOutput() {
bool SemanticState::ConsumerImpl::doOutput()
{
- if (!haveCredit() || !queueHasMessages.boolCompareAndSwap(1, 0))
- return false;
- if (queue->dispatch(shared_from_this()))
- queueHasMessages.boolCompareAndSwap(0, 1);
- return queueHasMessages.get();
+ return haveCredit() && queue->dispatch(shared_from_this());
}
void SemanticState::ConsumerImpl::enableNotify()
@@ -619,14 +621,11 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
void SemanticState::ConsumerImpl::notify()
{
- queueHasMessages.boolCompareAndSwap(0, 1);
-
- //TODO: alter this, don't want to hold locks across external
- //calls; for now its is required to protect the notify() from
- //having part of the object chain of the invocation being
- //concurrently deleted
Mutex::ScopedLock l(lock);
- if (notifyEnabled) parent->outputTasks.activateOutput();
+ if (notifyEnabled) {
+ parent->session.getConnection().outputTasks.addOutputTask(this);
+ parent->session.getConnection().outputTasks.activateOutput();
+ }
}
@@ -670,13 +669,16 @@ void SemanticState::attached()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->enableNotify();
+ session.getConnection().outputTasks.addOutputTask(i->second.get());
}
+ session.getConnection().outputTasks.activateOutput();
}
void SemanticState::detached()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->disableNotify();
+ session.getConnection().outputTasks.removeOutputTask(i->second.get());
}
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index a69962c083..0f2e08cb3c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -55,9 +55,7 @@ class SessionContext;
* SemanticState holds the L3 and L4 state of an open session, whether
* attached to a channel or suspended.
*/
-class SemanticState : public sys::OutputTask,
- private boost::noncopyable
-{
+class SemanticState : private boost::noncopyable {
public:
class ConsumerImpl : public Consumer, public sys::OutputTask,
public boost::enable_shared_from_this<ConsumerImpl>
@@ -77,9 +75,6 @@ class SemanticState : public sys::OutputTask,
uint32_t msgCredit;
uint32_t byteCredit;
bool notifyEnabled;
- // queueHasMessages is boolean but valgrind has trouble with
- // AtomicValue<bool> so use an int with 1 or 0.
- sys:: AtomicValue<int> queueHasMessages;
const int syncFrequency;
int deliveryCount;
@@ -105,6 +100,8 @@ class SemanticState : public sys::OutputTask,
void notify();
bool isNotifyEnabled() const;
+ void requestDispatch();
+
void setWindowMode();
void setCreditMode();
void addByteCredit(uint32_t value);
@@ -130,6 +127,8 @@ class SemanticState : public sys::OutputTask,
std::string getResumeId() const { return resumeId; };
uint64_t getResumeTtl() const { return resumeTtl; }
const framing::FieldTable& getArguments() const { return arguments; }
+
+ SemanticState& getParent() { return *parent; }
};
private:
@@ -147,7 +146,6 @@ class SemanticState : public sys::OutputTask,
DtxBufferMap suspendedXids;
framing::SequenceSet accumulatedAck;
boost::shared_ptr<Exchange> cacheExchange;
- sys::AggregateOutput outputTasks;
AclModule* acl;
const bool authMsg;
const string userID;
@@ -158,7 +156,6 @@ class SemanticState : public sys::OutputTask,
bool complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
- void requestDispatch(ConsumerImpl&);
void cancel(ConsumerImpl::shared_ptr);
public:
@@ -208,8 +205,6 @@ class SemanticState : public sys::OutputTask,
void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
void handle(boost::intrusive_ptr<Message> msg);
- bool hasOutput() { return outputTasks.hasOutput(); }
- bool doOutput() { return outputTasks.doOutput(); }
//final 0-10 spec (completed and accepted are distinct):
void completed(DeliveryId deliveryTag, DeliveryId endTag);
@@ -218,10 +213,11 @@ class SemanticState : public sys::OutputTask,
void attached();
void detached();
- // Used by cluster to re-create replica sessions
- static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); }
-
- template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); }
+ // Used by cluster to re-create sessions
+ template <class F> void eachConsumer(F f) {
+ for(ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); ++i)
+ f(i->second);
+ }
DeliveryRecords& getUnacked() { return unacked; }
framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index 7a277964ab..7bc14daf5d 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -40,9 +40,11 @@ class SessionContext : public OwnershipToken, public sys::OutputControl
public:
virtual ~SessionContext(){}
virtual bool isLocal(const ConnectionToken* t) const = 0;
+ virtual bool isAttached() const = 0;
virtual ConnectionState& getConnection() = 0;
virtual framing::AMQP_ClientProxy& getProxy() = 0;
virtual Broker& getBroker() = 0;
+ virtual uint16_t getChannel() const = 0;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index f5e9139a76..b465a65bd3 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -99,6 +99,11 @@ AMQP_ClientProxy& SessionState::getProxy() {
return handler->getProxy();
}
+uint16_t SessionState::getChannel() const {
+ assert(isAttached());
+ return handler->getChannel();
+}
+
ConnectionState& SessionState::getConnection() {
assert(isAttached());
return handler->getConnection();
@@ -119,8 +124,7 @@ void SessionState::detach() {
void SessionState::disableOutput()
{
- semanticState.detached();//prevents further activateOutput calls until reattached
- getConnection().outputTasks.removeOutputTask(&semanticState);
+ semanticState.detached(); //prevents further activateOutput calls until reattached
}
void SessionState::attach(SessionHandler& h) {
@@ -362,10 +366,6 @@ void SessionState::readyToSend() {
QPID_LOG(debug, getId() << ": ready to send, activating output.");
assert(handler);
semanticState.attached();
- sys::AggregateOutput& tasks = handler->getConnection().outputTasks;
- tasks.addOutputTask(&semanticState);
- tasks.activateOutput();
-
if (rateFlowcontrol) {
qpid::sys::ScopedLock<Mutex> l(rateLock);
// Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index ef6c56ddbe..f9d35e2aac 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -81,6 +81,9 @@ class SessionState : public qpid::SessionState,
framing::AMQP_ClientProxy& getProxy();
/** @pre isAttached() */
+ uint16_t getChannel() const;
+
+ /** @pre isAttached() */
ConnectionState& getConnection();
bool isLocal(const ConnectionToken* t) const;
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 37562ce46c..fe6958244f 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -755,13 +755,16 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
-void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
+void Cluster::errorCheck(const MemberId& m, uint8_t type, uint64_t frameSeq, Lock&) {
// If we receive an errorCheck here, it's because we have processed past the point
// of the error so respond with ERROR_TYPE_NONE
assert(map.getFrameSeq() >= frameSeq);
- if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE.
+ if (type != framing::cluster::ERROR_TYPE_NONE) { // Don't respond to NONE.
+ QPID_LOG(debug, "Error " << frameSeq << " on " << m << " did not occur locally");
mcast.mcastControl(
- ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+ ClusterErrorCheckBody(ProtocolVersion(),
+ framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+ }
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index b857c8a913..c6b5f8499c 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -113,7 +113,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
Decoder& getDecoder() { return decoder; }
ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
-
+
private:
typedef sys::Monitor::ScopedLock Lock;
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index afecbd50e5..e7dac82159 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -245,10 +245,13 @@ broker::SemanticState& Connection::semanticState() {
return sessionState().getSemanticState();
}
-void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) {
+void Connection::consumerState(
+ const string& name, bool blocked, bool notifyEnabled, bool isInListener)
+{
broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
c.setBlocked(blocked);
if (notifyEnabled) c.enableNotify(); else c.disableNotify();
+ if (isInListener) c.getQueue()->getListeners().addListener(c.shared_from_this());
}
void Connection::sessionState(
@@ -270,6 +273,17 @@ void Connection::sessionState(
unknownCompleted,
receivedIncomplete);
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
+ // The output tasks will be added later in the update process.
+ connection.getOutputTasks().removeAll();
+}
+
+void Connection::outputTask(uint16_t channel, const std::string& name) {
+ broker::SessionState* session = connection.getChannel(channel).getSession();
+ if (!session)
+ throw Exception(QPID_MSG(cluster << " channel not attached " << *this
+ << "[" << channel << "] "));
+ OutputTask* task = &session->getSemanticState().find(name);
+ connection.getOutputTasks().addOutputTask(task);
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) {
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 8e3b0ad337..51aab92bfc 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -103,7 +103,7 @@ class Connection :
// Called for data delivered from the cluster.
void deliveredFrame(const EventFrame&);
- void consumerState(const std::string& name, bool blocked, bool notifyEnabled);
+ void consumerState(const std::string& name, bool blocked, bool notifyEnabled, bool isInListener);
// ==== Used in catch-up mode to build initial state.
//
@@ -115,6 +115,8 @@ class Connection :
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
+ void outputTask(uint16_t channel, const std::string& name);
+
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index ef99058471..3c3c330787 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -48,8 +48,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) {
LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
parent.getCluster().checkQuorum();
{
- // FIXME aconway 2009-04-28: locking around next-> may be redundant
- // with the fixes to read-credit in the IO layer. Review.
sys::Mutex::ScopedLock l(lock);
next->send(f);
}
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 332e74c512..7c305a2e92 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -54,6 +54,7 @@
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
+#include <boost/cast.hpp>
#include <algorithm>
namespace qpid {
@@ -64,6 +65,8 @@ using broker::Exchange;
using broker::Queue;
using broker::QueueBinding;
using broker::Message;
+using broker::SemanticState;
+
using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
@@ -125,7 +128,8 @@ void UpdateClient::update() {
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));
- // Update queue is used to transfer acquired messages that are no longer on their original queue.
+ // Update queue is used to transfer acquired messages that are no
+ // longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
@@ -256,6 +260,16 @@ void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& que
s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
}
+void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
+ const SemanticState::ConsumerImpl* cci =
+ boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
+ SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
+ uint16_t channel = ci->getParent().getSession().getChannel();
+ ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
+ QPID_LOG(debug, updaterId << " updating output task " << ci->getName()
+ << " channel=" << channel);
+}
+
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
shadowConnection = catchUpConnection();
@@ -266,6 +280,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
// Safe to use decoder here because we are stalled for update.
std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment();
+ bc.getOutputTasks().eachOutput(
+ boost::bind(&UpdateClient::updateOutputTask, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
updateConnection->getId().getNumber(),
@@ -294,9 +310,9 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, updaterId << " updating exclusive queues.");
ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1));
- // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
QPID_LOG(debug, updaterId << " updating consumers.");
- ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));
+ ss->getSemanticState().eachConsumer(
+ boost::bind(&UpdateClient::updateConsumer, this, _1));
QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
@@ -304,7 +320,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
updateTxState(ss->getSemanticState()); // Tx transaction state.
- // Adjust for command counter for message in progress, will be sent after state update.
+ // Adjust command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
@@ -328,8 +344,11 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
}
-void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) {
- QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId());
+void UpdateClient::updateConsumer(
+ const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
+{
+ QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on "
+ << shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
@@ -344,13 +363,12 @@ void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci)
shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
- ClusterConnectionConsumerStateBody state(
- ProtocolVersion(),
+ ClusterConnectionProxy(shadowSession).consumerState(
ci->getName(),
ci->isBlocked(),
- ci->isNotifyEnabled()
+ ci->isNotifyEnabled(),
+ ci->getQueue()->getListeners().contains(ci)
);
- client::SessionBase_0_10Access(shadowSession).get()->send(state);
QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId());
}
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index 030566b52d..ba5bdd1d75 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -91,7 +91,8 @@ class UpdateClient : public sys::Runnable {
void updateConnection(const boost::intrusive_ptr<Connection>& connection);
void updateSession(broker::SessionHandler& s);
void updateTxState(broker::SemanticState& s);
- void updateConsumer(const broker::SemanticState::ConsumerImpl*);
+ void updateOutputTask(const sys::OutputTask* task);
+ void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
MemberId updaterId;
MemberId updateeId;
diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
index cac4e6681e..e61e3f2edf 100644
--- a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
+++ b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
@@ -26,60 +26,6 @@
using qpid::framing::SequenceNumber;
using qpid::framing::Buffer;
-SequenceNumber::SequenceNumber() : value(0) {}
-
-SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {}
-
-bool SequenceNumber::operator==(const SequenceNumber& other) const
-{
- return value == other.value;
-}
-
-bool SequenceNumber::operator!=(const SequenceNumber& other) const
-{
- return !(value == other.value);
-}
-
-
-SequenceNumber& SequenceNumber::operator++()
-{
- value = value + 1;
- return *this;
-}
-
-const SequenceNumber SequenceNumber::operator++(int)
-{
- SequenceNumber old(value);
- value = value + 1;
- return old;
-}
-
-SequenceNumber& SequenceNumber::operator--()
-{
- value = value - 1;
- return *this;
-}
-
-bool SequenceNumber::operator<(const SequenceNumber& other) const
-{
- return (value - other.value) < 0;
-}
-
-bool SequenceNumber::operator>(const SequenceNumber& other) const
-{
- return other < *this;
-}
-
-bool SequenceNumber::operator<=(const SequenceNumber& other) const
-{
- return *this == other || *this < other;
-}
-
-bool SequenceNumber::operator>=(const SequenceNumber& other) const
-{
- return *this == other || *this > other;
-}
-
void SequenceNumber::encode(Buffer& buffer) const
{
buffer.putLong(value);
@@ -97,12 +43,6 @@ uint32_t SequenceNumber::encodedSize() const {
namespace qpid {
namespace framing {
-int32_t operator-(const SequenceNumber& a, const SequenceNumber& b)
-{
- int32_t result = a.value - b.value;
- return result;
-}
-
std::ostream& operator<<(std::ostream& o, const SequenceNumber& n) {
return o << n.getValue();
}
diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.h b/qpid/cpp/src/qpid/framing/SequenceNumber.h
index 3b18ce1360..c208739cdd 100644
--- a/qpid/cpp/src/qpid/framing/SequenceNumber.h
+++ b/qpid/cpp/src/qpid/framing/SequenceNumber.h
@@ -22,6 +22,7 @@
#define _framing_SequenceNumber_h
#include "amqp_types.h"
+#include <boost/operators.hpp>
#include <iosfwd>
#include "qpid/CommonImportExport.h"
@@ -33,35 +34,37 @@ class Buffer;
/**
* 4-byte sequence number that 'wraps around'.
*/
-class SequenceNumber
+class SequenceNumber : public
+boost::equality_comparable<
+ SequenceNumber, boost::less_than_comparable<
+ SequenceNumber, boost::incrementable<
+ SequenceNumber, boost::decrementable<SequenceNumber> > > >
{
int32_t value;
- public:
- QPID_COMMON_EXTERN SequenceNumber();
- QPID_COMMON_EXTERN SequenceNumber(uint32_t v);
-
- QPID_COMMON_EXTERN SequenceNumber& operator++();//prefix ++
- QPID_COMMON_EXTERN const SequenceNumber operator++(int);//postfix ++
- QPID_COMMON_EXTERN SequenceNumber& operator--();//prefix ++
- QPID_COMMON_EXTERN bool operator==(const SequenceNumber& other) const;
- QPID_COMMON_EXTERN bool operator!=(const SequenceNumber& other) const;
- QPID_COMMON_EXTERN bool operator<(const SequenceNumber& other) const;
- QPID_COMMON_EXTERN bool operator>(const SequenceNumber& other) const;
- QPID_COMMON_EXTERN bool operator<=(const SequenceNumber& other) const;
- QPID_COMMON_EXTERN bool operator>=(const SequenceNumber& other) const;
- uint32_t getValue() const { return (uint32_t) value; }
- operator uint32_t() const { return (uint32_t) value; }
-
- QPID_COMMON_EXTERN friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
+ public:
+ SequenceNumber(uint32_t v=0) : value(v) {}
+
+ SequenceNumber& operator++() { ++value; return *this; }
+ SequenceNumber& operator--() { --value; return *this; }
+ bool operator==(const SequenceNumber& other) const { return value == other.value; }
+ bool operator<(const SequenceNumber& other) const { return (value - other.value) < 0; }
+ uint32_t getValue() const { return uint32_t(value); }
+ operator uint32_t() const { return uint32_t(value); }
void encode(Buffer& buffer) const;
void decode(Buffer& buffer);
uint32_t encodedSize() const;
template <class S> void serialize(S& s) { s(value); }
+
+ friend inline int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
};
+inline int32_t operator-(const SequenceNumber& a, const SequenceNumber& b) {
+ return int32_t(a.value - b.value);
+}
+
struct Window
{
SequenceNumber hwm;
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
index 74bf6d0f85..d46fccc208 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -26,50 +26,66 @@
namespace qpid {
namespace sys {
+AggregateOutput::AggregateOutput(OutputControl& c) : busy(false), control(c) {}
+
void AggregateOutput::abort() { control.abort(); }
void AggregateOutput::activateOutput() { control.activateOutput(); }
void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); }
-bool AggregateOutput::hasOutput() {
- for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i)
- if ((*i)->hasOutput()) return true;
- return false;
+bool AggregateOutput::AggregateOutput::hasOutput() {
+ Mutex::ScopedLock l(lock);
+ return !tasks.empty();
}
-bool AggregateOutput::doOutput()
-{
- bool result = false;
- if (!tasks.empty()) {
- if (next >= tasks.size()) next = next % tasks.size();
+// Clear the busy flag and notify waiting threads in destructor.
+struct ScopedBusy {
+ bool& flag;
+ Monitor& monitor;
+ ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; }
+ ~ScopedBusy() { flag = false; monitor.notifyAll(); }
+};
+
+bool AggregateOutput::doOutput() {
+ Mutex::ScopedLock l(lock);
+ ScopedBusy sb(busy, lock);
- size_t start = next;
- //loop until a task generated some output
- while (!result) {
- result = tasks[next++]->doOutput();
- if (tasks.empty()) break;
- if (next >= tasks.size()) next = next % tasks.size();
- if (start == next) break;
+ while (!tasks.empty()) {
+ OutputTask* t=tasks.front();
+ tasks.pop_front();
+ bool didOutput;
+ {
+ // Allow concurrent call to addOutputTask.
+ // removeOutputTask will wait till !busy before removing a task.
+ Mutex::ScopedUnlock u(lock);
+ didOutput = t->doOutput();
+ }
+ if (didOutput) {
+ tasks.push_back(t);
+ return true;
}
}
- return result;
+ return false;
}
-
-void AggregateOutput::addOutputTask(OutputTask* t)
-{
- tasks.push_back(t);
+
+void AggregateOutput::addOutputTask(OutputTask* task) {
+ Mutex::ScopedLock l(lock);
+ tasks.push_back(task);
}
-void AggregateOutput::removeOutputTask(OutputTask* t)
-{
- TaskList::iterator i = std::find(tasks.begin(), tasks.end(), t);
- if (i != tasks.end()) tasks.erase(i);
+void AggregateOutput::removeOutputTask(OutputTask* task) {
+ Mutex::ScopedLock l(lock);
+ while (busy) lock.wait();
+ tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end());
}
-
+
void AggregateOutput::removeAll()
{
+ Mutex::ScopedLock l(lock);
+ while (busy) lock.wait();
tasks.clear();
}
+
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.h b/qpid/cpp/src/qpid/sys/AggregateOutput.h
index b33113796c..4e3190a093 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.h
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.h
@@ -21,47 +21,58 @@
#ifndef _AggregateOutput_
#define _AggregateOutput_
-#include "Mutex.h"
+#include "Monitor.h"
#include "OutputControl.h"
#include "OutputTask.h"
#include "qpid/CommonImportExport.h"
#include <algorithm>
-#include <vector>
+#include <deque>
namespace qpid {
namespace sys {
- class AggregateOutput : public OutputTask, public OutputControl
- {
- typedef std::vector<OutputTask*> TaskList;
+/**
+ * Holds a collection of output tasks, doOutput picks the next one to execute.
+ *
+ * Tasks are automatically removed if their doOutput() or hasOutput() returns false.
+ *
+ * Thread safe. addOutputTask may be called in one connection thread while
+ * doOutput is called in another.
+ */
+
+class AggregateOutput : public OutputTask, public OutputControl
+{
+ typedef std::deque<OutputTask*> TaskList;
+
+ Monitor lock;
+ TaskList tasks;
+ bool busy;
+ OutputControl& control;
- TaskList tasks;
- size_t next;
- OutputControl& control;
+ public:
+ QPID_COMMON_EXTERN AggregateOutput(OutputControl& c);
- public:
- AggregateOutput(OutputControl& c) : next(0), control(c) {};
- //this may be called on any thread
- QPID_COMMON_EXTERN void abort();
- QPID_COMMON_EXTERN void activateOutput();
- QPID_COMMON_EXTERN void giveReadCredit(int32_t);
+ // These may be called concurrently with any function.
+ QPID_COMMON_EXTERN void abort();
+ QPID_COMMON_EXTERN void activateOutput();
+ QPID_COMMON_EXTERN void giveReadCredit(int32_t);
+ QPID_COMMON_EXTERN void addOutputTask(OutputTask* t);
- //all the following will be called on the same thread
- QPID_COMMON_EXTERN bool doOutput();
- QPID_COMMON_EXTERN bool hasOutput();
- QPID_COMMON_EXTERN void addOutputTask(OutputTask* t);
- QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t);
- QPID_COMMON_EXTERN void removeAll();
+ // These functions must not be called concurrently with each other.
+ QPID_COMMON_EXTERN bool doOutput();
+ QPID_COMMON_EXTERN bool hasOutput();
+ QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t);
+ QPID_COMMON_EXTERN void removeAll();
- /** Apply f to each OutputTask* in the tasks list */
- template <class F> void eachOutput(F f) {
- std::for_each(tasks.begin(), tasks.end(), f);
- }
- };
+ /** Apply f to each OutputTask* in the tasks list */
+ template <class F> void eachOutput(F f) {
+ Mutex::ScopedLock l(lock);
+ std::for_each(tasks.begin(), tasks.end(), f);
+ }
+};
-}
-}
+}} // namespace qpid::sys
#endif
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 58b067a3db..8b9cbfed1e 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -65,8 +65,6 @@
</class>
- <!-- TODO aconway 2008-09-10: support for un-attached connections. -->
-
<!-- Controls associated with a specific connection. -->
<class name="cluster-connection" code="0x81" label="Qpid clustering extensions.">
@@ -91,6 +89,8 @@
<field name="name" type="str8"/>
<field name="blocked" type="bit"/>
<field name="notifyEnabled" type="bit"/>
+ <!-- Flag set if the consumer is in its queue's listener set. -->
+ <field name="is-in-listener" type="bit"/>
</control>
<!-- Delivery-record for outgoing messages sent but not yet accepted. -->
@@ -121,8 +121,14 @@
<control name="tx-end" code="0x17"/>
<control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control>
+ <!-- Consumers in the connection's output task -->
+ <control name="output-task" code="0x19">
+ <field name="channel" type="uint16"/>
+ <field name="name" type="str8"/>
+ </control>
+
<!-- Complete a session state update. -->
- <control name="session-state" code="0x1F" label="Set session state during a brain update.">
+ <control name="session-state" code="0x1F">
<!-- Target session deduced from channel number. -->
<field name="replay-start" type="sequence-no"/> <!-- Replay frames will start from this point.-->
<field name="command-point" type="sequence-no"/> <!-- Id of next command sent -->