diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 99 |
1 files changed, 59 insertions, 40 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 7dc6197fa2..d50e887df4 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -63,6 +63,10 @@ Queue::Queue(const string& _name, bool _autodelete, consumerCount(0), exclusive(0), noLocal(false), + lastValueQueue(false), + optimisticConsume(false), + persistLastNode(false), + inLastNodeFailure(false), persistenceId(0), policyExceeded(false), mgmtObject(0) @@ -134,21 +138,12 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ } else { // if no store then mark as enqueued if (!enqueue(0, msg)){ - if (mgmtObject != 0) { - mgmtObject->inc_msgTotalEnqueues (); - mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - } push(msg); msg->enqueueComplete(); }else { - if (mgmtObject != 0) { - mgmtObject->inc_msgTotalEnqueues (); - mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - mgmtObject->inc_msgPersistEnqueues (); - mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - } - push(msg); + push(msg); } + mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); } } @@ -157,12 +152,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued - if (mgmtObject != 0) { - mgmtObject->inc_msgTotalEnqueues (); - mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - mgmtObject->inc_msgPersistEnqueues (); - mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - } + mgntEnqStats(msg); if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: @@ -173,16 +163,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); - if (mgmtObject != 0) { - mgmtObject->inc_msgTotalEnqueues (); - mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgntEnqStats(msg); + if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); - if (msg->isPersistent ()) { - mgmtObject->inc_msgPersistEnqueues (); - mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - } - } + } } void Queue::requeue(const QueuedMessage& msg){ @@ -466,20 +451,46 @@ bool Queue::canAutoDelete() const return autodelete && !consumerCount; } +void Queue::clearLastNodeFailure() +{ + inLastNodeFailure = false; +} + +void Queue::setLastNodeFailure() +{ + if (persistLastNode){ + Mutex::ScopedLock locker(messageLock); + for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { + i->payload->forcePersistent(); + if (i->payload->getPersistenceId() == 0){ + enqueue(0, i->payload); + } + } + inLastNodeFailure = true; + } +} + // return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { - if (traceId.size()) { + if (inLastNodeFailure && persistLastNode){ + msg->forcePersistent(); + } + + if (traceId.size()) { msg->addTraceId(traceId); } if (msg->isPersistent() && store) { - msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); + if (optimisticConsume){ + msg->enqueueComplete(); // (optimistic) allow consume before written to disk + } else { + msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + } + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); return true; } - //msg->enqueueAsync(); // increments intrusive ptr cnt return false; } @@ -492,12 +503,15 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) dequeued(msg); } if (msg.payload->isPersistent() && store) { - msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); + if (optimisticConsume) { + msg.payload->dequeueComplete(); + } else { + msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + } + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); store->dequeue(ctxt, pmsg, *this); return true; } - //msg->dequeueAsync(); // decrements intrusive ptr cnt return false; } @@ -519,14 +533,7 @@ void Queue::popAndDequeue() void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); - if (mgmtObject != 0){ - mgmtObject->inc_msgTotalDequeues (); - mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); - if (msg.payload->isPersistent ()){ - mgmtObject->inc_msgPersistDequeues (); - mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); - } - } + mgntDeqStats(msg.payload); } @@ -537,6 +544,9 @@ namespace const std::string qpidNoLocal("no-local"); const std::string qpidTraceIdentity("qpid.trace.id"); const std::string qpidTraceExclude("qpid.trace.exclude"); + const std::string qpidLastValueQueue("qpid.last_value_queue"); + const std::string qpidOptimisticConsume("qpid.optimistic_consume"); + const std::string qpidPersistLastNode("qpid.persist_last_node"); } void Queue::create(const FieldTable& _settings) @@ -555,6 +565,15 @@ void Queue::configure(const FieldTable& _settings) noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue with no-local=" << noLocal); + lastValueQueue= _settings.get(qpidLastValueQueue); + if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue"); + + optimisticConsume= _settings.get(qpidOptimisticConsume); + if (optimisticConsume) QPID_LOG(debug, "Configured queue with optimistic consume"); + + persistLastNode= _settings.get(qpidPersistLastNode); + if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); + traceId = _settings.getString(qpidTraceIdentity); std::string excludeList = _settings.getString(qpidTraceExclude); if (excludeList.size()) { |