diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 99 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 90 |
9 files changed, 227 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 331bb5e716..7d02fb3d3c 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -37,12 +37,18 @@ using std::string; TransferAdapter Message::TRANSFER; -Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {} +Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), +staged(false), forcePersistentPolicy(false), publisher(0), adapter(0) {} Message::~Message() { } +void Message::forcePersistent() +{ + forcePersistentPolicy = true; +} + std::string Message::getRoutingKey() const { return getAdapter().getRoutingKey(frames); @@ -73,7 +79,7 @@ const FieldTable* Message::getApplicationHeaders() const bool Message::isPersistent() { - return getAdapter().isPersistent(frames); + return (getAdapter().isPersistent(frames) || forcePersistentPolicy); } bool Message::requiresAccept() diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index a5ea0a7a37..8eb1b0b31c 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -133,6 +133,8 @@ public: bool isExcluded(const std::vector<std::string>& excludes) const; void addTraceId(const std::string& id); + + void forcePersistent(); private: mutable sys::Mutex lock; @@ -142,6 +144,7 @@ public: bool redelivered; bool loaded; bool staged; + bool forcePersistentPolicy; // used to force message as durable, via a broker policy ConnectionToken* publisher; mutable MessageAdapter* adapter; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 7dc6197fa2..d50e887df4 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -63,6 +63,10 @@ Queue::Queue(const string& _name, bool _autodelete, consumerCount(0), exclusive(0), noLocal(false), + lastValueQueue(false), + optimisticConsume(false), + persistLastNode(false), + inLastNodeFailure(false), persistenceId(0), policyExceeded(false), mgmtObject(0) @@ -134,21 +138,12 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ } else { // if no store then mark as enqueued if (!enqueue(0, msg)){ - if (mgmtObject != 0) { - mgmtObject->inc_msgTotalEnqueues (); - mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - } push(msg); msg->enqueueComplete(); }else { - if (mgmtObject != 0) { - mgmtObject->inc_msgTotalEnqueues (); - mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - mgmtObject->inc_msgPersistEnqueues (); - mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - } - push(msg); + push(msg); } + mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); } } @@ -157,12 +152,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued - if (mgmtObject != 0) { - mgmtObject->inc_msgTotalEnqueues (); - mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - mgmtObject->inc_msgPersistEnqueues (); - mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - } + mgntEnqStats(msg); if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: @@ -173,16 +163,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); - if (mgmtObject != 0) { - mgmtObject->inc_msgTotalEnqueues (); - mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgntEnqStats(msg); + if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); - if (msg->isPersistent ()) { - mgmtObject->inc_msgPersistEnqueues (); - mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - } - } + } } void Queue::requeue(const QueuedMessage& msg){ @@ -466,20 +451,46 @@ bool Queue::canAutoDelete() const return autodelete && !consumerCount; } +void Queue::clearLastNodeFailure() +{ + inLastNodeFailure = false; +} + +void Queue::setLastNodeFailure() +{ + if (persistLastNode){ + Mutex::ScopedLock locker(messageLock); + for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { + i->payload->forcePersistent(); + if (i->payload->getPersistenceId() == 0){ + enqueue(0, i->payload); + } + } + inLastNodeFailure = true; + } +} + // return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { - if (traceId.size()) { + if (inLastNodeFailure && persistLastNode){ + msg->forcePersistent(); + } + + if (traceId.size()) { msg->addTraceId(traceId); } if (msg->isPersistent() && store) { - msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); + if (optimisticConsume){ + msg->enqueueComplete(); // (optimistic) allow consume before written to disk + } else { + msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + } + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); return true; } - //msg->enqueueAsync(); // increments intrusive ptr cnt return false; } @@ -492,12 +503,15 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) dequeued(msg); } if (msg.payload->isPersistent() && store) { - msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); + if (optimisticConsume) { + msg.payload->dequeueComplete(); + } else { + msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + } + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); store->dequeue(ctxt, pmsg, *this); return true; } - //msg->dequeueAsync(); // decrements intrusive ptr cnt return false; } @@ -519,14 +533,7 @@ void Queue::popAndDequeue() void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); - if (mgmtObject != 0){ - mgmtObject->inc_msgTotalDequeues (); - mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); - if (msg.payload->isPersistent ()){ - mgmtObject->inc_msgPersistDequeues (); - mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); - } - } + mgntDeqStats(msg.payload); } @@ -537,6 +544,9 @@ namespace const std::string qpidNoLocal("no-local"); const std::string qpidTraceIdentity("qpid.trace.id"); const std::string qpidTraceExclude("qpid.trace.exclude"); + const std::string qpidLastValueQueue("qpid.last_value_queue"); + const std::string qpidOptimisticConsume("qpid.optimistic_consume"); + const std::string qpidPersistLastNode("qpid.persist_last_node"); } void Queue::create(const FieldTable& _settings) @@ -555,6 +565,15 @@ void Queue::configure(const FieldTable& _settings) noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue with no-local=" << noLocal); + lastValueQueue= _settings.get(qpidLastValueQueue); + if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue"); + + optimisticConsume= _settings.get(qpidOptimisticConsume); + if (optimisticConsume) QPID_LOG(debug, "Configured queue with optimistic consume"); + + persistLastNode= _settings.get(qpidPersistLastNode); + if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); + traceId = _settings.getString(qpidTraceIdentity); std::string excludeList = _settings.getString(qpidTraceExclude); if (excludeList.size()) { diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index d90e1be3d1..3bde07c4d6 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -73,6 +73,10 @@ namespace qpid { uint32_t consumerCount; OwnershipToken* exclusive; bool noLocal; + bool lastValueQueue; + bool optimisticConsume; + bool persistLastNode; + bool inLastNodeFailure; std::string traceId; std::vector<std::string> traceExclude; Listeners listeners; @@ -103,6 +107,26 @@ namespace qpid { void dequeued(const QueuedMessage& msg); void popAndDequeue(); + inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg){ + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + if (msg->isPersistent ()) { + mgmtObject->inc_msgPersistEnqueues (); + mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + } + } + }; + inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg){ + if (mgmtObject != 0){ + mgmtObject->inc_msgTotalDequeues (); + mgmtObject->inc_byteTotalDequeues (msg->contentSize()); + if (msg->isPersistent ()){ + mgmtObject->inc_msgPersistDequeues (); + mgmtObject->inc_bytePersistDequeues (msg->contentSize()); + } + } + }; public: @@ -178,6 +202,11 @@ namespace qpid { bool canAutoDelete() const; const QueueBindings& getBindings() const { return bindings; } + /** + * used to take messages from in memory and flush down to disk. + */ + void setLastNodeFailure(); + void clearLastNodeFailure(); bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg); /** diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 61bdb0ffde..62d2222595 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -27,7 +27,7 @@ using namespace qpid::broker; using namespace qpid::sys; QueueRegistry::QueueRegistry() : - counter(1), store(0), parent(0) {} + counter(1), store(0), parent(0), lastNode(false) {} QueueRegistry::~QueueRegistry(){} @@ -43,6 +43,7 @@ QueueRegistry::declare(const string& declareName, bool durable, if (i == queues.end()) { Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent)); queues[name] = queue; + if (lastNode) queue->setLastNodeFailure(); return std::pair<Queue::shared_ptr, bool>(queue, true); } else { @@ -91,3 +92,17 @@ void QueueRegistry::setStore (MessageStore* _store) MessageStore* QueueRegistry::getStore() const { return store; } + +void QueueRegistry::updateQueueClusterState(bool _lastNode) +{ + RWlock::ScopedRlock locker(lock); + for (QueueMap::iterator i = queues.begin(); i != queues.end(); i++) { + if (_lastNode){ + i->second->setLastNodeFailure(); + } else { + i->second->clearLastNodeFailure(); + } + } + lastNode = _lastNode; +} + diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index 90df662cbe..ca2cd132c4 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -107,6 +107,12 @@ class QueueRegistry{ for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); ++i) f(i->second); } + + /** + * Change queue mode when cluster size drops to 1 node, expands again + * in practice allows flow queue to disk when last name to be exectuted + */ + void updateQueueClusterState(bool lastNode); private: typedef std::map<string, Queue::shared_ptr> QueueMap; @@ -115,6 +121,7 @@ private: int counter; MessageStore* store; management::Manageable* parent; + bool lastNode; //used to set mode on queue declare //destroy impl that assumes lock is already held: void destroyLH (const string& name); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 7feee4ce14..b48443526c 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/QueueRegistry.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterDumpRequestBody.h" #include "qpid/framing/ClusterUpdateBody.h" @@ -71,7 +72,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : handler(&joiningHandler), joiningHandler(*this), memberHandler(*this), - mcastId() + mcastId(), + lastSize(1) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ @@ -332,7 +334,17 @@ void Cluster::stopFullCluster(void) { void Cluster::updateMemberStats() { if (mgmtObject) { - mgmtObject->set_clusterSize(size()); + if (lastSize != size() && size() ==1){ + QPID_LOG(info, "Last node standing, updating queue policies, size:" <<size()); + broker.getQueues().updateQueueClusterState(true); + lastSize = size(); + }else if (lastSize != size() && size() > 1) { + QPID_LOG(info, "Recover back from last node standing, updating queue policies, size:" <<size()); + broker.getQueues().updateQueueClusterState(false); + lastSize = size(); + } + + mgmtObject->set_clusterSize(size()); std::vector<Url> vectUrl = getUrls(); string urlstr; for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) { diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 0b56540f9a..a8c916a99b 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -164,6 +164,7 @@ class Cluster : private Cpg::Handler, public management::Manageable MemberHandler memberHandler; uint32_t mcastId; + size_t lastSize; friend class ClusterHandler; friend class JoiningHandler; diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 8795dbcd03..111920aa59 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -24,6 +24,7 @@ #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/NullMessageStore.h" #include "qpid/framing/MessageTransferBody.h" #include <iostream> #include "boost/format.hpp" @@ -236,6 +237,95 @@ QPID_AUTO_TEST_CASE(testBound) exchange3->route(deliverable, key, &args); } +QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ + + FieldTable args; + + // set queue mode + args.setInt("qpid.persist_last_node", 1); + + Queue::shared_ptr queue(new Queue("my-queue", true)); + queue->configure(args); + + intrusive_ptr<Message> msg1 = message("e", "A"); + intrusive_ptr<Message> msg2 = message("e", "B"); + intrusive_ptr<Message> msg3 = message("e", "C"); + + //enqueue 2 messages + queue->deliver(msg1); + queue->deliver(msg2); + + //change mode + queue->setLastNodeFailure(); + + //enqueue 1 message + queue->deliver(msg3); + + //check all have persistent ids. + BOOST_CHECK(msg1->isPersistent()); + BOOST_CHECK(msg2->isPersistent()); + BOOST_CHECK(msg3->isPersistent()); + +} + +class TestMessageStore : public NullMessageStore +{ + public: + + virtual void dequeue(TransactionContext*, + const boost::intrusive_ptr<PersistableMessage>& /*msg*/, + const PersistableQueue& /*queue*/) + { + } + + virtual void enqueue(TransactionContext*, + const boost::intrusive_ptr<PersistableMessage>& /*msg*/, + const PersistableQueue& /* queue */) + { + } + + TestMessageStore() : NullMessageStore(false) {} + ~TestMessageStore(){} +}; + + +QPID_AUTO_TEST_CASE(testOptimisticConsume){ + + FieldTable args; + args.setInt("qpid.persist_last_node", 1); + + // set queue mode + + TestMessageStore store; + Queue::shared_ptr queue(new Queue("my-queue", true, &store)); + queue->setLastNodeFailure(); + + intrusive_ptr<Message> msg1 = message("e", "A"); + intrusive_ptr<Message> msg2 = message("e", "B"); + intrusive_ptr<Message> msg3 = message("e", "C"); + msg1->forcePersistent(); + msg2->forcePersistent(); + msg3->forcePersistent(); + + //enqueue 2 messages + queue->deliver(msg1); + queue->deliver(msg2); + + //change mode + args.setInt("qpid.optimistic_consume", 1); + queue->configure(args); + + //enqueue 1 message + queue->deliver(msg3); + + //check all have persistent ids. + BOOST_CHECK(!msg1->isEnqueueComplete()); + BOOST_CHECK(!msg2->isEnqueueComplete()); + BOOST_CHECK(msg3->isEnqueueComplete()); + +} + + QPID_AUTO_TEST_SUITE_END() |