summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp99
1 files changed, 59 insertions, 40 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 7dc6197fa2..d50e887df4 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -63,6 +63,10 @@ Queue::Queue(const string& _name, bool _autodelete,
consumerCount(0),
exclusive(0),
noLocal(false),
+ lastValueQueue(false),
+ optimisticConsume(false),
+ persistLastNode(false),
+ inLastNodeFailure(false),
persistenceId(0),
policyExceeded(false),
mgmtObject(0)
@@ -134,21 +138,12 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
} else {
// if no store then mark as enqueued
if (!enqueue(0, msg)){
- if (mgmtObject != 0) {
- mgmtObject->inc_msgTotalEnqueues ();
- mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- }
push(msg);
msg->enqueueComplete();
}else {
- if (mgmtObject != 0) {
- mgmtObject->inc_msgTotalEnqueues ();
- mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- mgmtObject->inc_msgPersistEnqueues ();
- mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- }
- push(msg);
+ push(msg);
}
+ mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
}
}
@@ -157,12 +152,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
- if (mgmtObject != 0) {
- mgmtObject->inc_msgTotalEnqueues ();
- mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- mgmtObject->inc_msgPersistEnqueues ();
- mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- }
+ mgntEnqStats(msg);
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
@@ -173,16 +163,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
- if (mgmtObject != 0) {
- mgmtObject->inc_msgTotalEnqueues ();
- mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ mgntEnqStats(msg);
+ if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
- if (msg->isPersistent ()) {
- mgmtObject->inc_msgPersistEnqueues ();
- mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- }
- }
+ }
}
void Queue::requeue(const QueuedMessage& msg){
@@ -466,20 +451,46 @@ bool Queue::canAutoDelete() const
return autodelete && !consumerCount;
}
+void Queue::clearLastNodeFailure()
+{
+ inLastNodeFailure = false;
+}
+
+void Queue::setLastNodeFailure()
+{
+ if (persistLastNode){
+ Mutex::ScopedLock locker(messageLock);
+ for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
+ i->payload->forcePersistent();
+ if (i->payload->getPersistenceId() == 0){
+ enqueue(0, i->payload);
+ }
+ }
+ inLastNodeFailure = true;
+ }
+}
+
// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
- if (traceId.size()) {
+ if (inLastNodeFailure && persistLastNode){
+ msg->forcePersistent();
+ }
+
+ if (traceId.size()) {
msg->addTraceId(traceId);
}
if (msg->isPersistent() && store) {
- msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
+ if (optimisticConsume){
+ msg->enqueueComplete(); // (optimistic) allow consume before written to disk
+ } else {
+ msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ }
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
}
- //msg->enqueueAsync(); // increments intrusive ptr cnt
return false;
}
@@ -492,12 +503,15 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
dequeued(msg);
}
if (msg.payload->isPersistent() && store) {
- msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
+ if (optimisticConsume) {
+ msg.payload->dequeueComplete();
+ } else {
+ msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ }
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
store->dequeue(ctxt, pmsg, *this);
return true;
}
- //msg->dequeueAsync(); // decrements intrusive ptr cnt
return false;
}
@@ -519,14 +533,7 @@ void Queue::popAndDequeue()
void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
- if (msg.payload->isPersistent ()){
- mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
- }
- }
+ mgntDeqStats(msg.payload);
}
@@ -537,6 +544,9 @@ namespace
const std::string qpidNoLocal("no-local");
const std::string qpidTraceIdentity("qpid.trace.id");
const std::string qpidTraceExclude("qpid.trace.exclude");
+ const std::string qpidLastValueQueue("qpid.last_value_queue");
+ const std::string qpidOptimisticConsume("qpid.optimistic_consume");
+ const std::string qpidPersistLastNode("qpid.persist_last_node");
}
void Queue::create(const FieldTable& _settings)
@@ -555,6 +565,15 @@ void Queue::configure(const FieldTable& _settings)
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+ lastValueQueue= _settings.get(qpidLastValueQueue);
+ if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
+
+ optimisticConsume= _settings.get(qpidOptimisticConsume);
+ if (optimisticConsume) QPID_LOG(debug, "Configured queue with optimistic consume");
+
+ persistLastNode= _settings.get(qpidPersistLastNode);
+ if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
+
traceId = _settings.getString(qpidTraceIdentity);
std::string excludeList = _settings.getString(qpidTraceExclude);
if (excludeList.size()) {