diff options
author | Stephen D. Huston <shuston@apache.org> | 2010-10-29 16:58:59 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2010-10-29 16:58:59 +0000 |
commit | 93072c51146f9f0b2c647b60f5a724f89b0560fa (patch) | |
tree | 0a7e007126c280cdeef69f2b7c91e44c94d2242a | |
parent | cb763d5178e6c125f89f70942c4709099f5410e9 (diff) | |
download | qpid-python-93072c51146f9f0b2c647b60f5a724f89b0560fa.tar.gz |
Since LSNs are used as persistence IDs and ID 0 usually means "not persisted", ensure that no log record that's used gets written at LSN 0.
Add stub for loadContent().
Correct transaction references in enqueue/dequeue.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1028840 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/store/ms-clfs/Log.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/store/ms-clfs/Log.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp | 39 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp | 55 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/store/ms-clfs/Messages.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h | 6 |
9 files changed, 131 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp index 0c1df7fa07..7e669e281e 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp +++ b/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp @@ -62,6 +62,9 @@ Log::open(const std::string& path, const TuningParameters& params) ULONG infoSize = sizeof(info); BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize); QPID_WINDOWS_CHECK_NOT(ok, 0); + // If this is the first time this log is opened, give an opportunity to + // initialize its content. + bool needInitialize(false); if (info.TotalContainers == 0) { std::vector<const std::wstring> paths; LPWSTR cPaths[1024]; @@ -82,6 +85,7 @@ Log::open(const std::string& path, const TuningParameters& params) cPaths, NULL); QPID_WINDOWS_CHECK_NOT(ok, 0); + needInitialize = true; } // Need a marshaling area ok = ::CreateLogMarshallingArea(handle, @@ -91,6 +95,8 @@ Log::open(const std::string& path, const TuningParameters& params) 1, // Max read buffers &marshal); QPID_WINDOWS_CHECK_NOT(ok, 0); + if (needInitialize) + initialize(); } uint32_t diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Log.h b/qpid/cpp/src/qpid/store/ms-clfs/Log.h index e200284856..dda41a3bac 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/Log.h +++ b/qpid/cpp/src/qpid/store/ms-clfs/Log.h @@ -44,6 +44,11 @@ protected: std::string logPath; PVOID marshal; + // Give subclasses a chance to initialize a new log. Called after a new + // log is created, initial set of containers is added, and marshalling + // area is allocated. + virtual void initialize() {} + public: struct TuningParameters { size_t containerSize; diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp index 357c6c1687..2a1cd4d9af 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp +++ b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp @@ -572,10 +572,10 @@ MSSqlClfsProvider::destroy(PersistableQueue& queue) queues.erase(queues.find(qId)); } // Now tell each of the messages they are less one queue commitment. - // Can I call dequeue()? Or some sub-piece of that? Transaction::shared_ptr nonTransactional; BOOST_FOREACH(uint64_t msgId, affectedMessages) { - messages.dequeue(msgId, qId, nonTransactional); + QPID_LOG(debug, "Removing message " << msgId); + messages.dequeue(msgId, qId, nonTransactional); } } @@ -827,20 +827,9 @@ MSSqlClfsProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/, uint64_t offset, uint32_t length) { -#if 0 - // SQL store keeps all messages in one table, so we don't need the + // Message log keeps all messages in one log, so we don't need the // queue reference. - DatabaseConnection *db = initConnection(); - MessageRecordset rsMessages; - try { - rsMessages.open(db, TblMessage); - rsMessages.loadContent(msg, data, offset, length); - } - catch(_com_error &e) { - std::string errs = db->getErrors(); - throw ADOException("Error loading message content", e, errs); - } -#endif + messages.loadContent(msg->getPersistenceId(), data, offset, length); } /** @@ -858,9 +847,15 @@ MSSqlClfsProvider::enqueue(qpid::broker::TransactionContext* ctxt, const PersistableQueue& queue) { Transaction::shared_ptr t; - TransactionContext *ctx = dynamic_cast<TransactionContext*> (ctxt); - if (ctx != 0) + TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt); + if (ctx) t = ctx->getTransaction(); + else { + TPCTransactionContext *tctx; + tctx = dynamic_cast<TPCTransactionContext*>(ctxt); + if (tctx) + t = tctx->getTransaction(); + } uint64_t qId = queue.getPersistenceId(); uint64_t msgId = msg->getPersistenceId(); QueueContents::shared_ptr q; @@ -898,9 +893,15 @@ MSSqlClfsProvider::dequeue(qpid::broker::TransactionContext* ctxt, const PersistableQueue& queue) { Transaction::shared_ptr t; - TransactionContext *ctx = dynamic_cast<TransactionContext*> (ctxt); - if (ctx != 0) + TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt); + if (ctx) t = ctx->getTransaction(); + else { + TPCTransactionContext *tctx; + tctx = dynamic_cast<TPCTransactionContext*>(ctxt); + if (tctx) + t = tctx->getTransaction(); + } uint64_t qId = queue.getPersistenceId(); uint64_t msgId = msg->getPersistenceId(); QueueContents::shared_ptr q; diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp index 30bb41762c..14d63a4cd4 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp +++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp @@ -49,12 +49,18 @@ struct MessageStart { MessageEntryType type; // If the complete message encoding doesn't fit, remainder is in // MessageChunk records to follow. + // headerLength is the size of the message's header in content. It is + // part of the totalLength and the segmentLength. + uint32_t headerLength; uint32_t totalLength; uint32_t segmentLength; char content[MaxMessageContentLength]; MessageStart() - : type(MessageStartEntry), totalLength(0), segmentLength(0) {} + : type(MessageStartEntry), + headerLength(0), + totalLength(0), + segmentLength(0) {} }; // Message-Chunk struct MessageChunk { @@ -95,6 +101,16 @@ namespace qpid { namespace store { namespace ms_clfs { +void +MessageLog::initialize() +{ + // Write something to occupy the first record, preventing a real message + // from being lsn/id 0. Delete of a non-existant id is easily tossed + // during recovery if no other messages have caused the tail to be moved + // up past this dummy record by then. + deleteMessage(0, 0); +} + uint32_t MessageLog::marshallingBufferSize() { @@ -114,8 +130,10 @@ MessageLog::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& ms // Message-Chunk records to contain the rest. If it does all fit in one // record, though, optimize the encoding by going straight to the // Message-Start record rather than encoding then copying to the record. + // In all case MessageStart entry; uint32_t encodedMessageLength = msg->encodedSize(); + entry.headerLength = msg->encodedHeaderSize(); entry.totalLength = encodedMessageLength; CLFS_LSN location, lastChunkLsn; std::auto_ptr<char> encodeStage; @@ -165,6 +183,16 @@ MessageLog::deleteMessage(uint64_t messageId, uint64_t newFirstId) moveTail(idToLsn(newFirstId)); } +// Load part or all of a message's content from previously stored +// log record(s). +void +MessageLog::loadContent(uint64_t messageId, + std::string& data, + uint64_t offset, + uint32_t length) +{ +} + void MessageLog::recordEnqueue (uint64_t messageId, uint64_t queueId, @@ -202,9 +230,11 @@ MessageLog::recover(qpid::broker::RecoveryManager& recoverer, std::map<uint64_t, MessageBlocks> reassemblies; std::map<uint64_t, MessageBlocks>::iterator at; - // Note that there may be message refs in the log which are deleted, so - // be sure to only add msgs at message-start record, and ignore those - // that don't have an existing message record. + QPID_LOG(debug, "Recovering message log"); + + // Note that there may be message refs in the log which are deleted, so + // be sure to only add msgs at message-start record, and ignore those + // that don't have an existing message record. // Get the base LSN - that's how to say "start reading at the beginning" CLFS_INFORMATION info; ULONG infoLength = sizeof (info); @@ -253,11 +283,20 @@ MessageLog::recover(qpid::broker::RecoveryManager& recoverer, // this content off to the side until the remaining record(s) are // located. if (start->totalLength == start->segmentLength) { // Whole thing - qpid::framing::Buffer buff(start->content, start->totalLength); + // Start by recovering the header then see if the rest of + // the content is desired. + qpid::framing::Buffer buff(start->content, start->headerLength); qpid::broker::RecoverableMessage::shared_ptr m = recoverer.recoverMessage(buff); m->setPersistenceId(msgId); messageMap[msgId] = m; + uint32_t contentLength = + start->totalLength - start->headerLength; + if (m->loadContent(contentLength)) { + qpid::framing::Buffer content(&(start->content[start->headerLength]), + contentLength); + m->decodeContent(content); + } } else { // Save it in a block big enough. @@ -310,7 +349,7 @@ MessageLog::recover(qpid::broker::RecoveryManager& recoverer, enqueue = reinterpret_cast<MessageEnqueue *>(recordPointer); msgId = lsnToId(messageLsn); QPID_LOG(debug, "Message " << msgId << " Enqueue on queue " << - enqueue->queueId); + enqueue->queueId << ", txn " << enqueue->transId); if (messageMap.find(msgId) == messageMap.end()) { QPID_LOG(debug, "Message " << msgId << " doesn't exist; discarded"); @@ -357,8 +396,10 @@ MessageLog::recover(qpid::broker::RecoveryManager& recoverer, } DWORD status = ::GetLastError(); ::TerminateReadLog(readContext); - if (status == ERROR_HANDLE_EOF) // No more records + if (status == ERROR_HANDLE_EOF) { // No more records + QPID_LOG(debug, "Message log recovered"); return; + } throw QPID_WINDOWS_ERROR(status); } diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h index 7cf6fa0c4c..b3705287a6 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h +++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h @@ -41,6 +41,11 @@ namespace ms_clfs { */ class MessageLog : public Log { +protected: + // Message log needs to have a no-op first record written in the log + // to ensure that no real message gets an ID 0. + virtual void initialize(); + public: // Inherited and reimplemented from Log. Figure the minimum marshalling // buffer size needed for the records this class writes. @@ -53,6 +58,13 @@ public: // the earliest valid message in the log, so move the tail up to it. void deleteMessage(uint64_t messageId, uint64_t newFirstId); + // Load part or all of a message's content from previously stored + // log record(s). + void loadContent(uint64_t messageId, + std::string& data, + uint64_t offset, + uint32_t length); + // Enqueue and dequeue operations track messages' transit across // queues; each operation may be associated with a transaction. If // the transactionId is 0 the operation is not associated with a diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp index a0782e3e5a..98cc19a7f7 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp +++ b/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp @@ -222,6 +222,17 @@ Messages::abort(uint64_t msgId, Transaction::shared_ptr& t) } } +// Load part or all of a message's content from previously stored +// log record(s). +void +Messages::loadContent(uint64_t msgId, + std::string& data, + uint64_t offset, + uint32_t length) +{ + log.loadContent(msgId, data, offset, length); +} + // Recover the current set of messages and where they're queued from // the log. void @@ -247,7 +258,7 @@ Messages::recover(qpid::broker::RecoveryManager& recoverer, for (msg = messageOps.begin(); msg != messageOps.end(); ++msg) { uint64_t msgId = msg->first; const std::vector<MessageLog::RecoveredMsgOp>& ops = msg->second; - QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " ops"); + QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " op(s)"); MessageInfo::shared_ptr m(new MessageInfo); std::vector<QueueEntry>& entries = messageQueueMap[msgId]; std::vector<MessageLog::RecoveredMsgOp>::const_iterator op; diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Messages.h b/qpid/cpp/src/qpid/store/ms-clfs/Messages.h index 4fa86e3964..f54ccc7c27 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/Messages.h +++ b/qpid/cpp/src/qpid/store/ms-clfs/Messages.h @@ -52,8 +52,7 @@ class Messages { typedef boost::shared_ptr<MessageInfo> shared_ptr; - MessageInfo() - : enqueuedCount(0) { /*latestLsn.Internal = 0;*/ } + MessageInfo() : enqueuedCount(0) {} }; qpid::sys::RWlock lock; @@ -90,6 +89,13 @@ public: // being removed from all queues, it is deleted. void abort(uint64_t msgId, Transaction::shared_ptr& transaction); + // Load part or all of a message's content from previously stored + // log record(s). + void loadContent(uint64_t msgId, + std::string& data, + uint64_t offset, + uint32_t length); + // Recover the current set of messages and where they're queued from // the log. void recover(qpid::broker::RecoveryManager& recoverer, diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp index 937641405c..04780e83e8 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp +++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp @@ -99,6 +99,16 @@ namespace qpid { namespace store { namespace ms_clfs { +void +TransactionLog::initialize() +{ + // Write something to occupy the first record, preventing a real + // transaction from being lsn/id 0. Delete of a non-existant id is easily + // tossed during recovery if no other transactions have caused the tail + // to be moved up past this dummy record by then. + deleteTransaction(0); +} + uint32_t TransactionLog::marshallingBufferSize() { @@ -226,7 +236,6 @@ TransactionLog::deleteTransaction(uint64_t transId) write(&deleteEntry, sizeof(deleteEntry), &transLsn); if (newFirstId != 0) moveTail(idToLsn(newFirstId)); - } void @@ -252,6 +261,8 @@ TransactionLog::collectPreparedXids(std::map<std::string, TPCTransaction::shared void TransactionLog::recover(std::map<uint64_t, Transaction::shared_ptr>& transMap) { + QPID_LOG(debug, "Recovering transaction log"); + // Note that there may be transaction refs in the log which are deleted, // so be sure to only add transactions at Start records, and ignore those // that don't have an existing message record. @@ -378,6 +389,8 @@ TransactionLog::recover(std::map<uint64_t, Transaction::shared_ptr>& transMap) if (status != ERROR_HANDLE_EOF) // No more records throw QPID_WINDOWS_ERROR(status); + QPID_LOG(debug, "Transaction log recovered"); + // At this point we have a list of all the not-deleted transactions that // were in existence when the broker last ran. All transactions of both // Dtx and Tx types that haven't prepared or committed will be aborted. diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h index c128a09ada..7ca27c229e 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h +++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h @@ -58,6 +58,12 @@ class TransactionLog : public Log, std::map<uint64_t, boost::weak_ptr<Transaction> > validIds; qpid::sys::Mutex idsLock; +protected: + // Transaction log needs to have a no-op first record written in the log + // to ensure that no real transaction gets an ID 0; messages think trans + // id 0 means "no transaction." + virtual void initialize(); + public: // Inherited and reimplemented from Log. Figure the minimum marshalling // buffer size needed for the records this class writes. |