diff options
Diffstat (limited to 'qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp | 472 |
1 files changed, 472 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp new file mode 100644 index 0000000000..db5d2ebf4c --- /dev/null +++ b/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp @@ -0,0 +1,472 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/log/Statement.h> + +#include "Messages.h" +#include "Lsn.h" +#include "qpid/store/StoreException.h" +#include <boost/foreach.hpp> + +namespace qpid { +namespace store { +namespace ms_clfs { + +void +Messages::openLog(const std::string& path, const Log::TuningParameters& params) +{ + log.open (path, params); +} + +void +Messages::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg) +{ + uint64_t id = log.add(msg); + msg->setPersistenceId(id); + std::auto_ptr<MessageInfo> autom(new MessageInfo); + MessageInfo::shared_ptr m(autom); + std::pair<uint64_t, MessageInfo::shared_ptr> p(id, m); + { + qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock); + messages.insert(p); + // If there's only this one message there, move the tail to it. + // This prevents the log from continually growing when messages + // are added and removed one at a time. + if (messages.size() == 1) { + CLFS_LSN newTail = idToLsn(id); + log.moveTail(newTail); + } + } +} + +void +Messages::enqueue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t) +{ + MessageInfo::shared_ptr p; + { + qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock); + MessageMap::const_iterator i = messages.find(msgId); + if (i == messages.end()) + THROW_STORE_EXCEPTION("Message does not exist"); + p = i->second; + } + MessageInfo::Location loc(queueId, t, MessageInfo::TRANSACTION_ENQUEUE); + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock); + p->where.push_back(loc); + uint64_t transactionId = 0; + if (t.get() != 0) { + transactionId = t->getId(); + t->enroll(msgId); + } + try { + log.recordEnqueue(msgId, queueId, transactionId); + } + catch (...) { + // Undo the record-keeping if the log wasn't written correctly. + if (transactionId != 0) + t->unenroll(msgId); + p->where.pop_back(); + throw; + } + } +} + +void +Messages::dequeue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t) +{ + MessageInfo::shared_ptr p; + { + qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock); + MessageMap::const_iterator i = messages.find(msgId); + if (i == messages.end()) + THROW_STORE_EXCEPTION("Message does not exist"); + p = i->second; + } + { + // Locate the 'where' entry for the specified queue. Once this operation + // is recorded in the log, update the 'where' entry to reflect it. + // Note that an existing entry in 'where' that refers to a transaction + // is not eligible for this operation. + qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock); + std::list<MessageInfo::Location>::iterator i; + for (i = p->where.begin(); i != p->where.end(); ++i) { + if (i->queueId == queueId && i->transaction.get() == 0) + break; + } + if (i == p->where.end()) + THROW_STORE_EXCEPTION("Message not on queue"); + uint64_t transactionId = 0; + if (t.get() != 0) { + transactionId = t->getId(); + t->enroll(msgId); + } + try { + log.recordDequeue(msgId, queueId, transactionId); + } + catch (...) { + // Undo the record-keeping if the log wasn't written correctly. + if (transactionId != 0) + t->unenroll(msgId); + throw; + } + // Ok, logged successfully. If this is a transactional op, note + // the transaction. If non-transactional, remove the 'where' entry. + if (transactionId != 0) { + i->transaction = t; + i->disposition = MessageInfo::TRANSACTION_DEQUEUE; + } + else { + p->where.erase(i); + // If the message doesn't exist on any other queues, remove it. + if (p->where.empty()) + remove(msgId); + } + } +} + +// Commit a previous provisional enqueue or dequeue of a particular message +// actions under a specified transaction. If this results in the message's +// being removed from all queues, it is deleted. +void +Messages::commit(uint64_t msgId, Transaction::shared_ptr& t) +{ + MessageInfo::shared_ptr p; + { + qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock); + MessageMap::const_iterator i = messages.find(msgId); + if (i == messages.end()) + THROW_STORE_EXCEPTION("Message does not exist"); + p = i->second; + } + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock); + std::list<MessageInfo::Location>::iterator i; + for (i = p->where.begin(); i != p->where.end(); ++i) { + if (i->transaction != t) + continue; + // Transactional dequeues can now remove the item from the + // where list; enqueues just clear the transaction reference. + if (i->disposition == MessageInfo::TRANSACTION_DEQUEUE) + i = p->where.erase(i); + else + i->transaction.reset(); + } + } + // If committing results in this message having no further enqueue + // references, delete it. If the delete fails, swallow the exception + // and let recovery take care of removing it later. + if (p->where.empty()) { + try { + remove(msgId); + } + catch(...) {} + } +} + +// Abort a previous provisional enqueue or dequeue of a particular message +// actions under a specified transaction. If this results in the message's +// being removed from all queues, it is deleted. +void +Messages::abort(uint64_t msgId, Transaction::shared_ptr& t) +{ + MessageInfo::shared_ptr p; + { + qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock); + MessageMap::const_iterator i = messages.find(msgId); + if (i == messages.end()) + THROW_STORE_EXCEPTION("Message does not exist"); + p = i->second; + } + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock); + std::list<MessageInfo::Location>::iterator i = p->where.begin(); + while (i != p->where.end()) { + if (i->transaction != t) { + ++i; + continue; + } + // Aborted transactional dequeues result in the message remaining + // enqueued like before the operation; enqueues clear the + // message from the where list - like the enqueue never happened. + if (i->disposition == MessageInfo::TRANSACTION_ENQUEUE) + i = p->where.erase(i); + else { + i->transaction.reset(); + ++i; + } + } + } + // If aborting results in this message having no further enqueue + // references, delete it. If the delete fails, swallow the exception + // and let recovery take care of removing it later. + if (p->where.empty()) { + try { + remove(msgId); + } + catch(...) {} + } +} + +// Load part or all of a message's content from previously stored +// log record(s). +void +Messages::loadContent(uint64_t msgId, + std::string& data, + uint64_t offset, + uint32_t length) +{ + log.loadContent(msgId, data, offset, length); +} + +// Recover the current set of messages and where they're queued from +// the log. +void +Messages::recover(qpid::broker::RecoveryManager& recoverer, + const std::set<uint64_t> &validQueues, + const std::map<uint64_t, Transaction::shared_ptr>& transMap, + qpid::store::MessageMap& messageMap, + qpid::store::MessageQueueMap& messageQueueMap) +{ + std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> > messageOps; + log.recover(recoverer, messageMap, messageOps); + // Now read through the messageOps replaying the operations with the + // knowledge of which transactions committed, aborted, etc. A transaction + // should not be deleted until there are no messages referencing it so + // a message operation with a transaction id not found in transMap is + // a serious problem. + QPID_LOG(debug, "Beginning CLFS-recovered message operation replay"); + // Keep track of any messages that are recovered from the log but don't + // have any place to be. This can happen, for example, if the broker + // crashes while logging a message deletion. After all the recovery is + // done, delete all the homeless messages. + std::vector<uint64_t> homeless; + std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> >::const_iterator msg; + for (msg = messageOps.begin(); msg != messageOps.end(); ++msg) { + uint64_t msgId = msg->first; + const std::vector<MessageLog::RecoveredMsgOp>& ops = msg->second; + QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " op(s)"); + MessageInfo::shared_ptr m(new MessageInfo); + std::vector<QueueEntry>& entries = messageQueueMap[msgId]; + std::vector<MessageLog::RecoveredMsgOp>::const_iterator op; + for (op = ops.begin(); op != ops.end(); ++op) { + QueueEntry entry(op->queueId); + MessageInfo::Location loc(op->queueId); + std::string dir = + op->op == MessageLog::RECOVERED_ENQUEUE ? "enqueue" + : "dequeue"; + if (validQueues.find(op->queueId) == validQueues.end()) { + QPID_LOG(info, + "Message " << msgId << dir << " on non-existant queue " + << op->queueId << "; dropped"); + continue; + } + if (op->txnId != 0) { + // Be sure to enroll this message in the transaction even if + // it has committed or aborted. This ensures that the + // transaction isn't removed from the log while finalizing the + // recovery. If it were to be removed and the broker failed + // again before removing this message during normal operation, + // it couldn't be recovered again. + // + // Recall what is being reconstructed; 2 things: + // 1. This class's 'messages' list which keeps track + // of the queues each message is on and the transactions + // each message is enrolled in. For this, aborted + // transactions cause the result of the operation to be + // ignored, but the message does need to be enrolled in + // the transaction to properly maintain the transaction + // references until the message is deleted. + // 2. The StorageProvider's MessageQueueMap, which also + // has an entry for each queue each message is on and + // its TPL status and associated xid. + const Transaction::shared_ptr &t = + transMap.find(op->txnId)->second; + // Prepared transactions cause the operation to be + // provisionally acted on, and the message to be enrolled in + // the transaction for when it commits/aborts. This is + // noted in the QueueEntry for the StorageProvider's map. + if (t->getState() == Transaction::TRANS_PREPARED) { + QPID_LOG(debug, dir << " for queue " << op->queueId << + ", prepared txn " << op->txnId); + TPCTransaction::shared_ptr tpct(boost::dynamic_pointer_cast<TPCTransaction>(t)); + if (tpct.get() == 0) + THROW_STORE_EXCEPTION("Invalid transaction state"); + t->enroll(msgId); + entry.xid = tpct->getXid(); + loc.transaction = t; + if (op->op == MessageLog::RECOVERED_ENQUEUE) { + entry.tplStatus = QueueEntry::ADDING; + loc.disposition = MessageInfo::TRANSACTION_ENQUEUE; + } + else { + entry.tplStatus = QueueEntry::REMOVING; + loc.disposition = MessageInfo::TRANSACTION_DEQUEUE; + } + } + else if (t->getState() != Transaction::TRANS_COMMITTED) { + QPID_LOG(debug, dir << " for queue " << op->queueId << + ", txn " << op->txnId << ", rolling back"); + continue; + } + } + // Here for non-transactional and prepared transactional operations + // to set up the messageQueueMap entries. Note that at this point + // a committed transactional operation looks like a + // non-transactional one as far as the QueueEntry is + // concerned - just do it. If this is an entry enqueuing a + // message, just add it to the entries list. If it's a dequeue + // operation, locate the matching entry for the queue and delete + // it if the current op is non-transactional; if it's a prepared + // transaction then replace the existing entry with the current + // one that notes the message is enqueued but being removed under + // a prepared transaction. + QPID_LOG(debug, dir + " at queue " << entry.queueId); + if (op->op == MessageLog::RECOVERED_ENQUEUE) { + entries.push_back(entry); + m->where.push_back(loc); + } + else { + std::vector<QueueEntry>::iterator i = entries.begin(); + while (i != entries.end()) { + if (i->queueId == entry.queueId) { + if (entry.tplStatus != QueueEntry::NONE) + *i = entry; + else + entries.erase(i); + break; + } + ++i; + } + std::list<MessageInfo::Location>::iterator w = m->where.begin(); + while (w != m->where.end()) { + if (w->queueId == loc.queueId) { + if (loc.transaction.get() != 0) { + *w = loc; + ++w; + } + else { + w = m->where.erase(w); + } + } + } + } + } + // Now that all the queue entries have been set correctly, see if + // there are any entries; they may have all been removed during + // recovery. If there are none, add this message to the homeless + // list to be deleted from the log after the recovery is done. + if (m->where.size() == 0) { + homeless.push_back(msgId); + messageMap.erase(msgId); + messageQueueMap.erase(msgId); + } + else { + std::pair<uint64_t, MessageInfo::shared_ptr> p(msgId, m); + messages.insert(p); + } + } + + QPID_LOG(debug, "Message log recovery done."); + // Done! Ok, go back and delete all the homeless messages. + BOOST_FOREACH(uint64_t msg, homeless) { + QPID_LOG(debug, "Deleting homeless message " << msg); + remove(msg); + } +} + +// Expunge is called when a queue is deleted. All references to that +// queue must be expunged from all messages. 'Dequeue' log records are +// written for each queue entry removed, but any errors are swallowed. +// On recovery there's a list of valid queues passed in. The deleted +// queue will not be on that list so if any references to it are +// recovered they'll get weeded out then. +void +Messages::expunge(uint64_t queueId) +{ + std::vector<uint64_t> toBeDeleted; // Messages to be deleted later. + + { + // Lock everybody out since all messages are possibly in play. + // There also may be other threads already working on particular + // messages so individual message mutex still must be acquired. + qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock); + MessageMap::iterator m; + for (m = messages.begin(); m != messages.end(); ++m) { + MessageInfo::shared_ptr p = m->second; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> ml(p->whereLock); + std::list<MessageInfo::Location>::iterator i = p->where.begin(); + while (i != p->where.end()) { + if (i->queueId != queueId) { + ++i; + continue; + } + // If this entry is involved in a transaction, unenroll it. + // Then remove the entry. + if (i->transaction.get() != 0) + i->transaction->unenroll(m->first); + i = p->where.erase(i); + try { + log.recordDequeue(m->first, queueId, 0); + } + catch(...) { + } + } + if (p->where.size() == 0) + toBeDeleted.push_back(m->first); + } + } + } + // Swallow any exceptions during this; don't care. Recover it later + // if needed. + try { + BOOST_FOREACH(uint64_t msg, toBeDeleted) + remove(msg); + } + catch(...) { + } +} + +// Remove a specified message from those controlled by this object. +void +Messages::remove(uint64_t messageId) +{ + uint64_t newFirstId = 0; + { + qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock); + messages.erase(messageId); + // May have deleted the first entry; if so the log can release that. + // If this message being deleted results in an empty list of + // messages, move the tail up to this message's LSN. This may + // result in one or more messages being stranded in the log + // until there's more activity. If a restart happens while these + // unneeded log records are there, the presence of the MessageDelete + // entry will cause the message(s) to be ignored anyway. + if (messages.empty()) + newFirstId = messageId; + else if (messages.begin()->first > messageId) + newFirstId = messages.begin()->first; + } + log.deleteMessage(messageId, newFirstId); +} + +}}} // namespace qpid::store::ms_clfs |