summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2010-10-29 16:58:59 +0000
committerStephen D. Huston <shuston@apache.org>2010-10-29 16:58:59 +0000
commit93072c51146f9f0b2c647b60f5a724f89b0560fa (patch)
tree0a7e007126c280cdeef69f2b7c91e44c94d2242a
parentcb763d5178e6c125f89f70942c4709099f5410e9 (diff)
downloadqpid-python-93072c51146f9f0b2c647b60f5a724f89b0560fa.tar.gz
Since LSNs are used as persistence IDs and ID 0 usually means "not persisted", ensure that no log record that's used gets written at LSN 0.
Add stub for loadContent(). Correct transaction references in enqueue/dequeue. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1028840 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Log.cpp6
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Log.h5
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp39
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp55
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h12
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp13
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Messages.h10
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp15
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h6
9 files changed, 131 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
index 0c1df7fa07..7e669e281e 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
@@ -62,6 +62,9 @@ Log::open(const std::string& path, const TuningParameters& params)
ULONG infoSize = sizeof(info);
BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize);
QPID_WINDOWS_CHECK_NOT(ok, 0);
+ // If this is the first time this log is opened, give an opportunity to
+ // initialize its content.
+ bool needInitialize(false);
if (info.TotalContainers == 0) {
std::vector<const std::wstring> paths;
LPWSTR cPaths[1024];
@@ -82,6 +85,7 @@ Log::open(const std::string& path, const TuningParameters& params)
cPaths,
NULL);
QPID_WINDOWS_CHECK_NOT(ok, 0);
+ needInitialize = true;
}
// Need a marshaling area
ok = ::CreateLogMarshallingArea(handle,
@@ -91,6 +95,8 @@ Log::open(const std::string& path, const TuningParameters& params)
1, // Max read buffers
&marshal);
QPID_WINDOWS_CHECK_NOT(ok, 0);
+ if (needInitialize)
+ initialize();
}
uint32_t
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Log.h b/qpid/cpp/src/qpid/store/ms-clfs/Log.h
index e200284856..dda41a3bac 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/Log.h
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Log.h
@@ -44,6 +44,11 @@ protected:
std::string logPath;
PVOID marshal;
+ // Give subclasses a chance to initialize a new log. Called after a new
+ // log is created, initial set of containers is added, and marshalling
+ // area is allocated.
+ virtual void initialize() {}
+
public:
struct TuningParameters {
size_t containerSize;
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
index 357c6c1687..2a1cd4d9af 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
@@ -572,10 +572,10 @@ MSSqlClfsProvider::destroy(PersistableQueue& queue)
queues.erase(queues.find(qId));
}
// Now tell each of the messages they are less one queue commitment.
- // Can I call dequeue()? Or some sub-piece of that?
Transaction::shared_ptr nonTransactional;
BOOST_FOREACH(uint64_t msgId, affectedMessages) {
- messages.dequeue(msgId, qId, nonTransactional);
+ QPID_LOG(debug, "Removing message " << msgId);
+ messages.dequeue(msgId, qId, nonTransactional);
}
}
@@ -827,20 +827,9 @@ MSSqlClfsProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
uint64_t offset,
uint32_t length)
{
-#if 0
- // SQL store keeps all messages in one table, so we don't need the
+ // Message log keeps all messages in one log, so we don't need the
// queue reference.
- DatabaseConnection *db = initConnection();
- MessageRecordset rsMessages;
- try {
- rsMessages.open(db, TblMessage);
- rsMessages.loadContent(msg, data, offset, length);
- }
- catch(_com_error &e) {
- std::string errs = db->getErrors();
- throw ADOException("Error loading message content", e, errs);
- }
-#endif
+ messages.loadContent(msg->getPersistenceId(), data, offset, length);
}
/**
@@ -858,9 +847,15 @@ MSSqlClfsProvider::enqueue(qpid::broker::TransactionContext* ctxt,
const PersistableQueue& queue)
{
Transaction::shared_ptr t;
- TransactionContext *ctx = dynamic_cast<TransactionContext*> (ctxt);
- if (ctx != 0)
+ TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt);
+ if (ctx)
t = ctx->getTransaction();
+ else {
+ TPCTransactionContext *tctx;
+ tctx = dynamic_cast<TPCTransactionContext*>(ctxt);
+ if (tctx)
+ t = tctx->getTransaction();
+ }
uint64_t qId = queue.getPersistenceId();
uint64_t msgId = msg->getPersistenceId();
QueueContents::shared_ptr q;
@@ -898,9 +893,15 @@ MSSqlClfsProvider::dequeue(qpid::broker::TransactionContext* ctxt,
const PersistableQueue& queue)
{
Transaction::shared_ptr t;
- TransactionContext *ctx = dynamic_cast<TransactionContext*> (ctxt);
- if (ctx != 0)
+ TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt);
+ if (ctx)
t = ctx->getTransaction();
+ else {
+ TPCTransactionContext *tctx;
+ tctx = dynamic_cast<TPCTransactionContext*>(ctxt);
+ if (tctx)
+ t = tctx->getTransaction();
+ }
uint64_t qId = queue.getPersistenceId();
uint64_t msgId = msg->getPersistenceId();
QueueContents::shared_ptr q;
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
index 30bb41762c..14d63a4cd4 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
@@ -49,12 +49,18 @@ struct MessageStart {
MessageEntryType type;
// If the complete message encoding doesn't fit, remainder is in
// MessageChunk records to follow.
+ // headerLength is the size of the message's header in content. It is
+ // part of the totalLength and the segmentLength.
+ uint32_t headerLength;
uint32_t totalLength;
uint32_t segmentLength;
char content[MaxMessageContentLength];
MessageStart()
- : type(MessageStartEntry), totalLength(0), segmentLength(0) {}
+ : type(MessageStartEntry),
+ headerLength(0),
+ totalLength(0),
+ segmentLength(0) {}
};
// Message-Chunk
struct MessageChunk {
@@ -95,6 +101,16 @@ namespace qpid {
namespace store {
namespace ms_clfs {
+void
+MessageLog::initialize()
+{
+ // Write something to occupy the first record, preventing a real message
+ // from being lsn/id 0. Delete of a non-existant id is easily tossed
+ // during recovery if no other messages have caused the tail to be moved
+ // up past this dummy record by then.
+ deleteMessage(0, 0);
+}
+
uint32_t
MessageLog::marshallingBufferSize()
{
@@ -114,8 +130,10 @@ MessageLog::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& ms
// Message-Chunk records to contain the rest. If it does all fit in one
// record, though, optimize the encoding by going straight to the
// Message-Start record rather than encoding then copying to the record.
+ // In all case
MessageStart entry;
uint32_t encodedMessageLength = msg->encodedSize();
+ entry.headerLength = msg->encodedHeaderSize();
entry.totalLength = encodedMessageLength;
CLFS_LSN location, lastChunkLsn;
std::auto_ptr<char> encodeStage;
@@ -165,6 +183,16 @@ MessageLog::deleteMessage(uint64_t messageId, uint64_t newFirstId)
moveTail(idToLsn(newFirstId));
}
+// Load part or all of a message's content from previously stored
+// log record(s).
+void
+MessageLog::loadContent(uint64_t messageId,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+}
+
void
MessageLog::recordEnqueue (uint64_t messageId,
uint64_t queueId,
@@ -202,9 +230,11 @@ MessageLog::recover(qpid::broker::RecoveryManager& recoverer,
std::map<uint64_t, MessageBlocks> reassemblies;
std::map<uint64_t, MessageBlocks>::iterator at;
- // Note that there may be message refs in the log which are deleted, so
- // be sure to only add msgs at message-start record, and ignore those
- // that don't have an existing message record.
+ QPID_LOG(debug, "Recovering message log");
+
+ // Note that there may be message refs in the log which are deleted, so
+ // be sure to only add msgs at message-start record, and ignore those
+ // that don't have an existing message record.
// Get the base LSN - that's how to say "start reading at the beginning"
CLFS_INFORMATION info;
ULONG infoLength = sizeof (info);
@@ -253,11 +283,20 @@ MessageLog::recover(qpid::broker::RecoveryManager& recoverer,
// this content off to the side until the remaining record(s) are
// located.
if (start->totalLength == start->segmentLength) { // Whole thing
- qpid::framing::Buffer buff(start->content, start->totalLength);
+ // Start by recovering the header then see if the rest of
+ // the content is desired.
+ qpid::framing::Buffer buff(start->content, start->headerLength);
qpid::broker::RecoverableMessage::shared_ptr m =
recoverer.recoverMessage(buff);
m->setPersistenceId(msgId);
messageMap[msgId] = m;
+ uint32_t contentLength =
+ start->totalLength - start->headerLength;
+ if (m->loadContent(contentLength)) {
+ qpid::framing::Buffer content(&(start->content[start->headerLength]),
+ contentLength);
+ m->decodeContent(content);
+ }
}
else {
// Save it in a block big enough.
@@ -310,7 +349,7 @@ MessageLog::recover(qpid::broker::RecoveryManager& recoverer,
enqueue = reinterpret_cast<MessageEnqueue *>(recordPointer);
msgId = lsnToId(messageLsn);
QPID_LOG(debug, "Message " << msgId << " Enqueue on queue " <<
- enqueue->queueId);
+ enqueue->queueId << ", txn " << enqueue->transId);
if (messageMap.find(msgId) == messageMap.end()) {
QPID_LOG(debug,
"Message " << msgId << " doesn't exist; discarded");
@@ -357,8 +396,10 @@ MessageLog::recover(qpid::broker::RecoveryManager& recoverer,
}
DWORD status = ::GetLastError();
::TerminateReadLog(readContext);
- if (status == ERROR_HANDLE_EOF) // No more records
+ if (status == ERROR_HANDLE_EOF) { // No more records
+ QPID_LOG(debug, "Message log recovered");
return;
+ }
throw QPID_WINDOWS_ERROR(status);
}
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h
index 7cf6fa0c4c..b3705287a6 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h
@@ -41,6 +41,11 @@ namespace ms_clfs {
*/
class MessageLog : public Log {
+protected:
+ // Message log needs to have a no-op first record written in the log
+ // to ensure that no real message gets an ID 0.
+ virtual void initialize();
+
public:
// Inherited and reimplemented from Log. Figure the minimum marshalling
// buffer size needed for the records this class writes.
@@ -53,6 +58,13 @@ public:
// the earliest valid message in the log, so move the tail up to it.
void deleteMessage(uint64_t messageId, uint64_t newFirstId);
+ // Load part or all of a message's content from previously stored
+ // log record(s).
+ void loadContent(uint64_t messageId,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
// Enqueue and dequeue operations track messages' transit across
// queues; each operation may be associated with a transaction. If
// the transactionId is 0 the operation is not associated with a
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
index a0782e3e5a..98cc19a7f7 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
@@ -222,6 +222,17 @@ Messages::abort(uint64_t msgId, Transaction::shared_ptr& t)
}
}
+// Load part or all of a message's content from previously stored
+// log record(s).
+void
+Messages::loadContent(uint64_t msgId,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ log.loadContent(msgId, data, offset, length);
+}
+
// Recover the current set of messages and where they're queued from
// the log.
void
@@ -247,7 +258,7 @@ Messages::recover(qpid::broker::RecoveryManager& recoverer,
for (msg = messageOps.begin(); msg != messageOps.end(); ++msg) {
uint64_t msgId = msg->first;
const std::vector<MessageLog::RecoveredMsgOp>& ops = msg->second;
- QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " ops");
+ QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " op(s)");
MessageInfo::shared_ptr m(new MessageInfo);
std::vector<QueueEntry>& entries = messageQueueMap[msgId];
std::vector<MessageLog::RecoveredMsgOp>::const_iterator op;
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Messages.h b/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
index 4fa86e3964..f54ccc7c27 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
@@ -52,8 +52,7 @@ class Messages {
typedef boost::shared_ptr<MessageInfo> shared_ptr;
- MessageInfo()
- : enqueuedCount(0) { /*latestLsn.Internal = 0;*/ }
+ MessageInfo() : enqueuedCount(0) {}
};
qpid::sys::RWlock lock;
@@ -90,6 +89,13 @@ public:
// being removed from all queues, it is deleted.
void abort(uint64_t msgId, Transaction::shared_ptr& transaction);
+ // Load part or all of a message's content from previously stored
+ // log record(s).
+ void loadContent(uint64_t msgId,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
// Recover the current set of messages and where they're queued from
// the log.
void recover(qpid::broker::RecoveryManager& recoverer,
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
index 937641405c..04780e83e8 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
+++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
@@ -99,6 +99,16 @@ namespace qpid {
namespace store {
namespace ms_clfs {
+void
+TransactionLog::initialize()
+{
+ // Write something to occupy the first record, preventing a real
+ // transaction from being lsn/id 0. Delete of a non-existant id is easily
+ // tossed during recovery if no other transactions have caused the tail
+ // to be moved up past this dummy record by then.
+ deleteTransaction(0);
+}
+
uint32_t
TransactionLog::marshallingBufferSize()
{
@@ -226,7 +236,6 @@ TransactionLog::deleteTransaction(uint64_t transId)
write(&deleteEntry, sizeof(deleteEntry), &transLsn);
if (newFirstId != 0)
moveTail(idToLsn(newFirstId));
-
}
void
@@ -252,6 +261,8 @@ TransactionLog::collectPreparedXids(std::map<std::string, TPCTransaction::shared
void
TransactionLog::recover(std::map<uint64_t, Transaction::shared_ptr>& transMap)
{
+ QPID_LOG(debug, "Recovering transaction log");
+
// Note that there may be transaction refs in the log which are deleted,
// so be sure to only add transactions at Start records, and ignore those
// that don't have an existing message record.
@@ -378,6 +389,8 @@ TransactionLog::recover(std::map<uint64_t, Transaction::shared_ptr>& transMap)
if (status != ERROR_HANDLE_EOF) // No more records
throw QPID_WINDOWS_ERROR(status);
+ QPID_LOG(debug, "Transaction log recovered");
+
// At this point we have a list of all the not-deleted transactions that
// were in existence when the broker last ran. All transactions of both
// Dtx and Tx types that haven't prepared or committed will be aborted.
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h
index c128a09ada..7ca27c229e 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h
+++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h
@@ -58,6 +58,12 @@ class TransactionLog : public Log,
std::map<uint64_t, boost::weak_ptr<Transaction> > validIds;
qpid::sys::Mutex idsLock;
+protected:
+ // Transaction log needs to have a no-op first record written in the log
+ // to ensure that no real transaction gets an ID 0; messages think trans
+ // id 0 means "no transaction."
+ virtual void initialize();
+
public:
// Inherited and reimplemented from Log. Figure the minimum marshalling
// buffer size needed for the records this class writes.