/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include "Messages.h" #include "Lsn.h" #include "qpid/store/StoreException.h" #include namespace qpid { namespace store { namespace ms_clfs { void Messages::openLog(const std::string& path, const Log::TuningParameters& params) { log.open (path, params); } void Messages::add(const boost::intrusive_ptr& msg) { uint64_t id = log.add(msg); msg->setPersistenceId(id); std::auto_ptr autom(new MessageInfo); MessageInfo::shared_ptr m(autom); std::pair p(id, m); { qpid::sys::ScopedWlock l(lock); messages.insert(p); // If there's only this one message there, move the tail to it. // This prevents the log from continually growing when messages // are added and removed one at a time. if (messages.size() == 1) { CLFS_LSN newTail = idToLsn(id); log.moveTail(newTail); } } } void Messages::enqueue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t) { MessageInfo::shared_ptr p; { qpid::sys::ScopedRlock l(lock); MessageMap::const_iterator i = messages.find(msgId); if (i == messages.end()) THROW_STORE_EXCEPTION("Message does not exist"); p = i->second; } MessageInfo::Location loc(queueId, t, MessageInfo::TRANSACTION_ENQUEUE); { qpid::sys::ScopedLock l(p->whereLock); p->where.push_back(loc); uint64_t transactionId = 0; if (t.get() != 0) { transactionId = t->getId(); t->enroll(msgId); } try { log.recordEnqueue(msgId, queueId, transactionId); } catch (...) { // Undo the record-keeping if the log wasn't written correctly. if (transactionId != 0) t->unenroll(msgId); p->where.pop_back(); throw; } } } void Messages::dequeue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t) { MessageInfo::shared_ptr p; { qpid::sys::ScopedRlock l(lock); MessageMap::const_iterator i = messages.find(msgId); if (i == messages.end()) THROW_STORE_EXCEPTION("Message does not exist"); p = i->second; } { // Locate the 'where' entry for the specified queue. Once this operation // is recorded in the log, update the 'where' entry to reflect it. // Note that an existing entry in 'where' that refers to a transaction // is not eligible for this operation. qpid::sys::ScopedLock l(p->whereLock); std::list::iterator i; for (i = p->where.begin(); i != p->where.end(); ++i) { if (i->queueId == queueId && i->transaction.get() == 0) break; } if (i == p->where.end()) THROW_STORE_EXCEPTION("Message not on queue"); uint64_t transactionId = 0; if (t.get() != 0) { transactionId = t->getId(); t->enroll(msgId); } try { log.recordDequeue(msgId, queueId, transactionId); } catch (...) { // Undo the record-keeping if the log wasn't written correctly. if (transactionId != 0) t->unenroll(msgId); throw; } // Ok, logged successfully. If this is a transactional op, note // the transaction. If non-transactional, remove the 'where' entry. if (transactionId != 0) { i->transaction = t; i->disposition = MessageInfo::TRANSACTION_DEQUEUE; } else { p->where.erase(i); // If the message doesn't exist on any other queues, remove it. if (p->where.empty()) remove(msgId); } } } // Commit a previous provisional enqueue or dequeue of a particular message // actions under a specified transaction. If this results in the message's // being removed from all queues, it is deleted. void Messages::commit(uint64_t msgId, Transaction::shared_ptr& t) { MessageInfo::shared_ptr p; { qpid::sys::ScopedRlock l(lock); MessageMap::const_iterator i = messages.find(msgId); if (i == messages.end()) THROW_STORE_EXCEPTION("Message does not exist"); p = i->second; } { qpid::sys::ScopedLock l(p->whereLock); std::list::iterator i; for (i = p->where.begin(); i != p->where.end(); ++i) { if (i->transaction != t) continue; // Transactional dequeues can now remove the item from the // where list; enqueues just clear the transaction reference. if (i->disposition == MessageInfo::TRANSACTION_DEQUEUE) i = p->where.erase(i); else i->transaction.reset(); } } // If committing results in this message having no further enqueue // references, delete it. If the delete fails, swallow the exception // and let recovery take care of removing it later. if (p->where.empty()) { try { remove(msgId); } catch(...) {} } } // Abort a previous provisional enqueue or dequeue of a particular message // actions under a specified transaction. If this results in the message's // being removed from all queues, it is deleted. void Messages::abort(uint64_t msgId, Transaction::shared_ptr& t) { MessageInfo::shared_ptr p; { qpid::sys::ScopedRlock l(lock); MessageMap::const_iterator i = messages.find(msgId); if (i == messages.end()) THROW_STORE_EXCEPTION("Message does not exist"); p = i->second; } { qpid::sys::ScopedLock l(p->whereLock); std::list::iterator i = p->where.begin(); while (i != p->where.end()) { if (i->transaction != t) { ++i; continue; } // Aborted transactional dequeues result in the message remaining // enqueued like before the operation; enqueues clear the // message from the where list - like the enqueue never happened. if (i->disposition == MessageInfo::TRANSACTION_ENQUEUE) i = p->where.erase(i); else { i->transaction.reset(); ++i; } } } // If aborting results in this message having no further enqueue // references, delete it. If the delete fails, swallow the exception // and let recovery take care of removing it later. if (p->where.empty()) { try { remove(msgId); } catch(...) {} } } // Load part or all of a message's content from previously stored // log record(s). void Messages::loadContent(uint64_t msgId, std::string& data, uint64_t offset, uint32_t length) { log.loadContent(msgId, data, offset, length); } // Recover the current set of messages and where they're queued from // the log. void Messages::recover(qpid::broker::RecoveryManager& recoverer, const std::set &validQueues, const std::map& transMap, qpid::store::MessageMap& messageMap, qpid::store::MessageQueueMap& messageQueueMap) { std::map > messageOps; log.recover(recoverer, messageMap, messageOps); // Now read through the messageOps replaying the operations with the // knowledge of which transactions committed, aborted, etc. A transaction // should not be deleted until there are no messages referencing it so // a message operation with a transaction id not found in transMap is // a serious problem. QPID_LOG(debug, "Beginning CLFS-recovered message operation replay"); // Keep track of any messages that are recovered from the log but don't // have any place to be. This can happen, for example, if the broker // crashes while logging a message deletion. After all the recovery is // done, delete all the homeless messages. std::vector homeless; std::map >::const_iterator msg; for (msg = messageOps.begin(); msg != messageOps.end(); ++msg) { uint64_t msgId = msg->first; const std::vector& ops = msg->second; QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " op(s)"); MessageInfo::shared_ptr m(new MessageInfo); std::vector& entries = messageQueueMap[msgId]; std::vector::const_iterator op; for (op = ops.begin(); op != ops.end(); ++op) { QueueEntry entry(op->queueId); MessageInfo::Location loc(op->queueId); std::string dir = op->op == MessageLog::RECOVERED_ENQUEUE ? "enqueue" : "dequeue"; if (validQueues.find(op->queueId) == validQueues.end()) { QPID_LOG(info, "Message " << msgId << dir << " on non-existant queue " << op->queueId << "; dropped"); continue; } if (op->txnId != 0) { // Be sure to enroll this message in the transaction even if // it has committed or aborted. This ensures that the // transaction isn't removed from the log while finalizing the // recovery. If it were to be removed and the broker failed // again before removing this message during normal operation, // it couldn't be recovered again. // // Recall what is being reconstructed; 2 things: // 1. This class's 'messages' list which keeps track // of the queues each message is on and the transactions // each message is enrolled in. For this, aborted // transactions cause the result of the operation to be // ignored, but the message does need to be enrolled in // the transaction to properly maintain the transaction // references until the message is deleted. // 2. The StorageProvider's MessageQueueMap, which also // has an entry for each queue each message is on and // its TPL status and associated xid. const Transaction::shared_ptr &t = transMap.find(op->txnId)->second; // Prepared transactions cause the operation to be // provisionally acted on, and the message to be enrolled in // the transaction for when it commits/aborts. This is // noted in the QueueEntry for the StorageProvider's map. if (t->getState() == Transaction::TRANS_PREPARED) { QPID_LOG(debug, dir << " for queue " << op->queueId << ", prepared txn " << op->txnId); TPCTransaction::shared_ptr tpct(boost::dynamic_pointer_cast(t)); if (tpct.get() == 0) THROW_STORE_EXCEPTION("Invalid transaction state"); t->enroll(msgId); entry.xid = tpct->getXid(); loc.transaction = t; if (op->op == MessageLog::RECOVERED_ENQUEUE) { entry.tplStatus = QueueEntry::ADDING; loc.disposition = MessageInfo::TRANSACTION_ENQUEUE; } else { entry.tplStatus = QueueEntry::REMOVING; loc.disposition = MessageInfo::TRANSACTION_DEQUEUE; } } else if (t->getState() != Transaction::TRANS_COMMITTED) { QPID_LOG(debug, dir << " for queue " << op->queueId << ", txn " << op->txnId << ", rolling back"); continue; } } // Here for non-transactional and prepared transactional operations // to set up the messageQueueMap entries. Note that at this point // a committed transactional operation looks like a // non-transactional one as far as the QueueEntry is // concerned - just do it. If this is an entry enqueuing a // message, just add it to the entries list. If it's a dequeue // operation, locate the matching entry for the queue and delete // it if the current op is non-transactional; if it's a prepared // transaction then replace the existing entry with the current // one that notes the message is enqueued but being removed under // a prepared transaction. QPID_LOG(debug, dir + " at queue " << entry.queueId); if (op->op == MessageLog::RECOVERED_ENQUEUE) { entries.push_back(entry); m->where.push_back(loc); } else { std::vector::iterator i = entries.begin(); while (i != entries.end()) { if (i->queueId == entry.queueId) { if (entry.tplStatus != QueueEntry::NONE) *i = entry; else entries.erase(i); break; } ++i; } std::list::iterator w = m->where.begin(); while (w != m->where.end()) { if (w->queueId == loc.queueId) { if (loc.transaction.get() != 0) { *w = loc; ++w; } else { w = m->where.erase(w); } } } } } // Now that all the queue entries have been set correctly, see if // there are any entries; they may have all been removed during // recovery. If there are none, add this message to the homeless // list to be deleted from the log after the recovery is done. if (m->where.size() == 0) { homeless.push_back(msgId); messageMap.erase(msgId); messageQueueMap.erase(msgId); } else { std::pair p(msgId, m); messages.insert(p); } } QPID_LOG(debug, "Message log recovery done."); // Done! Ok, go back and delete all the homeless messages. BOOST_FOREACH(uint64_t msg, homeless) { QPID_LOG(debug, "Deleting homeless message " << msg); remove(msg); } } // Expunge is called when a queue is deleted. All references to that // queue must be expunged from all messages. 'Dequeue' log records are // written for each queue entry removed, but any errors are swallowed. // On recovery there's a list of valid queues passed in. The deleted // queue will not be on that list so if any references to it are // recovered they'll get weeded out then. void Messages::expunge(uint64_t queueId) { std::vector toBeDeleted; // Messages to be deleted later. { // Lock everybody out since all messages are possibly in play. // There also may be other threads already working on particular // messages so individual message mutex still must be acquired. qpid::sys::ScopedWlock l(lock); MessageMap::iterator m; for (m = messages.begin(); m != messages.end(); ++m) { MessageInfo::shared_ptr p = m->second; { qpid::sys::ScopedLock ml(p->whereLock); std::list::iterator i = p->where.begin(); while (i != p->where.end()) { if (i->queueId != queueId) { ++i; continue; } // If this entry is involved in a transaction, unenroll it. // Then remove the entry. if (i->transaction.get() != 0) i->transaction->unenroll(m->first); i = p->where.erase(i); try { log.recordDequeue(m->first, queueId, 0); } catch(...) { } } if (p->where.size() == 0) toBeDeleted.push_back(m->first); } } } // Swallow any exceptions during this; don't care. Recover it later // if needed. try { BOOST_FOREACH(uint64_t msg, toBeDeleted) remove(msg); } catch(...) { } } // Remove a specified message from those controlled by this object. void Messages::remove(uint64_t messageId) { uint64_t newFirstId = 0; { qpid::sys::ScopedWlock l(lock); messages.erase(messageId); // May have deleted the first entry; if so the log can release that. // If this message being deleted results in an empty list of // messages, move the tail up to this message's LSN. This may // result in one or more messages being stranded in the log // until there's more activity. If a restart happens while these // unneeded log records are there, the presence of the MessageDelete // entry will cause the message(s) to be ignored anyway. if (messages.empty()) newFirstId = messageId; else if (messages.begin()->first > messageId) newFirstId = messages.begin()->first; } log.deleteMessage(messageId, newFirstId); } }}} // namespace qpid::store::ms_clfs