/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "Event.h"
#include "IdSetter.h"
#include "QueueGuard.h"
#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
#include "Primary.h"
#include "HaBroker.h"
#include "qpid/assert.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
#include <sstream>

namespace qpid {
namespace ha {

using namespace framing;
using namespace broker;
using namespace std;
using sys::Mutex;
using broker::amqp_0_10::MessageTransfer;

namespace {
const string QPID_HA(QPID_HA_PREFIX);
}

const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION(QPID_HA+"repsub");
const string ReplicatingSubscription::QPID_BROKER_INFO(QPID_HA+"info");
const string ReplicatingSubscription::QPID_ID_SET(QPID_HA+"ids");
const string ReplicatingSubscription::QPID_QUEUE_REPLICATOR(QPID_HA+"qrep");

/* Called by SemanticState::consume to create a consumer.
 *
 * Creates and initializes a ReplicatingSubscription only when the
 * QPID_REPLICATING_SUBSCRIPTION argument equals QPID_QUEUE_REPLICATOR;
 * otherwise returns an empty pointer. Exceptions from initialize()
 * propagate to the caller.
 */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
    SemanticState* parent,
    const string& name,
    Queue::shared_ptr queue,
    bool ack,
    bool acquire,
    bool exclusive,
    const string& tag,
    const string& resumeId,
    uint64_t resumeTtl,
    const framing::FieldTable& arguments
) {
    boost::shared_ptr<ReplicatingSubscription> rs;
    std::string type = arguments.getAsString(QPID_REPLICATING_SUBSCRIPTION);
    if (type == QPID_QUEUE_REPLICATOR) {
        rs.reset(new ReplicatingSubscription(
                     haBroker,
                     parent, name, queue, ack, acquire, exclusive, tag,
                     resumeId, resumeTtl, arguments));
    }
    if (rs) rs->initialize();
    return rs;
}

ReplicatingSubscription::ReplicatingSubscription(
    HaBroker& hb,
    SemanticState* parent,
    const string& name,
    Queue::shared_ptr queue_,
    bool ack,
    bool /*acquire*/,
    bool exclusive,
    const string& tag,
    const string& resumeId,
    uint64_t resumeTtl,
    const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
                 resumeId, resumeTtl, arguments),
    logPrefix(hb.logPrefix),
    position(0), wasStopped(false), ready(false), cancelled(false),
    haBroker(hb),
    // Use the constructor parameter rather than the haBroker member: members
    // are initialized in declaration order, so the member may not be bound yet.
    primary(boost::dynamic_pointer_cast<Primary>(hb.getRole()))
{}

// Called in subscription's connection thread when the subscription is created.
// Separate from ctor because we need to use shared_from_this
//
// Throws InvalidArgumentException if the broker-info argument is missing and
// ResourceDeletedException if the queue has no snapshot (queue being deleted).
// Any failure is logged and re-thrown; if this subscription was already
// attached as a queue observer it is detached first, so the queue does not
// keep a reference to a subscription that was never successfully created.
void ReplicatingSubscription::initialize() {
    bool observing = false;     // True once we are attached to queue observers.
    try {
        FieldTable ft;
        if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
            throw InvalidArgumentException(
                logPrefix.get()+"Can't subscribe, no broker info: "+getTag());
        info.assign(ft);

        // Set a log prefix message that identifies the remote broker.
        ostringstream os;
        os << "Subscription to " << queue->getName() << " at ";
        info.printId(os) << ": ";
        logPrefix = os.str();

        // If there's already a guard (we are in failover) use it, else create one.
        if (primary) guard = primary->getGuard(queue, info);
        if (!guard) guard.reset(new QueueGuard(*queue, info, logPrefix.prePrefix));

        // NOTE: Once the observer is attached we can have concurrent
        // calls to dequeued so we need to lock use of this->dequeues.
        //
        // However we must attach the observer _before_ we snapshot for
        // initial dequeues to be sure we don't miss any dequeues
        // between the snapshot and attaching the observer.
        queue->getObservers().add(
            boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
        observing = true;
        boost::shared_ptr<QueueSnapshot> snapshot =
            queue->getObservers().findType<QueueSnapshot>();
        // There may be no snapshot if the queue is being deleted concurrently.
        // The catch block below detaches the observer.
        if (!snapshot)
            throw ResourceDeletedException(logPrefix.get()+"Can't subscribe, queue deleted");
        ReplicationIdSet primaryIds = snapshot->getSnapshot();
        std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
        ReplicationIdSet backupIds;
        if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);

        // Initial dequeues are messages on backup but not on primary.
        ReplicationIdSet initDequeues = backupIds - primaryIds;
        QueuePosition front,back;
        queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue
        {
            sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
            dequeues += initDequeues;       // Messages on backup that are not on primary.
            skipEnqueue = backupIds - initDequeues; // Messages already on the backup.
            // Queue front is moving but we know this subscriptions will start at a
            // position >= front so if front is safe then position must be.
            position = front;

            QPID_LOG(debug, logPrefix << "Subscribed: primary ["
                     << front << "," << back << "]=" << primaryIds
                     << ", guarded " << guard->getFirst()
                     << ", backup (keep " << skipEnqueue << ", drop " << initDequeues << ")");
            checkReady(l);
        }

        if (primary) primary->addReplica(*this);
        Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
        // Send initial dequeues to the backup.
        // There must be a shared_ptr(this) when sending.
        sendDequeueEvent(l);
    }
    catch (const std::exception& e) {
        QPID_LOG(error, logPrefix << "Subscribe failed: " << e.what());
        if (observing) {
            queue->getObservers().remove(
                boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
        }
        throw;
    }
}

ReplicatingSubscription::~ReplicatingSubscription() {}

void ReplicatingSubscription::stopped() {
    Mutex::ScopedLock l(lock);
    // We have reached the last available message on the queue.
    //
    // Note that if messages have been removed out-of-order this may not be the
    // head of the queue. We may not even have reached the guard
    // position. However there are no more messages to protect and we will not
    // be advanced any further, so we should consider ourselves guarded for
    // purposes of readiness.
    wasStopped = true;
    checkReady(l);
}

// True if the next position for the ReplicatingSubscription is a guarded position.
bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
    // See comment in stopped()
    return wasStopped || (position+1 >= guard->getFirst());
}

// Message is delivered in the subscription's connection thread.
// Messages already present on the backup (skipEnqueue) are completed without
// being sent; all others are preceded by an IdEvent and delivered normally.
bool ReplicatingSubscription::deliver(
    const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
{
    Mutex::ScopedLock l(lock);
    ReplicationId id = m.getReplicationId();
    position = m.getSequence();
    try {
        bool result = false;
        if (skipEnqueue.contains(id)) {
            QPID_LOG(trace, logPrefix << "Skip " << logMessageId(*getQueue(), m));
            skipEnqueue -= id;
            guard->complete(id); // This will never be acknowledged.
            notify();
            result = true;
        }
        else {
            QPID_LOG(trace, logPrefix << "Replicated " << logMessageId(*getQueue(), m));
            if (!ready && !isGuarded(l)) unready += id;
            sendIdEvent(id, l);
            result = ConsumerImpl::deliver(c, m);
        }
        checkReady(l);
        return result;
    } catch (const std::exception& e) {
        QPID_LOG(critical, logPrefix << "Error replicating " << logMessageId(*getQueue(), m)
                 << ": " << e.what());
        throw;
    }
}

// Called with lock held. The first time the subscription is guarded with no
// unready messages outstanding, marks it ready and notifies the Primary
// (with the lock released).
void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
    if (!ready && isGuarded(l) && unready.empty()) {
        ready = true;
        // Copy position while the lock is still held: deliver() may update it
        // concurrently once the lock is released below.
        QueuePosition caughtUp = position;
        sys::Mutex::ScopedUnlock u(lock);
        // Notify Primary that a subscription is ready.
        if (caughtUp+1 >= guard->getFirst()) {
            QPID_LOG(debug, logPrefix << "Caught up at " << caughtUp);
        } else {
            QPID_LOG(debug, logPrefix << "Caught up at " << caughtUp
                     << " short of guard at " << guard->getFirst());
        }
        if (primary) primary->readyReplica(*this);
    }
}

// Called in the subscription's connection thread.
// Idempotent: only the first call detaches the observer and cancels the guard.
void ReplicatingSubscription::cancel()
{
    {
        Mutex::ScopedLock l(lock);
        if (cancelled) return;
        cancelled = true;
    }
    QPID_LOG(debug, logPrefix << "Cancelled");
    getQueue()->getObservers().remove(
        boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
    guard->cancel();
    ConsumerImpl::cancel();
}

// Consumer override, called on primary in the backup's IO thread.
void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
    // Finish completion of message, it has been acknowledged by the backup.
    ReplicationId id = r.getReplicationId();
    QPID_LOG(trace, logPrefix << "Acknowledged " <<
             logMessageId(*getQueue(), r.getMessageId(), id));
    guard->complete(id);
    {
        Mutex::ScopedLock l(lock);
        unready -= id;
        checkReady(l);
    }
    ConsumerImpl::acknowledged(r);
}

// Called with lock held. Called in subscription's connection thread.
// Sends all accumulated dequeues as a single DequeueEvent and clears them.
void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
{
    if (dequeues.empty()) return;
    QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
    DequeueEvent d(dequeues);
    dequeues.clear();
    sendEvent(d, l);
}

// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const broker::Message& m)
{
    ReplicationId id = m.getReplicationId();
    QPID_LOG(trace, logPrefix << "Dequeued ID " << id);
    {
        Mutex::ScopedLock l(lock);
        dequeues.add(id);
    }
    notify();                   // Ensure a call to doDispatch
}
void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l) { sendEvent(IdEvent(pos), l); } void ReplicatingSubscription::sendEvent(const Event& event, Mutex::ScopedLock&) { Mutex::ScopedUnlock u(lock); // Send the event directly to the base consumer implementation. The dummy // consumer prevents acknowledgements being handled, which is what we want // for events ConsumerImpl::deliver(QueueCursor(), event.message(), boost::shared_ptr()); } // Called in subscription's connection thread. bool ReplicatingSubscription::doDispatch() { { Mutex::ScopedLock l(lock); if (!dequeues.empty()) sendDequeueEvent(l); } try { return ConsumerImpl::doDispatch(); } catch (const std::exception& e) { QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what()); return false; } } }} // namespace qpid::ha