summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp102
1 files changed, 57 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index b2a8e223c5..86de96468d 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -181,6 +181,8 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
void Queue::recover(boost::intrusive_ptr<Message>& msg){
+ if (policy.get()) policy->recoverEnqueued(msg);
+
push(msg, true);
if (store){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
@@ -206,11 +208,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
}
void Queue::requeue(const QueuedMessage& msg){
- if (!isEnqueued(msg)) return;
-
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
+ if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
listeners.populate(copy);
@@ -563,16 +564,10 @@ void Queue::popMsg(QueuedMessage& qmsg)
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
- Messages dequeues;
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
- if (policy.get()) {
- policy->tryEnqueue(qm);
- //depending on policy, may have some dequeues
- if (!isRecovery) pendingDequeues.swap(dequeues);
- }
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -606,12 +601,11 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (eventMgr) eventMgr->enqueued(qm);
else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
}
+ if (policy.get()) {
+ policy->enqueued(qm);
+ }
}
copy.notify();
- if (!dequeues.empty()) {
- //depending on policy, may have some dequeues
- for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
- }
}
QueuedMessage Queue::getFront()
@@ -697,8 +691,19 @@ void Queue::setLastNodeFailure()
}
// return true if store exists,
-bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
+bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck)
{
+ if (policy.get() && !suppressPolicyCheck) {
+ Messages dequeues;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ policy->tryEnqueue(msg);
+ policy->getPendingDequeues(dequeues);
+ }
+ //depending on policy, may have some dequeues that need to performed without holding the lock
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ }
+
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
@@ -707,15 +712,27 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
msg->addTraceId(traceId);
}
- if (msg->isPersistent() && store) {
+ if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
}
+ if (!store) {
+ //Messages enqueued on a transient queue should be prevented
+ //from having their content released as it may not be
+ //recoverable by these queue for delivery
+ msg->blockContentRelease();
+ }
return false;
}
+void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
+{
+ Mutex::ScopedLock locker(messageLock);
+ if (policy.get()) policy->enqueueAborted(msg);
+}
+
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
@@ -726,7 +743,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
dequeued(msg);
}
}
- if (msg.payload->isPersistent() && store) {
+ if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) {
msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
store->dequeue(ctxt, pmsg, *this);
@@ -781,22 +798,37 @@ void Queue::create(const FieldTable& _settings)
void Queue::configure(const FieldTable& _settings, bool recovering)
{
- setPolicy(QueuePolicy::createQueuePolicy(_settings));
+
+ eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+
+ if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
+ (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+ if ( NullMessageStore::isNullStore(store)) {
+ QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
+ } else if (eventMgr && !eventMgr->isSync() ) {
+ QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
+ }
+ FieldTable copy(_settings);
+ copy.erase(QueuePolicy::typeKey);
+ setPolicy(QueuePolicy::createQueuePolicy(getName(), copy));
+ } else {
+ setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
+ }
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
- QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+ QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
lastValueQueue= _settings.get(qpidLastValueQueue);
- if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
+ if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
if (lastValueQueueNoBrowse){
- QPID_LOG(debug, "Configured queue as Last Value Queue No Browse");
+ QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
lastValueQueue = lastValueQueueNoBrowse;
}
persistLastNode= _settings.get(qpidPersistLastNode);
- if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
+ if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
traceId = _settings.getAsString(qpidTraceIdentity);
std::string excludeList = _settings.getAsString(qpidTraceExclude);
@@ -806,8 +838,6 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
- eventMode = _settings.getAsInt(qpidQueueEventGeneration);
-
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
@@ -975,19 +1005,6 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
-bool Queue::releaseMessageContent(const QueuedMessage& m)
-{
- if (store && !NullMessageStore::isNullStore(store)) {
- QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
- m.payload->releaseContent(store);
- return true;
- } else {
- QPID_LOG(warning, "Message " << m.position << " on " << name
- << " cannot be released from memory as the queue is not durable");
- return false;
- }
-}
-
ManagementObject* Queue::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
@@ -1044,11 +1061,12 @@ void Queue::insertSequenceNumbers(const std::string& key)
void Queue::enqueued(const QueuedMessage& m)
{
if (m.payload) {
- if (policy.get()) policy->tryEnqueue(m);
- mgntEnqStats(m.payload);
- if (m.payload->isPersistent()) {
- enqueue ( 0, m.payload );
+ if (policy.get()) {
+ policy->recoverEnqueued(m.payload);
+ policy->enqueued(m);
}
+ mgntEnqStats(m.payload);
+ enqueue ( 0, m.payload, true );
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1059,10 +1077,4 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
return !policy.get() || policy->isEnqueued(msg);
}
-void Queue::addPendingDequeue(const QueuedMessage& msg)
-{
- //assumes lock is held - true at present but rather nasty as this is a public method
- pendingDequeues.push_back(msg);
-}
-
QueueListeners& Queue::getListeners() { return listeners; }