From 10a368d1773654455e678d604f0d3b7e4e95255b Mon Sep 17 00:00:00 2001 From: "Stephen D. Huston" Date: Wed, 3 Nov 2010 23:12:08 +0000 Subject: Catch com exceptions during db recovery and rethrow as ADOExceptions which the broker can deal with. Resolves QPID-2925. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1030752 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp | 170 ++++++++++++++--------- 1 file changed, 105 insertions(+), 65 deletions(-) diff --git a/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp index bc3584dd57..7f22db3d02 100644 --- a/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp +++ b/qpid/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -1025,75 +1025,99 @@ MSSqlProvider::collectPreparedXids(std::set& xids) void MSSqlProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer) { - DatabaseConnection *db = initConnection(); - BlobRecordset rsConfigs; - rsConfigs.open(db, TblConfig); - _RecordsetPtr p = (_RecordsetPtr)rsConfigs; - if (p->BOF && p->EndOfFile) - return; // Nothing to do - p->MoveFirst(); - while (!p->EndOfFile) { - uint64_t id = p->Fields->Item["persistenceId"]->Value; - long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; - BlobAdapter blob(blobSize); - blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); - // Recreate the Config instance and reset its ID. - broker::RecoverableConfig::shared_ptr config = - recoverer.recoverConfig(blob); - config->setPersistenceId(id); - p->MoveNext(); + DatabaseConnection *db = 0; + try { + db = initConnection(); + BlobRecordset rsConfigs; + rsConfigs.open(db, TblConfig); + _RecordsetPtr p = (_RecordsetPtr)rsConfigs; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Config instance and reset its ID. + broker::RecoverableConfig::shared_ptr config = + recoverer.recoverConfig(blob); + config->setPersistenceId(id); + p->MoveNext(); + } } + catch(_com_error &e) { + throw ADOException("Error recovering configs", + e, + db ? db->getErrors() : ""); + } } void MSSqlProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer, ExchangeMap& exchangeMap) { - DatabaseConnection *db = initConnection(); - BlobRecordset rsExchanges; - rsExchanges.open(db, TblExchange); - _RecordsetPtr p = (_RecordsetPtr)rsExchanges; - if (p->BOF && p->EndOfFile) - return; // Nothing to do - p->MoveFirst(); - while (!p->EndOfFile) { - uint64_t id = p->Fields->Item["persistenceId"]->Value; - long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; - BlobAdapter blob(blobSize); - blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); - // Recreate the Exchange instance, reset its ID, and remember the - // ones restored for matching up when recovering bindings. - broker::RecoverableExchange::shared_ptr exchange = - recoverer.recoverExchange(blob); - exchange->setPersistenceId(id); - exchangeMap[id] = exchange; - p->MoveNext(); + DatabaseConnection *db = 0; + try { + db = initConnection(); + BlobRecordset rsExchanges; + rsExchanges.open(db, TblExchange); + _RecordsetPtr p = (_RecordsetPtr)rsExchanges; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Exchange instance, reset its ID, and remember the + // ones restored for matching up when recovering bindings. + broker::RecoverableExchange::shared_ptr exchange = + recoverer.recoverExchange(blob); + exchange->setPersistenceId(id); + exchangeMap[id] = exchange; + p->MoveNext(); + } } + catch(_com_error &e) { + throw ADOException("Error recovering exchanges", + e, + db ? db->getErrors() : ""); + } } void MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer, QueueMap& queueMap) { - DatabaseConnection *db = initConnection(); - BlobRecordset rsQueues; - rsQueues.open(db, TblQueue); - _RecordsetPtr p = (_RecordsetPtr)rsQueues; - if (p->BOF && p->EndOfFile) - return; // Nothing to do - p->MoveFirst(); - while (!p->EndOfFile) { - uint64_t id = p->Fields->Item["persistenceId"]->Value; - long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; - BlobAdapter blob(blobSize); - blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); - // Recreate the Queue instance and reset its ID. - broker::RecoverableQueue::shared_ptr queue = - recoverer.recoverQueue(blob); - queue->setPersistenceId(id); - queueMap[id] = queue; - p->MoveNext(); + DatabaseConnection *db = 0; + try { + db = initConnection(); + BlobRecordset rsQueues; + rsQueues.open(db, TblQueue); + _RecordsetPtr p = (_RecordsetPtr)rsQueues; + if (p->BOF && p->EndOfFile) + return; // Nothing to do + p->MoveFirst(); + while (!p->EndOfFile) { + uint64_t id = p->Fields->Item["persistenceId"]->Value; + long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize; + BlobAdapter blob(blobSize); + blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); + // Recreate the Queue instance and reset its ID. + broker::RecoverableQueue::shared_ptr queue = + recoverer.recoverQueue(blob); + queue->setPersistenceId(id); + queueMap[id] = queue; + p->MoveNext(); + } } + catch(_com_error &e) { + throw ADOException("Error recovering queues", + e, + db ? db->getErrors() : ""); + } } void @@ -1101,10 +1125,18 @@ MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer, const ExchangeMap& exchangeMap, const QueueMap& queueMap) { - DatabaseConnection *db = initConnection(); - BindingRecordset rsBindings; - rsBindings.open(db, TblBinding); - rsBindings.recover(recoverer, exchangeMap, queueMap); + DatabaseConnection *db = 0; + try { + db = initConnection(); + BindingRecordset rsBindings; + rsBindings.open(db, TblBinding); + rsBindings.recover(recoverer, exchangeMap, queueMap); + } + catch(_com_error &e) { + throw ADOException("Error recovering bindings", + e, + db ? db->getErrors() : ""); + } } void @@ -1112,14 +1144,22 @@ MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer, MessageMap& messageMap, MessageQueueMap& messageQueueMap) { - DatabaseConnection *db = initConnection(); - MessageRecordset rsMessages; - rsMessages.open(db, TblMessage); - rsMessages.recover(recoverer, messageMap); + DatabaseConnection *db = 0; + try { + db = initConnection(); + MessageRecordset rsMessages; + rsMessages.open(db, TblMessage); + rsMessages.recover(recoverer, messageMap); - MessageMapRecordset rsMessageMaps; - rsMessageMaps.open(db, TblMessageMap); - rsMessageMaps.recover(messageQueueMap); + MessageMapRecordset rsMessageMaps; + rsMessageMaps.open(db, TblMessageMap); + rsMessageMaps.recover(messageQueueMap); + } + catch(_com_error &e) { + throw ADOException("Error recovering messages", + e, + db ? db->getErrors() : ""); + } } void -- cgit v1.2.1