summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-09-30 14:57:32 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-09-30 14:57:32 +0000
commit008ce22eadfbbd761f6b7a2011c78ce509c2ddb5 (patch)
tree5262121c428c7e1b2eedd89a1f5a450dc80f6c7d /qpid/cpp
parent0dd5f66ac0b4fab785d87d6c2ddf8e95fbd6e9e2 (diff)
downloadqpid-python-008ce22eadfbbd761f6b7a2011c78ce509c2ddb5.tar.gz
QPID-1306
This patch includes: - Optimistic Consume - Support for forcing Queue durable on cluster failure - Some cleanup on mgnt functions in Queue to inlines - Tests Still coming - header for client queue options - LVQ support bits. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@700489 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp99
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h29
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp17
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp16
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h1
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp90
9 files changed, 227 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 331bb5e716..7d02fb3d3c 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -37,12 +37,18 @@ using std::string;
TransferAdapter Message::TRANSFER;
-Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {}
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false),
+staged(false), forcePersistentPolicy(false), publisher(0), adapter(0) {}
Message::~Message()
{
}
+void Message::forcePersistent()
+{
+ forcePersistentPolicy = true;
+}
+
std::string Message::getRoutingKey() const
{
return getAdapter().getRoutingKey(frames);
@@ -73,7 +79,7 @@ const FieldTable* Message::getApplicationHeaders() const
bool Message::isPersistent()
{
- return getAdapter().isPersistent(frames);
+ return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
}
bool Message::requiresAccept()
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index a5ea0a7a37..8eb1b0b31c 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -133,6 +133,8 @@ public:
bool isExcluded(const std::vector<std::string>& excludes) const;
void addTraceId(const std::string& id);
+
+ void forcePersistent();
private:
mutable sys::Mutex lock;
@@ -142,6 +144,7 @@ public:
bool redelivered;
bool loaded;
bool staged;
+ bool forcePersistentPolicy; // used to force message as durable, via a broker policy
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 7dc6197fa2..d50e887df4 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -63,6 +63,10 @@ Queue::Queue(const string& _name, bool _autodelete,
consumerCount(0),
exclusive(0),
noLocal(false),
+ lastValueQueue(false),
+ optimisticConsume(false),
+ persistLastNode(false),
+ inLastNodeFailure(false),
persistenceId(0),
policyExceeded(false),
mgmtObject(0)
@@ -134,21 +138,12 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
} else {
// if no store then mark as enqueued
if (!enqueue(0, msg)){
- if (mgmtObject != 0) {
- mgmtObject->inc_msgTotalEnqueues ();
- mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- }
push(msg);
msg->enqueueComplete();
}else {
- if (mgmtObject != 0) {
- mgmtObject->inc_msgTotalEnqueues ();
- mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- mgmtObject->inc_msgPersistEnqueues ();
- mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- }
- push(msg);
+ push(msg);
}
+ mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
}
}
@@ -157,12 +152,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
- if (mgmtObject != 0) {
- mgmtObject->inc_msgTotalEnqueues ();
- mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- mgmtObject->inc_msgPersistEnqueues ();
- mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- }
+ mgntEnqStats(msg);
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
@@ -173,16 +163,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
- if (mgmtObject != 0) {
- mgmtObject->inc_msgTotalEnqueues ();
- mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ mgntEnqStats(msg);
+ if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
- if (msg->isPersistent ()) {
- mgmtObject->inc_msgPersistEnqueues ();
- mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- }
- }
+ }
}
void Queue::requeue(const QueuedMessage& msg){
@@ -466,20 +451,46 @@ bool Queue::canAutoDelete() const
return autodelete && !consumerCount;
}
+void Queue::clearLastNodeFailure()
+{
+ inLastNodeFailure = false;
+}
+
+void Queue::setLastNodeFailure()
+{
+ if (persistLastNode){
+ Mutex::ScopedLock locker(messageLock);
+ for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
+ i->payload->forcePersistent();
+ if (i->payload->getPersistenceId() == 0){
+ enqueue(0, i->payload);
+ }
+ }
+ inLastNodeFailure = true;
+ }
+}
+
// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
- if (traceId.size()) {
+ if (inLastNodeFailure && persistLastNode){
+ msg->forcePersistent();
+ }
+
+ if (traceId.size()) {
msg->addTraceId(traceId);
}
if (msg->isPersistent() && store) {
- msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
+ if (optimisticConsume){
+ msg->enqueueComplete(); // (optimistic) allow consume before written to disk
+ } else {
+ msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ }
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
}
- //msg->enqueueAsync(); // increments intrusive ptr cnt
return false;
}
@@ -492,12 +503,15 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
dequeued(msg);
}
if (msg.payload->isPersistent() && store) {
- msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
+ if (optimisticConsume) {
+ msg.payload->dequeueComplete();
+ } else {
+ msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ }
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
store->dequeue(ctxt, pmsg, *this);
return true;
}
- //msg->dequeueAsync(); // decrements intrusive ptr cnt
return false;
}
@@ -519,14 +533,7 @@ void Queue::popAndDequeue()
void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
- if (msg.payload->isPersistent ()){
- mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
- }
- }
+ mgntDeqStats(msg.payload);
}
@@ -537,6 +544,9 @@ namespace
const std::string qpidNoLocal("no-local");
const std::string qpidTraceIdentity("qpid.trace.id");
const std::string qpidTraceExclude("qpid.trace.exclude");
+ const std::string qpidLastValueQueue("qpid.last_value_queue");
+ const std::string qpidOptimisticConsume("qpid.optimistic_consume");
+ const std::string qpidPersistLastNode("qpid.persist_last_node");
}
void Queue::create(const FieldTable& _settings)
@@ -555,6 +565,15 @@ void Queue::configure(const FieldTable& _settings)
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+ lastValueQueue= _settings.get(qpidLastValueQueue);
+ if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
+
+ optimisticConsume= _settings.get(qpidOptimisticConsume);
+ if (optimisticConsume) QPID_LOG(debug, "Configured queue with optimistic consume");
+
+ persistLastNode= _settings.get(qpidPersistLastNode);
+ if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
+
traceId = _settings.getString(qpidTraceIdentity);
std::string excludeList = _settings.getString(qpidTraceExclude);
if (excludeList.size()) {
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index d90e1be3d1..3bde07c4d6 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -73,6 +73,10 @@ namespace qpid {
uint32_t consumerCount;
OwnershipToken* exclusive;
bool noLocal;
+ bool lastValueQueue;
+ bool optimisticConsume;
+ bool persistLastNode;
+ bool inLastNodeFailure;
std::string traceId;
std::vector<std::string> traceExclude;
Listeners listeners;
@@ -103,6 +107,26 @@ namespace qpid {
void dequeued(const QueuedMessage& msg);
void popAndDequeue();
+ inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg){
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTotalEnqueues ();
+ mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ if (msg->isPersistent ()) {
+ mgmtObject->inc_msgPersistEnqueues ();
+ mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
+ }
+ }
+ };
+ inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg){
+ if (mgmtObject != 0){
+ mgmtObject->inc_msgTotalDequeues ();
+ mgmtObject->inc_byteTotalDequeues (msg->contentSize());
+ if (msg->isPersistent ()){
+ mgmtObject->inc_msgPersistDequeues ();
+ mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+ }
+ }
+ };
public:
@@ -178,6 +202,11 @@ namespace qpid {
bool canAutoDelete() const;
const QueueBindings& getBindings() const { return bindings; }
+ /**
+ * used to take messages from in memory and flush down to disk.
+ */
+ void setLastNodeFailure();
+ void clearLastNodeFailure();
bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
/**
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index 61bdb0ffde..62d2222595 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -27,7 +27,7 @@ using namespace qpid::broker;
using namespace qpid::sys;
QueueRegistry::QueueRegistry() :
- counter(1), store(0), parent(0) {}
+ counter(1), store(0), parent(0), lastNode(false) {}
QueueRegistry::~QueueRegistry(){}
@@ -43,6 +43,7 @@ QueueRegistry::declare(const string& declareName, bool durable,
if (i == queues.end()) {
Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent));
queues[name] = queue;
+ if (lastNode) queue->setLastNodeFailure();
return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
@@ -91,3 +92,17 @@ void QueueRegistry::setStore (MessageStore* _store)
MessageStore* QueueRegistry::getStore() const {
return store;
}
+
+void QueueRegistry::updateQueueClusterState(bool _lastNode)
+{
+ RWlock::ScopedRlock locker(lock);
+ for (QueueMap::iterator i = queues.begin(); i != queues.end(); i++) {
+ if (_lastNode){
+ i->second->setLastNodeFailure();
+ } else {
+ i->second->clearLastNodeFailure();
+ }
+ }
+ lastNode = _lastNode;
+}
+
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h
index 90df662cbe..ca2cd132c4 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.h
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h
@@ -107,6 +107,12 @@ class QueueRegistry{
for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); ++i)
f(i->second);
}
+
+ /**
+ * Change queue mode when cluster size drops to 1 node, expands again
+ * in practice allows flow queue to disk when last name to be exectuted
+ */
+ void updateQueueClusterState(bool lastNode);
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
@@ -115,6 +121,7 @@ private:
int counter;
MessageStore* store;
management::Manageable* parent;
+ bool lastNode; //used to set mode on queue declare
//destroy impl that assumes lock is already held:
void destroyLH (const string& name);
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 7feee4ce14..b48443526c 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -22,6 +22,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Connection.h"
+#include "qpid/broker/QueueRegistry.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterDumpRequestBody.h"
#include "qpid/framing/ClusterUpdateBody.h"
@@ -71,7 +72,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
handler(&joiningHandler),
joiningHandler(*this),
memberHandler(*this),
- mcastId()
+ mcastId(),
+ lastSize(1)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
@@ -332,7 +334,17 @@ void Cluster::stopFullCluster(void) {
void Cluster::updateMemberStats() {
if (mgmtObject) {
- mgmtObject->set_clusterSize(size());
+ if (lastSize != size() && size() ==1){
+ QPID_LOG(info, "Last node standing, updating queue policies, size:" <<size());
+ broker.getQueues().updateQueueClusterState(true);
+ lastSize = size();
+ }else if (lastSize != size() && size() > 1) {
+ QPID_LOG(info, "Recover back from last node standing, updating queue policies, size:" <<size());
+ broker.getQueues().updateQueueClusterState(false);
+ lastSize = size();
+ }
+
+ mgmtObject->set_clusterSize(size());
std::vector<Url> vectUrl = getUrls();
string urlstr;
for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 0b56540f9a..a8c916a99b 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -164,6 +164,7 @@ class Cluster : private Cpg::Handler, public management::Manageable
MemberHandler memberHandler;
uint32_t mcastId;
+ size_t lastSize;
friend class ClusterHandler;
friend class JoiningHandler;
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 8795dbcd03..111920aa59 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -24,6 +24,7 @@
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/NullMessageStore.h"
#include "qpid/framing/MessageTransferBody.h"
#include <iostream>
#include "boost/format.hpp"
@@ -236,6 +237,95 @@ QPID_AUTO_TEST_CASE(testBound)
exchange3->route(deliverable, key, &args);
}
+QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
+
+ FieldTable args;
+
+ // set queue mode
+ args.setInt("qpid.persist_last_node", 1);
+
+ Queue::shared_ptr queue(new Queue("my-queue", true));
+ queue->configure(args);
+
+ intrusive_ptr<Message> msg1 = message("e", "A");
+ intrusive_ptr<Message> msg2 = message("e", "B");
+ intrusive_ptr<Message> msg3 = message("e", "C");
+
+ //enqueue 2 messages
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+
+ //change mode
+ queue->setLastNodeFailure();
+
+ //enqueue 1 message
+ queue->deliver(msg3);
+
+ //check all have persistent ids.
+ BOOST_CHECK(msg1->isPersistent());
+ BOOST_CHECK(msg2->isPersistent());
+ BOOST_CHECK(msg3->isPersistent());
+
+}
+
+class TestMessageStore : public NullMessageStore
+{
+ public:
+
+ virtual void dequeue(TransactionContext*,
+ const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
+ const PersistableQueue& /*queue*/)
+ {
+ }
+
+ virtual void enqueue(TransactionContext*,
+ const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
+ const PersistableQueue& /* queue */)
+ {
+ }
+
+ TestMessageStore() : NullMessageStore(false) {}
+ ~TestMessageStore(){}
+};
+
+
+QPID_AUTO_TEST_CASE(testOptimisticConsume){
+
+ FieldTable args;
+ args.setInt("qpid.persist_last_node", 1);
+
+ // set queue mode
+
+ TestMessageStore store;
+ Queue::shared_ptr queue(new Queue("my-queue", true, &store));
+ queue->setLastNodeFailure();
+
+ intrusive_ptr<Message> msg1 = message("e", "A");
+ intrusive_ptr<Message> msg2 = message("e", "B");
+ intrusive_ptr<Message> msg3 = message("e", "C");
+ msg1->forcePersistent();
+ msg2->forcePersistent();
+ msg3->forcePersistent();
+
+ //enqueue 2 messages
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+
+ //change mode
+ args.setInt("qpid.optimistic_consume", 1);
+ queue->configure(args);
+
+ //enqueue 1 message
+ queue->deliver(msg3);
+
+ //check all have persistent ids.
+ BOOST_CHECK(!msg1->isEnqueueComplete());
+ BOOST_CHECK(!msg2->isEnqueueComplete());
+ BOOST_CHECK(msg3->isEnqueueComplete());
+
+}
+
+
QPID_AUTO_TEST_SUITE_END()