summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2009-11-05 23:13:25 +0000
committerStephen D. Huston <shuston@apache.org>2009-11-05 23:13:25 +0000
commit8287ba11b7b8e0156ea0268b3d0fe649b955c67c (patch)
tree9deb5a5484e04b9d52e0afbee58db289e6015251 /cpp/src
parentdeb38bdd13c1cb4255eda867af66b763d8ded3be (diff)
downloadqpid-python-8287ba11b7b8e0156ea0268b3d0fe649b955c67c.tar.gz
Fix restoration of durable messages at startup. Remove all references to a queue that's about to be deleted from the database. Fixes QPID-2183.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@833230 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/store/ms-sql/BlobRecordset.cpp22
-rw-r--r--cpp/src/qpid/store/ms-sql/BlobRecordset.h5
-rw-r--r--cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp58
-rw-r--r--cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp42
-rw-r--r--cpp/src/qpid/store/ms-sql/MessageMapRecordset.h10
-rw-r--r--cpp/src/qpid/store/ms-sql/MessageRecordset.cpp2
-rw-r--r--cpp/src/qpid/store/ms-sql/MessageRecordset.h2
7 files changed, 105 insertions, 36 deletions
diff --git a/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp b/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
index 5a5d23c797..ef1757dbad 100644
--- a/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
+++ b/cpp/src/qpid/store/ms-sql/BlobRecordset.cpp
@@ -31,6 +31,17 @@ namespace store {
namespace ms_sql {
void
+BlobRecordset::add(const qpid::broker::Persistable& item)
+{
+ BlobEncoder blob (item); // Marshall item info to a blob
+ rs->AddNew();
+ rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
+ rs->Update();
+ uint64_t id = rs->Fields->Item["persistenceId"]->Value;
+ item.setPersistenceId(id);
+}
+
+void
BlobRecordset::remove(uint64_t id)
{
// Look up the item by its persistenceId
@@ -45,17 +56,6 @@ BlobRecordset::remove(uint64_t id)
}
void
-BlobRecordset::add(const qpid::broker::Persistable& item)
-{
- BlobEncoder blob (item); // Marshall item info to a blob
- rs->AddNew();
- rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
- rs->Update();
- uint64_t id = rs->Fields->Item["persistenceId"]->Value;
- item.setPersistenceId(id);
-}
-
-void
BlobRecordset::remove(const qpid::broker::Persistable& item)
{
remove(item.getPersistenceId());
diff --git a/cpp/src/qpid/store/ms-sql/BlobRecordset.h b/cpp/src/qpid/store/ms-sql/BlobRecordset.h
index 4e89254b48..4d1c338746 100644
--- a/cpp/src/qpid/store/ms-sql/BlobRecordset.h
+++ b/cpp/src/qpid/store/ms-sql/BlobRecordset.h
@@ -37,11 +37,12 @@ namespace ms_sql {
*/
class BlobRecordset : public Recordset {
protected:
- // Remove a record given its Id.
- void remove(uint64_t id);
public:
void add(const qpid::broker::Persistable& item);
+
+ // Remove a record given its Id.
+ void remove(uint64_t id);
void remove(const qpid::broker::Persistable& item);
// Dump table contents; useful for debugging.
diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
index e06f8a750f..a26df59df7 100644
--- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
+++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
@@ -399,8 +399,8 @@ MSSqlProvider::create(PersistableQueue& queue,
const qpid::framing::FieldTable& /*args needed for jrnl*/)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
try {
- BlobRecordset rsQueues;
db->beginTransaction();
rsQueues.open(db, TblQueue);
rsQueues.add(queue);
@@ -418,16 +418,36 @@ MSSqlProvider::create(PersistableQueue& queue,
void
MSSqlProvider::destroy(PersistableQueue& queue)
{
+ // MessageDeleter class for use with for_each, below.
+ class MessageDeleter {
+ BlobRecordset& msgs;
+ public:
+ explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {}
+ void operator()(uint64_t msgId) { msgs.remove(msgId); }
+ };
+
DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ BindingRecordset rsBindings;
+ MessageRecordset rsMessages;
+ MessageMapRecordset rsMessageMaps;
try {
- BlobRecordset rsQueues;
- BindingRecordset rsBindings;
db->beginTransaction();
rsQueues.open(db, TblQueue);
rsBindings.open(db, TblBinding);
+ rsMessages.open(db, TblMessage);
+ rsMessageMaps.open(db, TblMessageMap);
// Remove bindings first; the queue IDs can't be ripped out from
- // under the references in the bindings table.
+ // under the references in the bindings table. Then remove the
+ // message->queue entries for the queue, also because the queue can't
+ // be deleted while there are references to it. If there are messages
+ // orphaned by removing the queue references, those messages can
+ // also be deleted. Lastly, the queue record can be removed.
rsBindings.removeForQueue(queue.getPersistenceId());
+ std::vector<uint64_t> orphans;
+ rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans);
+ std::for_each(orphans.begin(), orphans.end(),
+ MessageDeleter(rsMessages));
rsQueues.remove(queue);
db->commitTransaction();
}
@@ -445,8 +465,8 @@ MSSqlProvider::create(const PersistableExchange& exchange,
const qpid::framing::FieldTable& args)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
try {
- BlobRecordset rsExchanges;
db->beginTransaction();
rsExchanges.open(db, TblExchange);
rsExchanges.add(exchange);
@@ -465,9 +485,9 @@ void
MSSqlProvider::destroy(const PersistableExchange& exchange)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ BindingRecordset rsBindings;
try {
- BlobRecordset rsExchanges;
- BindingRecordset rsBindings;
db->beginTransaction();
rsExchanges.open(db, TblExchange);
rsBindings.open(db, TblBinding);
@@ -493,8 +513,8 @@ MSSqlProvider::bind(const PersistableExchange& exchange,
const qpid::framing::FieldTable& args)
{
DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
try {
- BindingRecordset rsBindings;
db->beginTransaction();
rsBindings.open(db, TblBinding);
rsBindings.add(exchange.getPersistenceId(),
@@ -520,8 +540,8 @@ MSSqlProvider::unbind(const PersistableExchange& exchange,
const qpid::framing::FieldTable& args)
{
DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
try {
- BindingRecordset rsBindings;
db->beginTransaction();
rsBindings.open(db, TblBinding);
rsBindings.remove(exchange.getPersistenceId(),
@@ -544,8 +564,8 @@ void
MSSqlProvider::create(const PersistableConfig& config)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
try {
- BlobRecordset rsConfigs;
db->beginTransaction();
rsConfigs.open(db, TblConfig);
rsConfigs.add(config);
@@ -564,8 +584,8 @@ void
MSSqlProvider::destroy(const PersistableConfig& config)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
try {
- BlobRecordset rsConfigs;
db->beginTransaction();
rsConfigs.open(db, TblConfig);
rsConfigs.remove(config);
@@ -590,8 +610,8 @@ void
MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
{
DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
try {
- MessageRecordset rsMessages;
db->beginTransaction();
rsMessages.open(db, TblMessage);
rsMessages.add(msg);
@@ -613,8 +633,8 @@ void
MSSqlProvider::destroy(PersistableMessage& msg)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsMessages;
try {
- BlobRecordset rsMessages;
db->beginTransaction();
rsMessages.open(db, TblMessage);
rsMessages.remove(msg);
@@ -634,8 +654,8 @@ MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage
const std::string& data)
{
DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
try {
- MessageRecordset rsMessages;
db->beginTransaction();
rsMessages.open(db, TblMessage);
rsMessages.append(msg, data);
@@ -665,8 +685,8 @@ MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
// SQL store keeps all messages in one table, so we don't need the
// queue reference.
DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
try {
- MessageRecordset rsMessages;
rsMessages.open(db, TblMessage);
rsMessages.loadContent(msg, data, offset, length);
}
@@ -710,13 +730,13 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
}
}
+ MessageRecordset rsMessages;
+ MessageMapRecordset rsMap;
try {
if (msg->getPersistenceId() == 0) { // Message itself not yet saved
- MessageRecordset rsMessages;
rsMessages.open(db, TblMessage);
rsMessages.add(msg);
}
- MessageMapRecordset rsMap;
rsMap.open(db, TblMessageMap);
rsMap.add(msg->getPersistenceId(), queue.getPersistenceId());
if (atxn)
@@ -769,13 +789,13 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt,
}
}
+ MessageMapRecordset rsMap;
+ MessageRecordset rsMessages;
try {
- MessageMapRecordset rsMap;
rsMap.open(db, TblMessageMap);
bool more = rsMap.remove(msg->getPersistenceId(),
queue.getPersistenceId());
if (!more) {
- MessageRecordset rsMessages;
rsMessages.open(db, TblMessage);
rsMessages.remove(msg);
}
diff --git a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
index 7072015864..68e174a2b0 100644
--- a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
+++ b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.cpp
@@ -48,12 +48,15 @@ MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId)
std::ostringstream filter;
filter << "messageId = " << messageId << std::ends;
rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ if (rs->RecordCount == 0)
+ return false;
MessageMap m;
IADORecordBinding *piAdoRecordBinding;
- rs->QueryInterface(__uuidof(IADORecordBinding),
+ rs->QueryInterface(__uuidof(IADORecordBinding),
(LPVOID *)&piAdoRecordBinding);
piAdoRecordBinding->BindToRecordset(&m);
bool moreEntries = false, deleted = false;
+ rs->MoveFirst();
// If the desired mapping gets deleted, and we already know there are
// other mappings for the message, don't bother finishing the scan.
while (!rs->EndOfFile && !(deleted && moreEntries)) {
@@ -68,10 +71,47 @@ MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId)
rs->MoveNext();
}
piAdoRecordBinding->Release();
+ rs->Filter = "";
return moreEntries;
}
void
+MessageMapRecordset::removeForQueue(uint64_t queueId,
+ std::vector<uint64_t>& orphaned)
+{
+ // Read all the messages queued on queueId and add them to the orphaned
+ // list. Then remove each one and learn if there are references to it
+ // from other queues. The ones without references are left in the
+ // orphaned list, others are removed.
+ std::ostringstream filter;
+ filter << "queueId = " << queueId << std::ends;
+ rs->PutFilter (VariantHelper<std::string>(filter.str()));
+ MessageMap m;
+ IADORecordBinding *piAdoRecordBinding;
+ rs->QueryInterface(__uuidof(IADORecordBinding),
+ (LPVOID *)&piAdoRecordBinding);
+ piAdoRecordBinding->BindToRecordset(&m);
+ while (!rs->EndOfFile) {
+ orphaned.push_back(m.messageId);
+ rs->MoveNext();
+ }
+ piAdoRecordBinding->Release();
+ rs->Filter = ""; // Remove filter on queueId
+ rs->Requery(adOptionUnspecified); // Get the entire map again
+
+ // Now delete all the messages on this queue; any message that still has
+ // references from other queue(s) is removed from orphaned.
+ for (std::vector<uint64_t>::iterator i = orphaned.begin();
+ i != orphaned.end();
+ ) {
+ if (remove(*i, queueId))
+ i = orphaned.erase(i); // There are other refs to message *i
+ else
+ ++i;
+ }
+}
+
+void
MessageMapRecordset::recover(MessageQueueMap& msgMap)
{
if (rs->BOF && rs->EndOfFile)
diff --git a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
index a2e91abb96..475137b18b 100644
--- a/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
+++ b/cpp/src/qpid/store/ms-sql/MessageMapRecordset.h
@@ -23,6 +23,7 @@
*/
#include <icrsint.h>
+#include <vector>
#include "Recordset.h"
#include <qpid/broker/RecoveryManager.h>
@@ -40,7 +41,7 @@ class MessageMapRecordset : public Recordset {
class MessageMap : public CADORecordBinding {
BEGIN_ADO_BINDING(MessageMap)
ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, messageId, FALSE)
- ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, queueId, FALSE)
+ ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE)
END_ADO_BINDING()
public:
@@ -57,6 +58,13 @@ public:
// exists on any other queues.
bool remove(uint64_t messageId, uint64_t queueId);
+ // Remove mappings for all messages on a specified queue. If there are
+ // messages that were only on the specified queue and are, therefore,
+ // orphaned now, return them in the orphaned vector. The orphaned
+ // messages can be deleted permanently as they are not referenced on
+ // any other queues.
+ void removeForQueue(uint64_t queueId, std::vector<uint64_t>& orphaned);
+
// Recover the mappings of message ID -> vector<queue ID>.
void recover(MessageQueueMap& msgMap);
diff --git a/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp b/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
index edf5d8e518..b62a333df6 100644
--- a/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
+++ b/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
@@ -113,7 +113,7 @@ MessageRecordset::loadContent(const boost::intrusive_ptr<const qpid::broker::Per
void
MessageRecordset::recover(qpid::broker::RecoveryManager& recoverer,
- std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap)
+ std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap)
{
if (rs->BOF && rs->EndOfFile)
return; // Nothing to do
diff --git a/cpp/src/qpid/store/ms-sql/MessageRecordset.h b/cpp/src/qpid/store/ms-sql/MessageRecordset.h
index f1835f6f73..698b2561fe 100644
--- a/cpp/src/qpid/store/ms-sql/MessageRecordset.h
+++ b/cpp/src/qpid/store/ms-sql/MessageRecordset.h
@@ -74,7 +74,7 @@ public:
// Recover messages and save a map of those recovered.
void recover(qpid::broker::RecoveryManager& recoverer,
- std::map<uint64_t, broker::RecoverableMessage::shared_ptr> messageMap);
+ std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap);
// Dump table contents; useful for debugging.
void dump();