diff options
Diffstat (limited to 'cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp')
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp | 58 |
1 files changed, 39 insertions, 19 deletions
diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp index e06f8a750f..a26df59df7 100644 --- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp +++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -399,8 +399,8 @@ MSSqlProvider::create(PersistableQueue& queue, const qpid::framing::FieldTable& /*args needed for jrnl*/) { DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; try { - BlobRecordset rsQueues; db->beginTransaction(); rsQueues.open(db, TblQueue); rsQueues.add(queue); @@ -418,16 +418,36 @@ MSSqlProvider::create(PersistableQueue& queue, void MSSqlProvider::destroy(PersistableQueue& queue) { + // MessageDeleter class for use with for_each, below. + class MessageDeleter { + BlobRecordset& msgs; + public: + explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {} + void operator()(uint64_t msgId) { msgs.remove(msgId); } + }; + DatabaseConnection *db = initConnection(); + BlobRecordset rsQueues; + BindingRecordset rsBindings; + MessageRecordset rsMessages; + MessageMapRecordset rsMessageMaps; try { - BlobRecordset rsQueues; - BindingRecordset rsBindings; db->beginTransaction(); rsQueues.open(db, TblQueue); rsBindings.open(db, TblBinding); + rsMessages.open(db, TblMessage); + rsMessageMaps.open(db, TblMessageMap); // Remove bindings first; the queue IDs can't be ripped out from - // under the references in the bindings table. + // under the references in the bindings table. Then remove the + // message->queue entries for the queue, also because the queue can't + // be deleted while there are references to it. If there are messages + // orphaned by removing the queue references, those messages can + // also be deleted. Lastly, the queue record can be removed. rsBindings.removeForQueue(queue.getPersistenceId()); + std::vector<uint64_t> orphans; + rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans); + std::for_each(orphans.begin(), orphans.end(), + MessageDeleter(rsMessages)); rsQueues.remove(queue); db->commitTransaction(); } @@ -445,8 +465,8 @@ MSSqlProvider::create(const PersistableExchange& exchange, const qpid::framing::FieldTable& args) { DatabaseConnection *db = initConnection(); + BlobRecordset rsExchanges; try { - BlobRecordset rsExchanges; db->beginTransaction(); rsExchanges.open(db, TblExchange); rsExchanges.add(exchange); @@ -465,9 +485,9 @@ void MSSqlProvider::destroy(const PersistableExchange& exchange) { DatabaseConnection *db = initConnection(); + BlobRecordset rsExchanges; + BindingRecordset rsBindings; try { - BlobRecordset rsExchanges; - BindingRecordset rsBindings; db->beginTransaction(); rsExchanges.open(db, TblExchange); rsBindings.open(db, TblBinding); @@ -493,8 +513,8 @@ MSSqlProvider::bind(const PersistableExchange& exchange, const qpid::framing::FieldTable& args) { DatabaseConnection *db = initConnection(); + BindingRecordset rsBindings; try { - BindingRecordset rsBindings; db->beginTransaction(); rsBindings.open(db, TblBinding); rsBindings.add(exchange.getPersistenceId(), @@ -520,8 +540,8 @@ MSSqlProvider::unbind(const PersistableExchange& exchange, const qpid::framing::FieldTable& args) { DatabaseConnection *db = initConnection(); + BindingRecordset rsBindings; try { - BindingRecordset rsBindings; db->beginTransaction(); rsBindings.open(db, TblBinding); rsBindings.remove(exchange.getPersistenceId(), @@ -544,8 +564,8 @@ void MSSqlProvider::create(const PersistableConfig& config) { DatabaseConnection *db = initConnection(); + BlobRecordset rsConfigs; try { - BlobRecordset rsConfigs; db->beginTransaction(); rsConfigs.open(db, TblConfig); rsConfigs.add(config); @@ -564,8 +584,8 @@ void MSSqlProvider::destroy(const PersistableConfig& config) { DatabaseConnection *db = initConnection(); + BlobRecordset rsConfigs; try { - BlobRecordset rsConfigs; db->beginTransaction(); rsConfigs.open(db, TblConfig); rsConfigs.remove(config); @@ -590,8 +610,8 @@ void MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg) { DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; try { - MessageRecordset rsMessages; db->beginTransaction(); rsMessages.open(db, TblMessage); rsMessages.add(msg); @@ -613,8 +633,8 @@ void MSSqlProvider::destroy(PersistableMessage& msg) { DatabaseConnection *db = initConnection(); + BlobRecordset rsMessages; try { - BlobRecordset rsMessages; db->beginTransaction(); rsMessages.open(db, TblMessage); rsMessages.remove(msg); @@ -634,8 +654,8 @@ MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage const std::string& data) { DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; try { - MessageRecordset rsMessages; db->beginTransaction(); rsMessages.open(db, TblMessage); rsMessages.append(msg, data); @@ -665,8 +685,8 @@ MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/, // SQL store keeps all messages in one table, so we don't need the // queue reference. DatabaseConnection *db = initConnection(); + MessageRecordset rsMessages; try { - MessageRecordset rsMessages; rsMessages.open(db, TblMessage); rsMessages.loadContent(msg, data, offset, length); } @@ -710,13 +730,13 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, } } + MessageRecordset rsMessages; + MessageMapRecordset rsMap; try { if (msg->getPersistenceId() == 0) { // Message itself not yet saved - MessageRecordset rsMessages; rsMessages.open(db, TblMessage); rsMessages.add(msg); } - MessageMapRecordset rsMap; rsMap.open(db, TblMessageMap); rsMap.add(msg->getPersistenceId(), queue.getPersistenceId()); if (atxn) @@ -769,13 +789,13 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, } } + MessageMapRecordset rsMap; + MessageRecordset rsMessages; try { - MessageMapRecordset rsMap; rsMap.open(db, TblMessageMap); bool more = rsMap.remove(msg->getPersistenceId(), queue.getPersistenceId()); if (!more) { - MessageRecordset rsMessages; rsMessages.open(db, TblMessage); rsMessages.remove(msg); } |