diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 48 |
1 files changed, 39 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 3ae53c8ea9..aa0cd8ca31 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -32,6 +32,7 @@ #include "qpid/log/Statement.h" #include "qpid/management/ManagementBroker.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" @@ -68,6 +69,9 @@ const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); const std::string qpidPersistLastNode("qpid.persist_last_node"); const std::string qpidVQMatchProperty("qpid.LVQ_key"); const std::string qpidQueueEventGeneration("qpid.queue_event_generation"); +//following feature is not ready for general use as it doesn't handle +//the case where a message is enqueued on more than one queue well enough: +const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers"); const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; @@ -93,7 +97,8 @@ Queue::Queue(const string& _name, bool _autodelete, policyExceeded(false), mgmtObject(0), eventMode(0), - eventMgr(0) + eventMgr(0), + insertSeqNo(0) { if (parent != 0) { @@ -176,7 +181,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ - push(msg); + push(msg, true); msg->enqueueComplete(); // mark the message as enqueued mgntEnqStats(msg); @@ -545,12 +550,13 @@ void Queue::popMsg(QueuedMessage& qmsg) ++dequeueTracker; } -void Queue::push(boost::intrusive_ptr<Message>& msg){ +void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (policy.get()) policy->tryEnqueue(qm); + if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; const framing::FieldTable* ft = msg->getApplicationHeaders(); @@ -566,14 +572,21 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); if (!old) old = i->second; i->second->setReplacementMessage(msg,this); - dequeued(QueuedMessage(qm.queue, old, qm.position)); + if (isRecovery) { + //can't issue new requests for the store until + //recovery is complete + pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); + } else { + dequeue(0, QueuedMessage(qm.queue, old, qm.position)); + } } }else { messages.push_back(qm); listeners.populate(copy); } - if (eventMode && eventMgr) { - eventMgr->enqueued(qm); + if (eventMode) { + if (eventMgr) eventMgr->enqueued(qm); + else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); } } copy.notify(); @@ -664,7 +677,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) msg->addTraceId(traceId); } - if (msg->isPersistent() && store && !lastValueQueue) { + if (msg->isPersistent() && store) { msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); @@ -676,14 +689,14 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { - if (policy.get() && !policy->isEnqueued(msg)) return false; { Mutex::ScopedLock locker(messageLock); + if (policy.get() && !policy->isEnqueued(msg)) return false; if (!ctxt) { dequeued(msg); } } - if (msg.payload->isPersistent() && store && !lastValueQueue) { + if (msg.payload->isPersistent() && store) { msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); store->dequeue(ctxt, pmsg, *this); @@ -765,6 +778,9 @@ void Queue::configure(const FieldTable& _settings, bool recovering) eventMode = _settings.getAsInt(qpidQueueEventGeneration); + FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); + if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); + if (mgmtObject != 0) mgmtObject->set_arguments (_settings); @@ -976,3 +992,17 @@ void Queue::setQueueEventManager(QueueEvents& mgr) { eventMgr = &mgr; } + +void Queue::recoveryComplete() +{ + //process any pending dequeues + for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + pendingDequeues.clear(); +} + +void Queue::insertSequenceNumbers(const std::string& key) +{ + seqNoKey = key; + insertSeqNo = !seqNoKey.empty(); + QPID_LOG(debug, "Inserting sequence numbers as " << key); +} |