summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h3
-rw-r--r--qpid/cpp/src/qpid/broker/QueueListeners.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/QueueListeners.h6
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp58
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h24
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp9
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp16
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp38
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h3
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceNumber.cpp60
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceNumber.h39
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.cpp68
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.h65
-rw-r--r--qpid/cpp/xml/cluster.xml12
22 files changed, 238 insertions, 205 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 8040d74a75..db18c48d82 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -134,6 +134,9 @@ class Connection : public sys::ConnectionInputHandler,
/** Called by cluster to mark shadow connections */
void setShadow() { shadow = true; }
+ // Used by cluster to update connection status
+ sys::AggregateOutput& getOutputTasks() { return outputTasks; }
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index e5bcf9ef57..c96b1af6f8 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1025,3 +1025,5 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
{
return !policy.get() || policy->isEnqueued(msg);
}
+
+QueueListeners& Queue::getListeners() { return listeners; }
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index a8b775cba7..7890e46b03 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -325,6 +325,9 @@ namespace qpid {
* Notify queue that recovery has completed.
*/
void recoveryComplete();
+
+ // For cluster update
+ QueueListeners& getListeners();
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.cpp b/qpid/cpp/src/qpid/broker/QueueListeners.cpp
index 7baca7d0f4..6b3d90906c 100644
--- a/qpid/cpp/src/qpid/broker/QueueListeners.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueListeners.cpp
@@ -46,9 +46,11 @@ void QueueListeners::populate(NotificationSet& set)
{
if (consumers.size()) {
set.consumer = consumers.front();
- consumers.pop_front();
+ consumers.erase(consumers.begin());
} else {
- browsers.swap(set.browsers);
+ // Don't swap the vectors, hang on to the memory allocated.
+ set.browsers = browsers;
+ browsers.clear();
}
}
@@ -70,4 +72,10 @@ void QueueListeners::NotificationSet::notify()
else for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify));
}
+bool QueueListeners::contains(Consumer::shared_ptr c) const {
+ return
+ find(browsers.begin(), browsers.end(), c) != browsers.end() ||
+ find(consumers.begin(), consumers.end(), c) != consumers.end();
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.h b/qpid/cpp/src/qpid/broker/QueueListeners.h
index 53ed6a17e4..32260dd736 100644
--- a/qpid/cpp/src/qpid/broker/QueueListeners.h
+++ b/qpid/cpp/src/qpid/broker/QueueListeners.h
@@ -22,7 +22,7 @@
*
*/
#include "Consumer.h"
-#include <list>
+#include <vector>
namespace qpid {
namespace broker {
@@ -40,7 +40,7 @@ namespace broker {
class QueueListeners
{
public:
- typedef std::list<Consumer::shared_ptr> Listeners;
+ typedef std::vector<Consumer::shared_ptr> Listeners;
class NotificationSet
{
@@ -55,6 +55,8 @@ class QueueListeners
void addListener(Consumer::shared_ptr);
void removeListener(Consumer::shared_ptr);
void populate(NotificationSet&);
+ bool contains(Consumer::shared_ptr c) const;
+
private:
Listeners consumers;
Listeners browsers;
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 8f918ff40f..40c9bf296e 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -61,7 +61,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
deliveryAdapter(da),
tagGenerator("sgen"),
dtxSelected(false),
- outputTasks(ss),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@')))
{
@@ -90,7 +89,6 @@ void SemanticState::consume(const string& tag,
{
ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
- outputTasks.addOutputTask(c.get());
consumers[tag] = c;
}
@@ -98,7 +96,7 @@ void SemanticState::cancel(const string& tag){
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
cancel(i->second);
- consumers.erase(i);
+ consumers.erase(i);
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
@@ -257,9 +255,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
msgCredit(0),
byteCredit(0),
notifyEnabled(true),
- queueHasMessages(1),
syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
- deliveryCount(0) {}
+ deliveryCount(0)
+{}
OwnershipToken* SemanticState::ConsumerImpl::getSession()
{
@@ -290,6 +288,11 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
+ // FIXME aconway 2009-06-08: if we have byte & message credit but
+ // checkCredit fails because the message is to big, we should
+ // remain on queue's listener list for possible smaller messages
+ // in future.
+ //
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
@@ -328,7 +331,8 @@ SemanticState::ConsumerImpl::~ConsumerImpl() {}
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
c->disableNotify();
- outputTasks.removeOutputTask(c.get());
+ if (session.isAttached())
+ session.getConnection().outputTasks.removeOutputTask(c.get());
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
@@ -397,16 +401,18 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
}
void SemanticState::requestDispatch()
-{
- for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- requestDispatch(*(i->second));
- }
+{
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++)
+ i->second->requestDispatch();
}
-void SemanticState::requestDispatch(ConsumerImpl& c)
-{
- if(c.isBlocked())
- outputTasks.activateOutput();
+void SemanticState::ConsumerImpl::requestDispatch()
+{
+ if (blocked) {
+ parent->session.getConnection().outputTasks.addOutputTask(this);
+ parent->session.getConnection().outputTasks.activateOutput();
+ blocked = false;
+ }
}
bool SemanticState::complete(DeliveryRecord& delivery)
@@ -475,7 +481,7 @@ void SemanticState::addByteCredit(const std::string& destination, uint32_t value
{
ConsumerImpl& c = find(destination);
c.addByteCredit(value);
- requestDispatch(c);
+ c.requestDispatch();
}
@@ -483,7 +489,7 @@ void SemanticState::addMessageCredit(const std::string& destination, uint32_t va
{
ConsumerImpl& c = find(destination);
c.addMessageCredit(value);
- requestDispatch(c);
+ c.requestDispatch();
}
void SemanticState::flush(const std::string& destination)
@@ -593,11 +599,7 @@ bool SemanticState::ConsumerImpl::hasOutput() {
bool SemanticState::ConsumerImpl::doOutput()
{
- if (!haveCredit() || !queueHasMessages.boolCompareAndSwap(1, 0))
- return false;
- if (queue->dispatch(shared_from_this()))
- queueHasMessages.boolCompareAndSwap(0, 1);
- return queueHasMessages.get();
+ return haveCredit() && queue->dispatch(shared_from_this());
}
void SemanticState::ConsumerImpl::enableNotify()
@@ -619,14 +621,11 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
void SemanticState::ConsumerImpl::notify()
{
- queueHasMessages.boolCompareAndSwap(0, 1);
-
- //TODO: alter this, don't want to hold locks across external
- //calls; for now its is required to protect the notify() from
- //having part of the object chain of the invocation being
- //concurrently deleted
Mutex::ScopedLock l(lock);
- if (notifyEnabled) parent->outputTasks.activateOutput();
+ if (notifyEnabled) {
+ parent->session.getConnection().outputTasks.addOutputTask(this);
+ parent->session.getConnection().outputTasks.activateOutput();
+ }
}
@@ -670,13 +669,16 @@ void SemanticState::attached()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->enableNotify();
+ session.getConnection().outputTasks.addOutputTask(i->second.get());
}
+ session.getConnection().outputTasks.activateOutput();
}
void SemanticState::detached()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->disableNotify();
+ session.getConnection().outputTasks.removeOutputTask(i->second.get());
}
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index a69962c083..0f2e08cb3c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -55,9 +55,7 @@ class SessionContext;
* SemanticState holds the L3 and L4 state of an open session, whether
* attached to a channel or suspended.
*/
-class SemanticState : public sys::OutputTask,
- private boost::noncopyable
-{
+class SemanticState : private boost::noncopyable {
public:
class ConsumerImpl : public Consumer, public sys::OutputTask,
public boost::enable_shared_from_this<ConsumerImpl>
@@ -77,9 +75,6 @@ class SemanticState : public sys::OutputTask,
uint32_t msgCredit;
uint32_t byteCredit;
bool notifyEnabled;
- // queueHasMessages is boolean but valgrind has trouble with
- // AtomicValue<bool> so use an int with 1 or 0.
- sys:: AtomicValue<int> queueHasMessages;
const int syncFrequency;
int deliveryCount;
@@ -105,6 +100,8 @@ class SemanticState : public sys::OutputTask,
void notify();
bool isNotifyEnabled() const;
+ void requestDispatch();
+
void setWindowMode();
void setCreditMode();
void addByteCredit(uint32_t value);
@@ -130,6 +127,8 @@ class SemanticState : public sys::OutputTask,
std::string getResumeId() const { return resumeId; };
uint64_t getResumeTtl() const { return resumeTtl; }
const framing::FieldTable& getArguments() const { return arguments; }
+
+ SemanticState& getParent() { return *parent; }
};
private:
@@ -147,7 +146,6 @@ class SemanticState : public sys::OutputTask,
DtxBufferMap suspendedXids;
framing::SequenceSet accumulatedAck;
boost::shared_ptr<Exchange> cacheExchange;
- sys::AggregateOutput outputTasks;
AclModule* acl;
const bool authMsg;
const string userID;
@@ -158,7 +156,6 @@ class SemanticState : public sys::OutputTask,
bool complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
- void requestDispatch(ConsumerImpl&);
void cancel(ConsumerImpl::shared_ptr);
public:
@@ -208,8 +205,6 @@ class SemanticState : public sys::OutputTask,
void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
void handle(boost::intrusive_ptr<Message> msg);
- bool hasOutput() { return outputTasks.hasOutput(); }
- bool doOutput() { return outputTasks.doOutput(); }
//final 0-10 spec (completed and accepted are distinct):
void completed(DeliveryId deliveryTag, DeliveryId endTag);
@@ -218,10 +213,11 @@ class SemanticState : public sys::OutputTask,
void attached();
void detached();
- // Used by cluster to re-create replica sessions
- static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); }
-
- template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); }
+ // Used by cluster to re-create sessions
+ template <class F> void eachConsumer(F f) {
+ for(ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); ++i)
+ f(i->second);
+ }
DeliveryRecords& getUnacked() { return unacked; }
framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index 7a277964ab..7bc14daf5d 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -40,9 +40,11 @@ class SessionContext : public OwnershipToken, public sys::OutputControl
public:
virtual ~SessionContext(){}
virtual bool isLocal(const ConnectionToken* t) const = 0;
+ virtual bool isAttached() const = 0;
virtual ConnectionState& getConnection() = 0;
virtual framing::AMQP_ClientProxy& getProxy() = 0;
virtual Broker& getBroker() = 0;
+ virtual uint16_t getChannel() const = 0;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index f5e9139a76..b465a65bd3 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -99,6 +99,11 @@ AMQP_ClientProxy& SessionState::getProxy() {
return handler->getProxy();
}
+uint16_t SessionState::getChannel() const {
+ assert(isAttached());
+ return handler->getChannel();
+}
+
ConnectionState& SessionState::getConnection() {
assert(isAttached());
return handler->getConnection();
@@ -119,8 +124,7 @@ void SessionState::detach() {
void SessionState::disableOutput()
{
- semanticState.detached();//prevents further activateOutput calls until reattached
- getConnection().outputTasks.removeOutputTask(&semanticState);
+ semanticState.detached(); //prevents further activateOutput calls until reattached
}
void SessionState::attach(SessionHandler& h) {
@@ -362,10 +366,6 @@ void SessionState::readyToSend() {
QPID_LOG(debug, getId() << ": ready to send, activating output.");
assert(handler);
semanticState.attached();
- sys::AggregateOutput& tasks = handler->getConnection().outputTasks;
- tasks.addOutputTask(&semanticState);
- tasks.activateOutput();
-
if (rateFlowcontrol) {
qpid::sys::ScopedLock<Mutex> l(rateLock);
// Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index ef6c56ddbe..f9d35e2aac 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -81,6 +81,9 @@ class SessionState : public qpid::SessionState,
framing::AMQP_ClientProxy& getProxy();
/** @pre isAttached() */
+ uint16_t getChannel() const;
+
+ /** @pre isAttached() */
ConnectionState& getConnection();
bool isLocal(const ConnectionToken* t) const;
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 37562ce46c..fe6958244f 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -755,13 +755,16 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
-void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
+void Cluster::errorCheck(const MemberId& m, uint8_t type, uint64_t frameSeq, Lock&) {
// If we receive an errorCheck here, it's because we have processed past the point
// of the error so respond with ERROR_TYPE_NONE
assert(map.getFrameSeq() >= frameSeq);
- if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE.
+ if (type != framing::cluster::ERROR_TYPE_NONE) { // Don't respond to NONE.
+ QPID_LOG(debug, "Error " << frameSeq << " on " << m << " did not occur locally");
mcast.mcastControl(
- ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+ ClusterErrorCheckBody(ProtocolVersion(),
+ framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+ }
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index b857c8a913..c6b5f8499c 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -113,7 +113,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
Decoder& getDecoder() { return decoder; }
ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
-
+
private:
typedef sys::Monitor::ScopedLock Lock;
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index afecbd50e5..e7dac82159 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -245,10 +245,13 @@ broker::SemanticState& Connection::semanticState() {
return sessionState().getSemanticState();
}
-void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) {
+void Connection::consumerState(
+ const string& name, bool blocked, bool notifyEnabled, bool isInListener)
+{
broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
c.setBlocked(blocked);
if (notifyEnabled) c.enableNotify(); else c.disableNotify();
+ if (isInListener) c.getQueue()->getListeners().addListener(c.shared_from_this());
}
void Connection::sessionState(
@@ -270,6 +273,17 @@ void Connection::sessionState(
unknownCompleted,
receivedIncomplete);
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
+ // The output tasks will be added later in the update process.
+ connection.getOutputTasks().removeAll();
+}
+
+void Connection::outputTask(uint16_t channel, const std::string& name) {
+ broker::SessionState* session = connection.getChannel(channel).getSession();
+ if (!session)
+ throw Exception(QPID_MSG(cluster << " channel not attached " << *this
+ << "[" << channel << "] "));
+ OutputTask* task = &session->getSemanticState().find(name);
+ connection.getOutputTasks().addOutputTask(task);
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) {
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 8e3b0ad337..51aab92bfc 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -103,7 +103,7 @@ class Connection :
// Called for data delivered from the cluster.
void deliveredFrame(const EventFrame&);
- void consumerState(const std::string& name, bool blocked, bool notifyEnabled);
+ void consumerState(const std::string& name, bool blocked, bool notifyEnabled, bool isInListener);
// ==== Used in catch-up mode to build initial state.
//
@@ -115,6 +115,8 @@ class Connection :
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
+ void outputTask(uint16_t channel, const std::string& name);
+
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index ef99058471..3c3c330787 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -48,8 +48,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) {
LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
parent.getCluster().checkQuorum();
{
- // FIXME aconway 2009-04-28: locking around next-> may be redundant
- // with the fixes to read-credit in the IO layer. Review.
sys::Mutex::ScopedLock l(lock);
next->send(f);
}
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 332e74c512..7c305a2e92 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -54,6 +54,7 @@
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
+#include <boost/cast.hpp>
#include <algorithm>
namespace qpid {
@@ -64,6 +65,8 @@ using broker::Exchange;
using broker::Queue;
using broker::QueueBinding;
using broker::Message;
+using broker::SemanticState;
+
using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
@@ -125,7 +128,8 @@ void UpdateClient::update() {
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));
- // Update queue is used to transfer acquired messages that are no longer on their original queue.
+ // Update queue is used to transfer acquired messages that are no
+ // longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
@@ -256,6 +260,16 @@ void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& que
s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
}
+void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
+ const SemanticState::ConsumerImpl* cci =
+ boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
+ SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
+ uint16_t channel = ci->getParent().getSession().getChannel();
+ ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
+ QPID_LOG(debug, updaterId << " updating output task " << ci->getName()
+ << " channel=" << channel);
+}
+
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
shadowConnection = catchUpConnection();
@@ -266,6 +280,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
// Safe to use decoder here because we are stalled for update.
std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment();
+ bc.getOutputTasks().eachOutput(
+ boost::bind(&UpdateClient::updateOutputTask, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
updateConnection->getId().getNumber(),
@@ -294,9 +310,9 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, updaterId << " updating exclusive queues.");
ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1));
- // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
QPID_LOG(debug, updaterId << " updating consumers.");
- ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));
+ ss->getSemanticState().eachConsumer(
+ boost::bind(&UpdateClient::updateConsumer, this, _1));
QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
@@ -304,7 +320,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
updateTxState(ss->getSemanticState()); // Tx transaction state.
- // Adjust for command counter for message in progress, will be sent after state update.
+ // Adjust command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
@@ -328,8 +344,11 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
}
-void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) {
- QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId());
+void UpdateClient::updateConsumer(
+ const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
+{
+ QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on "
+ << shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
@@ -344,13 +363,12 @@ void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci)
shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
- ClusterConnectionConsumerStateBody state(
- ProtocolVersion(),
+ ClusterConnectionProxy(shadowSession).consumerState(
ci->getName(),
ci->isBlocked(),
- ci->isNotifyEnabled()
+ ci->isNotifyEnabled(),
+ ci->getQueue()->getListeners().contains(ci)
);
- client::SessionBase_0_10Access(shadowSession).get()->send(state);
QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId());
}
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index 030566b52d..ba5bdd1d75 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -91,7 +91,8 @@ class UpdateClient : public sys::Runnable {
void updateConnection(const boost::intrusive_ptr<Connection>& connection);
void updateSession(broker::SessionHandler& s);
void updateTxState(broker::SemanticState& s);
- void updateConsumer(const broker::SemanticState::ConsumerImpl*);
+ void updateOutputTask(const sys::OutputTask* task);
+ void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
MemberId updaterId;
MemberId updateeId;
diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
index cac4e6681e..e61e3f2edf 100644
--- a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
+++ b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
@@ -26,60 +26,6 @@
using qpid::framing::SequenceNumber;
using qpid::framing::Buffer;
-SequenceNumber::SequenceNumber() : value(0) {}
-
-SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {}
-
-bool SequenceNumber::operator==(const SequenceNumber& other) const
-{
- return value == other.value;
-}
-
-bool SequenceNumber::operator!=(const SequenceNumber& other) const
-{
- return !(value == other.value);
-}
-
-
-SequenceNumber& SequenceNumber::operator++()
-{
- value = value + 1;
- return *this;
-}
-
-const SequenceNumber SequenceNumber::operator++(int)
-{
- SequenceNumber old(value);
- value = value + 1;
- return old;
-}
-
-SequenceNumber& SequenceNumber::operator--()
-{
- value = value - 1;
- return *this;
-}
-
-bool SequenceNumber::operator<(const SequenceNumber& other) const
-{
- return (value - other.value) < 0;
-}
-
-bool SequenceNumber::operator>(const SequenceNumber& other) const
-{
- return other < *this;
-}
-
-bool SequenceNumber::operator<=(const SequenceNumber& other) const
-{
- return *this == other || *this < other;
-}
-
-bool SequenceNumber::operator>=(const SequenceNumber& other) const
-{
- return *this == other || *this > other;
-}
-
void SequenceNumber::encode(Buffer& buffer) const
{
buffer.putLong(value);
@@ -97,12 +43,6 @@ uint32_t SequenceNumber::encodedSize() const {
namespace qpid {
namespace framing {
-int32_t operator-(const SequenceNumber& a, const SequenceNumber& b)
-{
- int32_t result = a.value - b.value;
- return result;
-}
-
std::ostream& operator<<(std::ostream& o, const SequenceNumber& n) {
return o << n.getValue();
}
diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.h b/qpid/cpp/src/qpid/framing/SequenceNumber.h
index 3b18ce1360..c208739cdd 100644
--- a/qpid/cpp/src/qpid/framing/SequenceNumber.h
+++ b/qpid/cpp/src/qpid/framing/SequenceNumber.h
@@ -22,6 +22,7 @@
#define _framing_SequenceNumber_h
#include "amqp_types.h"
+#include <boost/operators.hpp>
#include <iosfwd>
#include "qpid/CommonImportExport.h"
@@ -33,35 +34,37 @@ class Buffer;
/**
* 4-byte sequence number that 'wraps around'.
*/
-class SequenceNumber
+class SequenceNumber : public
+boost::equality_comparable<
+ SequenceNumber, boost::less_than_comparable<
+ SequenceNumber, boost::incrementable<
+ SequenceNumber, boost::decrementable<SequenceNumber> > > >
{
int32_t value;
- public:
- QPID_COMMON_EXTERN SequenceNumber();
- QPID_COMMON_EXTERN SequenceNumber(uint32_t v);
-
- QPID_COMMON_EXTERN SequenceNumber& operator++();//prefix ++
- QPID_COMMON_EXTERN const SequenceNumber operator++(int);//postfix ++
- QPID_COMMON_EXTERN SequenceNumber& operator--();//prefix ++
- QPID_COMMON_EXTERN bool operator==(const SequenceNumber& other) const;
- QPID_COMMON_EXTERN bool operator!=(const SequenceNumber& other) const;
- QPID_COMMON_EXTERN bool operator<(const SequenceNumber& other) const;
- QPID_COMMON_EXTERN bool operator>(const SequenceNumber& other) const;
- QPID_COMMON_EXTERN bool operator<=(const SequenceNumber& other) const;
- QPID_COMMON_EXTERN bool operator>=(const SequenceNumber& other) const;
- uint32_t getValue() const { return (uint32_t) value; }
- operator uint32_t() const { return (uint32_t) value; }
-
- QPID_COMMON_EXTERN friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
+ public:
+ SequenceNumber(uint32_t v=0) : value(v) {}
+
+ SequenceNumber& operator++() { ++value; return *this; }
+ SequenceNumber& operator--() { --value; return *this; }
+ bool operator==(const SequenceNumber& other) const { return value == other.value; }
+ bool operator<(const SequenceNumber& other) const { return (value - other.value) < 0; }
+ uint32_t getValue() const { return uint32_t(value); }
+ operator uint32_t() const { return uint32_t(value); }
void encode(Buffer& buffer) const;
void decode(Buffer& buffer);
uint32_t encodedSize() const;
template <class S> void serialize(S& s) { s(value); }
+
+ friend inline int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
};
+inline int32_t operator-(const SequenceNumber& a, const SequenceNumber& b) {
+ return int32_t(a.value - b.value);
+}
+
struct Window
{
SequenceNumber hwm;
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
index 74bf6d0f85..d46fccc208 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -26,50 +26,66 @@
namespace qpid {
namespace sys {
+AggregateOutput::AggregateOutput(OutputControl& c) : busy(false), control(c) {}
+
void AggregateOutput::abort() { control.abort(); }
void AggregateOutput::activateOutput() { control.activateOutput(); }
void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); }
-bool AggregateOutput::hasOutput() {
- for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i)
- if ((*i)->hasOutput()) return true;
- return false;
+bool AggregateOutput::AggregateOutput::hasOutput() {
+ Mutex::ScopedLock l(lock);
+ return !tasks.empty();
}
-bool AggregateOutput::doOutput()
-{
- bool result = false;
- if (!tasks.empty()) {
- if (next >= tasks.size()) next = next % tasks.size();
+// Clear the busy flag and notify waiting threads in destructor.
+struct ScopedBusy {
+ bool& flag;
+ Monitor& monitor;
+ ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; }
+ ~ScopedBusy() { flag = false; monitor.notifyAll(); }
+};
+
+bool AggregateOutput::doOutput() {
+ Mutex::ScopedLock l(lock);
+ ScopedBusy sb(busy, lock);
- size_t start = next;
- //loop until a task generated some output
- while (!result) {
- result = tasks[next++]->doOutput();
- if (tasks.empty()) break;
- if (next >= tasks.size()) next = next % tasks.size();
- if (start == next) break;
+ while (!tasks.empty()) {
+ OutputTask* t=tasks.front();
+ tasks.pop_front();
+ bool didOutput;
+ {
+ // Allow concurrent call to addOutputTask.
+ // removeOutputTask will wait till !busy before removing a task.
+ Mutex::ScopedUnlock u(lock);
+ didOutput = t->doOutput();
+ }
+ if (didOutput) {
+ tasks.push_back(t);
+ return true;
}
}
- return result;
+ return false;
}
-
-void AggregateOutput::addOutputTask(OutputTask* t)
-{
- tasks.push_back(t);
+
+void AggregateOutput::addOutputTask(OutputTask* task) {
+ Mutex::ScopedLock l(lock);
+ tasks.push_back(task);
}
-void AggregateOutput::removeOutputTask(OutputTask* t)
-{
- TaskList::iterator i = std::find(tasks.begin(), tasks.end(), t);
- if (i != tasks.end()) tasks.erase(i);
+void AggregateOutput::removeOutputTask(OutputTask* task) {
+ Mutex::ScopedLock l(lock);
+ while (busy) lock.wait();
+ tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end());
}
-
+
void AggregateOutput::removeAll()
{
+ Mutex::ScopedLock l(lock);
+ while (busy) lock.wait();
tasks.clear();
}
+
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.h b/qpid/cpp/src/qpid/sys/AggregateOutput.h
index b33113796c..4e3190a093 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.h
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.h
@@ -21,47 +21,58 @@
#ifndef _AggregateOutput_
#define _AggregateOutput_
-#include "Mutex.h"
+#include "Monitor.h"
#include "OutputControl.h"
#include "OutputTask.h"
#include "qpid/CommonImportExport.h"
#include <algorithm>
-#include <vector>
+#include <deque>
namespace qpid {
namespace sys {
- class AggregateOutput : public OutputTask, public OutputControl
- {
- typedef std::vector<OutputTask*> TaskList;
+/**
+ * Holds a collection of output tasks, doOutput picks the next one to execute.
+ *
+ * Tasks are automatically removed if their doOutput() or hasOutput() returns false.
+ *
+ * Thread safe. addOutputTask may be called in one connection thread while
+ * doOutput is called in another.
+ */
+
+class AggregateOutput : public OutputTask, public OutputControl
+{
+ typedef std::deque<OutputTask*> TaskList;
+
+ Monitor lock;
+ TaskList tasks;
+ bool busy;
+ OutputControl& control;
- TaskList tasks;
- size_t next;
- OutputControl& control;
+ public:
+ QPID_COMMON_EXTERN AggregateOutput(OutputControl& c);
- public:
- AggregateOutput(OutputControl& c) : next(0), control(c) {};
- //this may be called on any thread
- QPID_COMMON_EXTERN void abort();
- QPID_COMMON_EXTERN void activateOutput();
- QPID_COMMON_EXTERN void giveReadCredit(int32_t);
+ // These may be called concurrently with any function.
+ QPID_COMMON_EXTERN void abort();
+ QPID_COMMON_EXTERN void activateOutput();
+ QPID_COMMON_EXTERN void giveReadCredit(int32_t);
+ QPID_COMMON_EXTERN void addOutputTask(OutputTask* t);
- //all the following will be called on the same thread
- QPID_COMMON_EXTERN bool doOutput();
- QPID_COMMON_EXTERN bool hasOutput();
- QPID_COMMON_EXTERN void addOutputTask(OutputTask* t);
- QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t);
- QPID_COMMON_EXTERN void removeAll();
+ // These functions must not be called concurrently with each other.
+ QPID_COMMON_EXTERN bool doOutput();
+ QPID_COMMON_EXTERN bool hasOutput();
+ QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t);
+ QPID_COMMON_EXTERN void removeAll();
- /** Apply f to each OutputTask* in the tasks list */
- template <class F> void eachOutput(F f) {
- std::for_each(tasks.begin(), tasks.end(), f);
- }
- };
+ /** Apply f to each OutputTask* in the tasks list */
+ template <class F> void eachOutput(F f) {
+ Mutex::ScopedLock l(lock);
+ std::for_each(tasks.begin(), tasks.end(), f);
+ }
+};
-}
-}
+}} // namespace qpid::sys
#endif
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 58b067a3db..8b9cbfed1e 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -65,8 +65,6 @@
</class>
- <!-- TODO aconway 2008-09-10: support for un-attached connections. -->
-
<!-- Controls associated with a specific connection. -->
<class name="cluster-connection" code="0x81" label="Qpid clustering extensions.">
@@ -91,6 +89,8 @@
<field name="name" type="str8"/>
<field name="blocked" type="bit"/>
<field name="notifyEnabled" type="bit"/>
+ <!-- Flag set if the consumer is in its queue's listener set. -->
+ <field name="is-in-listener" type="bit"/>
</control>
<!-- Delivery-record for outgoing messages sent but not yet accepted. -->
@@ -121,8 +121,14 @@
<control name="tx-end" code="0x17"/>
<control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control>
+ <!-- Consumers in the connection's output task -->
+ <control name="output-task" code="0x19">
+ <field name="channel" type="uint16"/>
+ <field name="name" type="str8"/>
+ </control>
+
<!-- Complete a session state update. -->
- <control name="session-state" code="0x1F" label="Set session state during a brain update.">
+ <control name="session-state" code="0x1F">
<!-- Target session deduced from channel number. -->
<field name="replay-start" type="sequence-no"/> <!-- Replay frames will start from this point.-->
<field name="command-point" type="sequence-no"/> <!-- Id of next command sent -->