/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "BrokerContext.h"
#include "EventHandler.h"
#include "Group.h"
#include "Multicaster.h"
#include "QueueContext.h"
#include "QueueHandler.h"
#include "hash.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/framing/ClusterMessageAcquireBody.h"
#include "qpid/framing/ClusterMessageDequeueBody.h"
#include "qpid/framing/ClusterQueueConsumedBody.h"
#include "qpid/framing/ClusterQueueSubscribeBody.h"
#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
#include <cassert>
#include <memory>

namespace qpid {
namespace cluster {

using framing::SequenceSet;

const framing::ProtocolVersion pv;     // shorthand

/**
 * Attach a new context to queue q.
 *
 * Installs this object as the queue's cluster context, stops the queue's
 * consumers (the context starts out UNSUBSCRIBED and not consuming) and
 * registers with the group's ticker so that tick() is called.
 *
 * @param q         queue this context is attached to.
 * @param g         cluster group supplying the multicaster, ticker and
 *                  event handler.
 * @param maxTicks_ number of ticks after which a shared owner stops
 *                  its consumers (see tick()).
 */
QueueContext::QueueContext(broker::Queue& q, Group& g, size_t maxTicks_)
    : ownership(UNSUBSCRIBED),
      consumers(0),
      consuming(false),
      ticks(0),
      queue(q),
      mcast(g.getMulticaster()),
      hash(hashof(q.getName())),
      maxTicks(maxTicks_),
      group(g)
{
    // NOTE(review): the auto_ptr template argument was missing in the
    // source. QueueContext is assumed; if Queue::setClusterContext takes an
    // auto_ptr to a base context type, name that type here instead.
    q.setClusterContext(std::auto_ptr<QueueContext>(this));
    q.stopConsumers();          // Stop queue initially.
    group.getTicker().add(this);
}

/** Unregister from the ticker and from the event handler's queue handler. */
QueueContext::~QueueContext() {
    // Lifecycle: must remove all references to this context before it is deleted.
    // Must be sure that there can be no use of this context later.
    group.getTicker().remove(this);
    // NOTE(review): if EventHandler::getHandler is a member template, its
    // explicit argument (presumably QueueHandler) is missing here - confirm
    // against EventHandler.h.
    group.getEventHandler().getHandler()->remove(queue);
}

namespace {
/** True if o is one of the two owning states, SOLE_OWNER or SHARED_OWNER. */
bool isOwner(QueueOwnership o) {
    return o == SOLE_OWNER || o == SHARED_OWNER;
}
}

// Called by QueueReplica in CPG deliver thread when state changes.
void QueueContext::replicaState(QueueOwnership before, QueueOwnership after)
{
    sys::Mutex::ScopedLock l(lock);
    // Interested in state changes which lead to ownership.
    // We voluntarily give up ownership before multicasting
    // the state change so we don't need to handle transitions
    // that lead to non-ownership.
    if (before != after && isOwner(after)) {
        assert(before == ownership);
        if (!consuming) queue.startConsumers();
        consuming = true;
        ticks = 0;              // Restart the tick count for the new ownership period.
    }
    ownership = after;
}

// FIXME aconway 2011-07-27: Dont spin the token on an empty queue.

// Called in broker threads when a consumer is added.
// n is the new consumer count, stored in `consumers`.
void QueueContext::consume(size_t n) {
    sys::Mutex::ScopedLock l(lock);
    // Multicast a subscribe only on the transition from zero consumers
    // while this member is still UNSUBSCRIBED.
    if (consumers == 0 && n > 0 && ownership == UNSUBSCRIBED)
        mcast.mcast(
            framing::ClusterQueueSubscribeBody(pv, queue.getName()));
    consumers = n;
}

// Called in broker threads when a consumer is cancelled.
// n is the new consumer count, stored in `consumers`.
void QueueContext::cancel(size_t n) {
    sys::Mutex::ScopedLock l(lock);
    consumers = n;
    // Last consumer gone while consuming: stop consumers on the queue.
    if (n == 0 && consuming) queue.stopConsumers();
}

// FIXME aconway 2011-11-03: review scope of locking around sendConsumed

// Called in Ticker thread.
void QueueContext::tick() {
    sys::Mutex::ScopedLock l(lock);
    if (!consuming) return;     // Nothing to do if we don't have the lock.
    if (ownership == SHARED_OWNER && ++ticks >= maxTicks)
        queue.stopConsumers();  // Shared owner has used up its maxTicks ticks.
    else if (ownership == SOLE_OWNER)
        sendConsumed(l);        // Status report on consumption
}

// Callback set up by queue.stopConsumers() called in connection or timer thread.
// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
    sys::Mutex::ScopedLock l(lock);
    if (!consuming) return; // !consuming => initial stopConsumers in ctor.
    sendConsumed(l);
    mcast.mcast(
        framing::ClusterQueueUnsubscribeBody(pv, queue.getName(), consumers));
    consuming = false;
}

/**
 * Multicast the accumulated `acquired` and `dequeued` position sets for
 * this queue, then clear both. Does nothing if both sets are empty.
 *
 * The unnamed ScopedLock parameter is not used in the body; callers in
 * this file pass the lock they already hold on `lock`.
 */
void QueueContext::sendConsumed(const sys::Mutex::ScopedLock&) {
    if (acquired.empty() && dequeued.empty()) return; // Nothing to send
    mcast.mcast(
        framing::ClusterQueueConsumedBody(pv, queue.getName(), acquired, dequeued));
    acquired.clear();
    dequeued.clear();
}

/**
 * Take the message at `position` out of `unacked` and requeue it on the
 * queue with replication suppressed. Nothing is requeued if the popped
 * entry has no queue set.
 *
 * @param position    queue position of the message.
 * @param redelivered if true, the message payload is marked redelivered
 *                    before it is requeued.
 */
void QueueContext::requeue(uint32_t position, bool redelivered) {
    // No lock, unacked has its own lock.
    broker::QueuedMessage qm = unacked.pop(position);
    if (qm.queue) {
        if (redelivered) qm.payload->redeliver();
        BrokerContext::ScopedSuppressReplication ssr;
        queue.requeue(qm);
    }
}

/** Record `position` in the `acquired` set; sendConsumed() multicasts and clears it. */
void QueueContext::localAcquire(uint32_t position) {
    QPID_LOG(trace, "cluster queue " << queue.getName() << " acquired " << position);
    sys::Mutex::ScopedLock l(lock);
    assert(consuming);
    acquired.add(position);
}

/** Record `position` in the `dequeued` set; sendConsumed() multicasts and clears it. */
void QueueContext::localDequeue(uint32_t position) {
    QPID_LOG(trace, "cluster queue " << queue.getName() << " dequeued " << position);
    // FIXME aconway 2010-10-28: for local dequeues, we should
    // complete the ack that initiated the dequeue at this point.
    sys::Mutex::ScopedLock l(lock);

    // FIXME aconway 2011-11-03: this assertion fails for explicit accept
    // because it doesn't respect the consume lock.
    // assert(consuming);

    dequeued.add(position);
}

/**
 * Apply a consumed report from cluster member `sender`.
 *
 * Acquires are applied only when the sender is not this member; dequeues
 * are applied for every sender.
 *
 * Note: the parameters `acquired` and `dequeued` shadow the members of
 * the same name, so this function only reads its arguments.
 */
void QueueContext::consumed(
    const MemberId& sender,
    const SequenceSet& acquired,
    const SequenceSet& dequeued)
{
    // No lock, doesn't touch any members.

    // FIXME aconway 2011-09-15: systematic logging across cluster module.
    // FIXME aconway 2011-09-23: pretty printing for identifier.
    QPID_LOG(trace, "cluster: " << sender << " acquired: " << acquired
             << " dequeued: " << dequeued << " on queue: " << queue.getName());

    // Note acquires from other members. My own acquires were executed in
    // the connection thread
    if (sender != group.getSelf()) {
        // FIXME aconway 2011-09-23: avoid individual finds, scan queue once.
        for (SequenceSet::iterator i = acquired.begin(); i != acquired.end(); ++i)
            acquire(*i);
    }
    // Process deques from the queue owner.
    // FIXME aconway 2011-09-23: avoid individual finds, scan queue once.
    for (SequenceSet::iterator i = dequeued.begin(); i != dequeued.end(); ++i)
        dequeue(*i);
}

// Remote acquire
// Acquires the message at `position` from the queue with replication
// suppressed and stores it in `unacked`. Throws Exception if the queue
// cannot acquire a message at that position.
void QueueContext::acquire(uint32_t position) {
    // No lock, doesn't touch any members.
    broker::QueuedMessage qm;
    BrokerContext::ScopedSuppressReplication ssr;
    if (!queue.acquireMessageAt(position, qm))
        // FIXME aconway 2011-10-31: error handling
        throw Exception(QPID_MSG("cluster: acquire: message not found: "
                                 << queue.getName() << "[" << position << "]"));
    assert(qm.position.getValue() == position);
    assert(qm.payload);
    unacked.put(qm.position, qm); // unacked has its own lock.
}

/**
 * Take the message at `position` out of `unacked` and dequeue it from the
 * queue with replication suppressed. Nothing is dequeued if the popped
 * entry has no queue set.
 */
void QueueContext::dequeue(uint32_t position) {
    // No lock, doesn't touch any members. unacked has its own lock.
    broker::QueuedMessage qm = unacked.pop(position);
    BrokerContext::ScopedSuppressReplication ssr;
    if (qm.queue) queue.dequeue(0, qm);
}

/** Return the cluster context of q, cast to QueueContext*. */
QueueContext* QueueContext::get(broker::Queue& q) {
    return static_cast<QueueContext*>(q.getClusterContext());
}

// FIXME aconway 2011-09-23: make unacked a plain map, use lock.

}} // namespace qpid::cluster