diff options
author | Stephen D. Huston <shuston@apache.org> | 2009-11-05 23:13:25 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2009-11-05 23:13:25 +0000 |
commit | 8287ba11b7b8e0156ea0268b3d0fe649b955c67c (patch) | |
tree | 9deb5a5484e04b9d52e0afbee58db289e6015251 /cpp/src | |
parent | deb38bdd13c1cb4255eda867af66b763d8ded3be (diff) | |
download | qpid-python-8287ba11b7b8e0156ea0268b3d0fe649b955c67c.tar.gz |
Fix restoration of durable messages at startup. Remove all references to a queue that's about to be deleted from the database. Fixes QPID-2183.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@833230 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/store/ms-sql/BlobRecordset.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/BlobRecordset.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp | 58 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MessageMapRecordset.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MessageRecordset.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MessageRecordset.h | 2 |
7 files changed, 105 insertions, 36 deletions
diff --git a/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp b/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp index 5a5d23c797..ef1757dbad 100644 --- a/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp +++ b/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp @@ -31,6 +31,17 @@ namespace store { namespace ms_sql { void +BlobRecordset::add(const qpid::broker::Persistable& item) +{ + BlobEncoder blob (item); // Marshall item info to a blob + rs->AddNew(); + rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob); + rs->Update(); + uint64_t id = rs->Fields->Item["persistenceId"]->Value; + item.setPersistenceId(id); +} + +void BlobRecordset::remove(uint64_t id) { // Look up the item by its persistenceId @@ -45,17 +56,6 @@ BlobRecordset::remove(uint64_t id) } void -BlobRecordset::add(const qpid::broker::Persistable& item) -{ - BlobEncoder blob (item); // Marshall item info to a blob - rs->AddNew(); - rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob); - rs->Update(); - uint64_t id = rs->Fields->Item["persistenceId"]->Value; - item.setPersistenceId(id); -} - -void BlobRecordset::remove(const qpid::broker::Persistable& item) { remove(item.getPersistenceId()); diff --git a/cpp/src/qpid/store/ms-sql/BlobRecordset.h b/cpp/src/qpid/store/ms-sql/BlobRecordset.h index 4e89254b48..4d1c338746 100644 --- a/cpp/src/qpid/store/ms-sql/BlobRecordset.h +++ b/cpp/src/qpid/store/ms-sql/BlobRecordset.h @@ -37,11 +37,12 @@ namespace ms_sql { */ class BlobRecordset : public Recordset { protected: - // Remove a record given its Id. - void remove(uint64_t id); public: void add(const qpid::broker::Persistable& item); + + // Remove a record given its Id. + void remove(uint64_t id); void remove(const qpid::broker::Persistable& item); // Dump table contents; useful for debugging. diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp index e06f8a750f..a26df59df7 100644 --- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp +++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -399,8 +399,8 @@ MSSqlProvider::create(PersistableQueue& queue, const qpid::framing::FieldTable& /*args needed for jrnl*/) { DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; try { - BlobRecordset rsQueues; db->beginTransaction(); rsQueues.open(db, TblQueue); rsQueues.add(queue); @@ -418,16 +418,36 @@ MSSqlProvider::create(PersistableQueue& queue, void MSSqlProvider::destroy(PersistableQueue& queue) { + // MessageDeleter class for use with for_each, below. + class MessageDeleter { + BlobRecordset& msgs; + public: + explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {} + void operator()(uint64_t msgId) { msgs.remove(msgId); } + }; + DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; + BindingRecordset rsBindings; + MessageRecordset rsMessages; + MessageMapRecordset rsMessageMaps; try { - BlobRecordset rsQueues; - BindingRecordset rsBindings; db->beginTransaction(); rsQueues.open(db, TblQueue); rsBindings.open(db, TblBinding); + rsMessages.open(db, TblMessage); + rsMessageMaps.open(db, TblMessageMap); // Remove bindings first; the queue IDs can't be ripped out from - // under the references in the bindings table. + // under the references in the bindings table. Then remove the + // message->queue entries for the queue, also because the queue can't + // be deleted while there are references to it. If there are messages + // orphaned by removing the queue references, those messages can + // also be deleted. Lastly, the queue record can be removed. rsBindings.removeForQueue(queue.getPersistenceId()); + std::vector<uint64_t> orphans; + rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans); + std::for_each(orphans.begin(), orphans.end(), + MessageDeleter(rsMessages)); rsQueues.remove(queue); db->commitTransaction(); } @@ -445,8 +465,8 @@ MSSqlProvider::create(const PersistableExchange& exchange, const qpid::framing::FieldTable& args) { DatabaseConnection *db = initConnection(); + BlobRecordset rsExchanges; try { - BlobRecordset rsExchanges; db->beginTransaction(); rsExchanges.open(db, TblExchange); rsExchanges.add(exchange); @@ -465,9 +485,9 @@ void MSSqlProvider::destroy(const PersistableExchange& exchange) { DatabaseConnection *db = initConnection(); + BlobRecordset rsExchanges; + BindingRecordset rsBindings; try { - BlobRecordset rsExchanges; - BindingRecordset rsBindings; db->beginTransaction(); rsExchanges.open(db, TblExchange); rsBindings.open(db, TblBinding); @@ -493,8 +513,8 @@ MSSqlProvider::bind(const PersistableExchange& exchange, const qpid::framing::FieldTable& args) { DatabaseConnection *db = initConnection(); + BindingRecordset rsBindings; try { - BindingRecordset rsBindings; db->beginTransaction(); rsBindings.open(db, TblBinding); rsBindings.add(exchange.getPersistenceId(), @@ -520,8 +540,8 @@ MSSqlProvider::unbind(const PersistableExchange& exchange, const qpid::framing::FieldTable& args) { DatabaseConnection *db = initConnection(); + BindingRecordset rsBindings; try { - BindingRecordset rsBindings; db->beginTransaction(); rsBindings.open(db, TblBinding); rsBindings.remove(exchange.getPersistenceId(), @@ -544,8 +564,8 @@ void MSSqlProvider::create(const PersistableConfig& config) { DatabaseConnection *db = initConnection(); + BlobRecordset rsConfigs; try { - BlobRecordset rsConfigs; db->beginTransaction(); rsConfigs.open(db, TblConfig); rsConfigs.add(config); @@ -564,8 +584,8 @@ void MSSqlProvider::destroy(const PersistableConfig& config) { DatabaseConnection *db = initConnection(); + BlobRecordset rsConfigs; try { - BlobRecordset rsConfigs; db->beginTransaction(); rsConfigs.open(db, TblConfig); rsConfigs.remove(config); @@ -590,8 +610,8 @@ void MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg) { DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; try { - MessageRecordset rsMessages; db->beginTransaction(); rsMessages.open(db, TblMessage); rsMessages.add(msg); @@ -613,8 +633,8 @@ void MSSqlProvider::destroy(PersistableMessage& msg) { DatabaseConnection *db = initConnection(); + BlobRecordset rsMessages; try { - BlobRecordset rsMessages; db->beginTransaction(); rsMessages.open(db, TblMessage); rsMessages.remove(msg); @@ -634,8 +654,8 @@ MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage const std::string& data) { DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; try { - MessageRecordset rsMessages; db->beginTransaction(); rsMessages.open(db, TblMessage); rsMessages.append(msg, data); @@ -665,8 +685,8 @@ MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/, // SQL store keeps all messages in one table, so we don't need the // queue reference. DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; try { - MessageRecordset rsMessages; rsMessages.open(db, TblMessage); rsMessages.loadContent(msg, data, offset, length); } @@ -710,13 +730,13 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, } } + MessageRecordset rsMessages; + MessageMapRecordset rsMap; try { if (msg->getPersistenceId() == 0) { // Message itself not yet saved - MessageRecordset rsMessages; rsMessages.open(db, TblMessage); rsMessages.add(msg); } - MessageMapRecordset rsMap; rsMap.open(db, TblMessageMap); rsMap.add(msg->getPersistenceId(), queue.getPersistenceId()); if (atxn) @@ -769,13 +789,13 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, } } + MessageMapRecordset rsMap; + MessageRecordset rsMessages; try { - MessageMapRecordset rsMap; rsMap.open(db, TblMessageMap); bool more = rsMap.remove(msg->getPersistenceId(), queue.getPersistenceId()); if (!more) { - MessageRecordset rsMessages; rsMessages.open(db, TblMessage); rsMessages.remove(msg); } diff --git a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp index 7072015864..68e174a2b0 100644 --- a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp +++ b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp @@ -48,12 +48,15 @@ MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId) std::ostringstream filter; filter << "messageId = " << messageId << std::ends; rs->PutFilter (VariantHelper<std::string>(filter.str())); + if (rs->RecordCount == 0) + return false; MessageMap m; IADORecordBinding *piAdoRecordBinding; - rs->QueryInterface(__uuidof(IADORecordBinding), + rs->QueryInterface(__uuidof(IADORecordBinding), (LPVOID *)&piAdoRecordBinding); piAdoRecordBinding->BindToRecordset(&m); bool moreEntries = false, deleted = false; + rs->MoveFirst(); // If the desired mapping gets deleted, and we already know there are // other mappings for the message, don't bother finishing the scan. while (!rs->EndOfFile && !(deleted && moreEntries)) { @@ -68,10 +71,47 @@ MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId) rs->MoveNext(); } piAdoRecordBinding->Release(); + rs->Filter = ""; return moreEntries; } void +MessageMapRecordset::removeForQueue(uint64_t queueId, + std::vector<uint64_t>& orphaned) +{ + // Read all the messages queued on queueId and add them to the orphaned + // list. Then remove each one and learn if there are references to it + // from other queues. The ones without references are left in the + // orphaned list, others are removed. + std::ostringstream filter; + filter << "queueId = " << queueId << std::ends; + rs->PutFilter (VariantHelper<std::string>(filter.str())); + MessageMap m; + IADORecordBinding *piAdoRecordBinding; + rs->QueryInterface(__uuidof(IADORecordBinding), + (LPVOID *)&piAdoRecordBinding); + piAdoRecordBinding->BindToRecordset(&m); + while (!rs->EndOfFile) { + orphaned.push_back(m.messageId); + rs->MoveNext(); + } + piAdoRecordBinding->Release(); + rs->Filter = ""; // Remove filter on queueId + rs->Requery(adOptionUnspecified); // Get the entire map again + + // Now delete all the messages on this queue; any message that still has + // references from other queue(s) is removed from orphaned. + for (std::vector<uint64_t>::iterator i = orphaned.begin(); + i != orphaned.end(); + ) { + if (remove(*i, queueId)) + i = orphaned.erase(i); // There are other refs to message *i + else + ++i; + } +} + +void MessageMapRecordset::recover(MessageQueueMap& msgMap) { if (rs->BOF && rs->EndOfFile) diff --git a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h index a2e91abb96..475137b18b 100644 --- a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h +++ b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h @@ -23,6 +23,7 @@ */ #include <icrsint.h> +#include <vector> #include "Recordset.h" #include <qpid/broker/RecoveryManager.h> @@ -40,7 +41,7 @@ class MessageMapRecordset : public Recordset { class MessageMap : public CADORecordBinding { BEGIN_ADO_BINDING(MessageMap) ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE) - ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, queueId, FALSE) + ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE) END_ADO_BINDING() public: @@ -57,6 +58,13 @@ public: // exists on any other queues. bool remove(uint64_t messageId, uint64_t queueId); + // Remove mappings for all messages on a specified queue. If there are + // messages that were only on the specified queue and are, therefore, + // orphaned now, return them in the orphaned vector. The orphaned + // messages can be deleted permanently as they are not referenced on + // any other queues. + void removeForQueue(uint64_t queueId, std::vector<uint64_t>& orphaned); + // Recover the mappings of message ID -> vector<queue ID>. void recover(MessageQueueMap& msgMap); diff --git a/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp b/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp index edf5d8e518..b62a333df6 100644 --- a/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp +++ b/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp @@ -113,7 +113,7 @@ MessageRecordset::loadContent(const boost::intrusive_ptr<const qpid::broker::Per void MessageRecordset::recover(qpid::broker::RecoveryManager& recoverer, - std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap) + std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap) { if (rs->BOF && rs->EndOfFile) return; // Nothing to do diff --git a/cpp/src/qpid/store/ms-sql/MessageRecordset.h b/cpp/src/qpid/store/ms-sql/MessageRecordset.h index f1835f6f73..698b2561fe 100644 --- a/cpp/src/qpid/store/ms-sql/MessageRecordset.h +++ b/cpp/src/qpid/store/ms-sql/MessageRecordset.h @@ -74,7 +74,7 @@ public: // Recover messages and save a map of those recovered. void recover(qpid::broker::RecoveryManager& recoverer, - std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap); + std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap); // Dump table contents; useful for debugging. void dump(); |