/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "QueueGuard.h" #include "QueueRange.h" #include "QueueReplicator.h" #include "ReplicatingSubscription.h" #include "Primary.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include "qpid/types/Uuid.h" #include namespace qpid { namespace ha { using namespace framing; using namespace broker; using namespace std; using sys::Mutex; const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription"); const string ReplicatingSubscription::QPID_BACK("qpid.ha-back"); const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front"); const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info"); namespace { const string DOLLAR("$"); const string INTERNAL("-internal"); } // namespace // Scan the queue for gaps and add them to the subscriptions dequed set. class DequeueScanner { public: DequeueScanner( ReplicatingSubscription& rs, SequenceNumber front_, SequenceNumber back_ // Inclusive ) : subscription(rs), front(front_), back(back_) { assert(front <= back); // INVARIANT deques have been added for positions <= at. at = front - 1; } void operator()(const Message& m) { if (m.getSequence() >= front && m.getSequence() <= back) { if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1); at = m.getSequence(); } } // Must call after scanning the queue. void finish() { if (at < back) subscription.dequeued(at+1, back); } private: ReplicatingSubscription& subscription; SequenceNumber front; SequenceNumber back; SequenceNumber at; }; string mask(const string& in) { return DOLLAR + in + INTERNAL; } /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr ReplicatingSubscription::Factory::create( SemanticState* parent, const string& name, Queue::shared_ptr queue, bool ack, bool acquire, bool exclusive, const string& tag, const string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments ) { boost::shared_ptr rs; if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { rs.reset(new ReplicatingSubscription( parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); rs->initialize(); } return rs; } ReplicatingSubscription::ReplicatingSubscription( SemanticState* parent, const string& name, Queue::shared_ptr queue, bool ack, bool /*acquire*/, bool exclusive, const string& tag, const string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), ready(false) { try { FieldTable ft; if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) throw Exception("Replicating subscription does not have broker info: " + tag); info.assign(ft); // Set a log prefix message that identifies the remote broker. ostringstream os; os << "Primary " << queue->getName() << "@" << info << ": "; logPrefix = os.str(); // NOTE: Once the guard is attached we can have concurrent // calls to dequeued so we need to lock use of this->dequeues. // // However we must attach the guard _before_ we scan for // initial dequeues to be sure we don't miss any dequeues // between the scan and attaching the guard. if (Primary::get()) guard = Primary::get()->getGuard(queue, info); if (!guard) guard.reset(new QueueGuard(*queue, info)); guard->attach(*this); QueueRange backup(arguments); // Remote backup range. QueueRange backupOriginal(backup); QueueRange primary(guard->getRange()); // Unguarded range when the guard was set. backupPosition = backup.back; // Sync backup and primary queues, don't send messages already on the backup if (backup.front > primary.front || // Missing messages at front backup.back < primary.front || // No overlap primary.empty() || backup.empty()) // Empty { // No useful overlap - erase backup and start from the beginning if (!backup.empty()) dequeued(backup.front, backup.back); position = primary.front-1; } else { // backup and primary do overlap. // Remove messages from backup that are not in primary. if (primary.back < backup.back) { dequeued(primary.back+1, backup.back); // Trim excess messages at back backup.back = primary.back; } if (backup.front < primary.front) { dequeued(backup.front, primary.front-1); // Trim excess messages at front backup.front = primary.front; } DequeueScanner scan(*this, backup.front, backup.back); // FIXME aconway 2012-06-15: Optimize queue traversal, only in range. queue->eachMessage(boost::ref(scan)); // Remove missing messages in between. scan.finish(); position = backup.back; //move cursor to position queue->seek(*this, position); } // NOTE: we are assuming that the messages that are on the backup are // consistent with those on the primary. If the backup is a replica // queue and hasn't been tampered with then that will be the case. QPID_LOG(debug, logPrefix << "Subscribed:" << " backup:" << backupOriginal << " adjusted backup:" << backup << " primary:" << primary << " catch-up: " << position << "-" << primary.back << "(" << primary.back-position << ")"); // Check if we are ready yet. if (guard->subscriptionStart(position)) setReady(); } catch (const std::exception& e) { QPID_LOG(error, logPrefix << "Creation error: " << e.what() << ": arguments=" << getArguments()); throw; } } ReplicatingSubscription::~ReplicatingSubscription() {} // Called in subscription's connection thread when the subscription is created. // Called separate from ctor because sending events requires // shared_from_this // void ReplicatingSubscription::initialize() { try { Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. // Send initial dequeues and position to the backup. // There must be a shared_ptr(this) when sending. sendDequeueEvent(l); sendPositionEvent(position, l); backupPosition = position; } catch (const std::exception& e) { QPID_LOG(error, logPrefix << "Initialization error: " << e.what() << ": arguments=" << getArguments()); throw; } } // Message is delivered in the subscription's connection thread. bool ReplicatingSubscription::deliver( const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) { try { QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence()); { Mutex::ScopedLock l(lock); position = m.getSequence(); // m.getSequence() is the position of the new message on local queue. // backupPosition is latest position on backup queue before enqueueing if (m.getSequence() <= backupPosition) throw Exception( QPID_MSG(logPrefix << "Expected position > " << backupPosition << " but got " << m.getSequence())); if (m.getSequence() - backupPosition > 1) { // Position has advanced because of messages dequeued ahead of us. // Send the position before message was enqueued. sendPositionEvent(m.getSequence()-1, l); } // Backup will automatically advance by 1 on delivery of message. backupPosition = m.getSequence(); } return ConsumerImpl::deliver(c, m); } catch (const std::exception& e) { QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence() << ": " << e.what()); throw; } } void ReplicatingSubscription::setReady() { { Mutex::ScopedLock l(lock); if (ready) return; ready = true; } // Notify Primary that a subscription is ready. QPID_LOG(debug, logPrefix << "Caught up"); if (Primary::get()) Primary::get()->readyReplica(*this); } // Called in the subscription's connection thread. void ReplicatingSubscription::cancel() { QPID_LOG(debug, logPrefix << "Cancelled"); guard->cancel(); ConsumerImpl::cancel(); } // Consumer override, called on primary in the backup's IO thread. void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { // Finish completion of message, it has been acknowledged by the backup. QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId()); guard->complete(r.getMessageId()); // If next message is protected by the guard then we are ready if (r.getMessageId() >= guard->getRange().back) setReady(); ConsumerImpl::acknowledged(r); } // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) { if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); dequeues.clear(); buffer.reset(); { Mutex::ScopedUnlock u(lock); sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer); } } // Called via QueueObserver::dequeued override on guard. // Called after the message has been removed // from the deque and under the messageLock in the queue. Called in // arbitrary connection threads. void ReplicatingSubscription::dequeued(const Message& m) { QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence()); { Mutex::ScopedLock l(lock); dequeues.add(m.getSequence()); } notify(); // Ensure a call to doDispatch } // Called during construction while scanning for initial dequeues. void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) { QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]"); { Mutex::ScopedLock l(lock); dequeues.add(first,last); } } // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&) { if (pos == backupPosition) return; // No need to send. QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition); string buf(pos.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); pos.encode(buffer); buffer.reset(); { Mutex::ScopedUnlock u(lock); sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer); } } void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer) { //generate event message boost::intrusive_ptr event(new qpid::broker::amqp_0_10::MessageTransfer()); AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0))); AMQFrame header((AMQHeaderBody())); AMQFrame content((AMQContentBody())); content.castBody()->decode(buffer, buffer.getSize()); header.setBof(false); header.setEof(false); header.setBos(true); header.setEos(true); content.setBof(false); content.setEof(true); content.setBos(true); content.setEos(true); event->getFrames().append(method); event->getFrames().append(header); event->getFrames().append(content); DeliveryProperties* props = event->getFrames().getHeaders()->get(true); props->setRoutingKey(key); // Send the event directly to the base consumer implementation. //dummy consumer prevents acknowledgements being handled, which is what we want for events ConsumerImpl::deliver(QueueCursor(), Message(event, 0), boost::shared_ptr()); } // Called in subscription's connection thread. bool ReplicatingSubscription::doDispatch() { { Mutex::ScopedLock l(lock); if (!dequeues.empty()) sendDequeueEvent(l); } try { return ConsumerImpl::doDispatch(); } catch (const std::exception& e) { // FIXME aconway 2012-10-05: detect queue deletion, no warning. QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what()); return false; } } }} // namespace qpid::ha