diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/RecoverableExchange.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/store/MessageStorePlugin.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/store/StorageProvider.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/BindingRecordset.cpp | 89 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/BindingRecordset.h | 20 | ||||
-rw-r--r-- | cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp | 25 |
7 files changed, 91 insertions, 58 deletions
diff --git a/cpp/src/qpid/broker/RecoverableExchange.h b/cpp/src/qpid/broker/RecoverableExchange.h index 76d0d2ecdf..ca6cc1541e 100644 --- a/cpp/src/qpid/broker/RecoverableExchange.h +++ b/cpp/src/qpid/broker/RecoverableExchange.h @@ -40,7 +40,9 @@ public: /** * Recover binding. Nb: queue must have been recovered earlier. */ - virtual void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args) = 0; + virtual void bind(const std::string& queue, + const std::string& routingKey, + qpid::framing::FieldTable& args) = 0; virtual ~RecoverableExchange() {}; }; diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 9c3a2b5571..0369fcd71d 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -78,7 +78,7 @@ class RecoverableExchangeImpl : public RecoverableExchange public: RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {} void setPersistenceId(uint64_t id); - void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args); + void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args); }; class RecoverableConfigImpl : public RecoverableConfig @@ -230,7 +230,9 @@ void RecoverableConfigImpl::setPersistenceId(uint64_t id) bridge->setPersistenceId(id); } -void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args) +void RecoverableExchangeImpl::bind(const string& queueName, + const string& key, + framing::FieldTable& args) { Queue::shared_ptr queue = queues.find(queueName); exchange->bind(queue, key, &args); diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp index 6720fd13f3..05b6ef4465 100644 --- a/cpp/src/qpid/store/MessageStorePlugin.cpp +++ b/cpp/src/qpid/store/MessageStorePlugin.cpp @@ -409,7 +409,7 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer) provider->second->recoverConfigs(recoverer); provider->second->recoverExchanges(recoverer, exchanges); provider->second->recoverQueues(recoverer, queues); - provider->second->recoverBindings(recoverer, exchanges); + provider->second->recoverBindings(recoverer, exchanges, queues); provider->second->recoverMessages(recoverer, messages, messageQueueMap); // Enqueue msgs where needed. for (MessageQueueMap::const_iterator i = messageQueueMap.begin(); diff --git a/cpp/src/qpid/store/StorageProvider.h b/cpp/src/qpid/store/StorageProvider.h index 5dc10ecf4a..1f257e7416 100644 --- a/cpp/src/qpid/store/StorageProvider.h +++ b/cpp/src/qpid/store/StorageProvider.h @@ -311,7 +311,8 @@ public: virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer, QueueMap& queueMap) = 0; virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer, - const ExchangeMap& exchangeMap) = 0; + const ExchangeMap& exchangeMap, + const QueueMap& queueMap) = 0; virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, MessageMap& messageMap, MessageQueueMap& messageQueueMap) = 0; diff --git a/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp b/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp index 737d93b142..1dc4370312 100644 --- a/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp +++ b/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp @@ -32,17 +32,32 @@ namespace store { namespace ms_sql { void +BindingRecordset::removeFilter(const std::string& filter) +{ + rs->PutFilter (VariantHelper<std::string>(filter)); + long recs = rs->GetRecordCount(); + if (recs == 0) + return; // Nothing to do + while (recs > 0) { + // Deleting adAffectAll doesn't work as documented; go one by one. + rs->Delete(adAffectCurrent); + if (--recs > 0) + rs->MoveNext(); + } + rs->Update(); +} + +void BindingRecordset::add(uint64_t exchangeId, - const std::string& queueName, + uint64_t queueId, const std::string& routingKey, const qpid::framing::FieldTable& args) { - VariantHelper<std::string> queueNameStr(queueName); VariantHelper<std::string> routingKeyStr(routingKey); BlobEncoder blob (args); // Marshall field table to a blob rs->AddNew(); rs->Fields->GetItem("exchangeId")->Value = exchangeId; - rs->Fields->GetItem("queueName")->Value = queueNameStr; + rs->Fields->GetItem("queueId")->Value = queueId; rs->Fields->GetItem("routingKey")->Value = routingKeyStr; rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob); rs->Update(); @@ -50,57 +65,40 @@ BindingRecordset::add(uint64_t exchangeId, void BindingRecordset::remove(uint64_t exchangeId, - const std::string& queueName, + uint64_t queueId, const std::string& routingKey, const qpid::framing::FieldTable& /*args*/) { // Look up the affected binding. std::ostringstream filter; filter << "exchangeId = " << exchangeId - << " AND queueName = '" << queueName << "'" + << " AND queueId = " << queueId << " AND routingKey = '" << routingKey << "'" << std::ends; - rs->PutFilter (VariantHelper<std::string>(filter.str())); - if (rs->RecordCount != 0) { - // Delete the records - rs->Delete(adAffectGroup); - rs->Update(); - } - requery(); + removeFilter(filter.str()); } void -BindingRecordset::remove(uint64_t exchangeId) +BindingRecordset::removeForExchange(uint64_t exchangeId) { // Look up the affected bindings by the exchange ID std::ostringstream filter; filter << "exchangeId = " << exchangeId << std::ends; - rs->PutFilter (VariantHelper<std::string>(filter.str())); - if (rs->RecordCount != 0) { - // Delete the records - rs->Delete(adAffectGroup); - rs->Update(); - } - requery(); + removeFilter(filter.str()); } void -BindingRecordset::remove(const std::string& queueName) +BindingRecordset::removeForQueue(uint64_t queueId) { - // Look up the affected bindings by the exchange ID + // Look up the affected bindings by the queue ID std::ostringstream filter; - filter << "queueName = '" << queueName << "'" << std::ends; - rs->PutFilter (VariantHelper<std::string>(filter.str())); - if (rs->RecordCount != 0) { - // Delete the records - rs->Delete(adAffectGroup); - rs->Update(); - } - requery(); + filter << "queueId = " << queueId << std::ends; + removeFilter(filter.str()); } void -BindingRecordset::recover(qpid::broker::RecoveryManager& recoverer, - std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap) +BindingRecordset::recover(broker::RecoveryManager& recoverer, + const store::ExchangeMap& exchMap, + const store::QueueMap& queueMap) { if (rs->BOF && rs->EndOfFile) return; // Nothing to do @@ -114,9 +112,26 @@ BindingRecordset::recover(qpid::broker::RecoveryManager& recoverer, long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize; BlobAdapter blob(blobSize); blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize); - broker::RecoverableExchange::shared_ptr exch = exchMap[b.exchangeId]; - std::string q(b.queueName), k(b.routingKey); - exch->bind(q, k, blob); + store::ExchangeMap::const_iterator exch = exchMap.find(b.exchangeId); + if (exch == exchMap.end()) { + std::ostringstream msg; + msg << "Error recovering bindings; exchange ID " << b.exchangeId + << " not found in exchange map"; + throw qpid::Exception(msg.str()); + } + broker::RecoverableExchange::shared_ptr exchPtr = exch->second; + store::QueueMap::const_iterator q = queueMap.find(b.queueId); + if (q == queueMap.end()) { + std::ostringstream msg; + msg << "Error recovering bindings; queue ID " << b.queueId + << " not found in queue map"; + throw qpid::Exception(msg.str()); + } + broker::RecoverableQueue::shared_ptr qPtr = q->second; + // The recovery manager wants the queue name, so get it from the + // RecoverableQueue. + std::string key(b.routingKey); + exchPtr->bind(qPtr->getName(), key, blob); rs->MoveNext(); } @@ -138,8 +153,8 @@ BindingRecordset::dump() piAdoRecordBinding->BindToRecordset(&b); while (VARIANT_FALSE == rs->EndOfFile) { - QPID_LOG(notice, "exch " << b.exchangeId - << ", q: " << b.queueName + QPID_LOG(notice, "exch Id " << b.exchangeId + << ", q Id " << b.queueId << ", k: " << b.routingKey); rs->MoveNext(); } diff --git a/cpp/src/qpid/store/ms-sql/BindingRecordset.h b/cpp/src/qpid/store/ms-sql/BindingRecordset.h index 5c51636f4f..3cb732de75 100644 --- a/cpp/src/qpid/store/ms-sql/BindingRecordset.h +++ b/cpp/src/qpid/store/ms-sql/BindingRecordset.h @@ -24,6 +24,7 @@ #include <icrsint.h> #include "Recordset.h" +#include <qpid/store/StorageProvider.h> #include <qpid/broker/RecoveryManager.h> namespace qpid { @@ -40,40 +41,43 @@ class BindingRecordset : public Recordset { class Binding : public CADORecordBinding { BEGIN_ADO_BINDING(Binding) ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, exchangeId, FALSE) - ADO_VARIABLE_LENGTH_ENTRY4(2, adVarChar, queueName, - sizeof(queueName), FALSE) + ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE) ADO_VARIABLE_LENGTH_ENTRY4(3, adVarChar, routingKey, sizeof(routingKey), FALSE) END_ADO_BINDING() public: uint64_t exchangeId; - char queueName[256]; + uint64_t queueId; char routingKey[256]; }; + // Remove all records matching the specified filter/query. + void removeFilter(const std::string& filter); + public: // Add a new binding void add(uint64_t exchangeId, - const std::string& queueName, + uint64_t queueId, const std::string& routingKey, const qpid::framing::FieldTable& args); // Remove a specific binding void remove(uint64_t exchangeId, - const std::string& queueName, + uint64_t queueId, const std::string& routingKey, const qpid::framing::FieldTable& args); // Remove all bindings for the specified exchange - void remove(uint64_t exchangeId); + void removeForExchange(uint64_t exchangeId); // Remove all bindings for the specified queue - void remove(const std::string& queueName); + void removeForQueue(uint64_t queueId); // Recover bindings set using exchMap to get from Id to RecoverableExchange. void recover(qpid::broker::RecoveryManager& recoverer, - std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap); + const qpid::store::ExchangeMap& exchMap, + const qpid::store::QueueMap& queueMap); // Dump table contents; useful for debugging. void dump(); diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp index b8c020fabe..4c2b9892ab 100644 --- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp +++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -267,7 +267,8 @@ public: virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer, QueueMap& queueMap); virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer, - const ExchangeMap& exchangeMap); + const ExchangeMap& exchangeMap, + const QueueMap& queueMap); virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, MessageMap& messageMap, MessageQueueMap& messageQueueMap); @@ -424,8 +425,10 @@ MSSqlProvider::destroy(PersistableQueue& queue) db->beginTransaction(); rsQueues.open(db, TblQueue); rsBindings.open(db, TblBinding); + // Remove bindings first; the queue IDs can't be ripped out from + // under the references in the bindings table. + rsBindings.removeForQueue(queue.getPersistenceId()); rsQueues.remove(queue); - rsBindings.remove(queue.getName()); db->commitTransaction(); } catch(_com_error &e) { @@ -468,8 +471,10 @@ MSSqlProvider::destroy(const PersistableExchange& exchange) db->beginTransaction(); rsExchanges.open(db, TblExchange); rsBindings.open(db, TblBinding); + // Remove bindings first; the exchange IDs can't be ripped out from + // under the references in the bindings table. + rsBindings.removeForExchange(exchange.getPersistenceId()); rsExchanges.remove(exchange); - rsBindings.remove(exchange.getPersistenceId()); db->commitTransaction(); } catch(_com_error &e) { @@ -492,7 +497,10 @@ MSSqlProvider::bind(const PersistableExchange& exchange, BindingRecordset rsBindings; db->beginTransaction(); rsBindings.open(db, TblBinding); - rsBindings.add(exchange.getPersistenceId(), queue.getName(), key, args); + rsBindings.add(exchange.getPersistenceId(), + queue.getPersistenceId(), + key, + args); db->commitTransaction(); } catch(_com_error &e) { @@ -517,7 +525,7 @@ MSSqlProvider::unbind(const PersistableExchange& exchange, db->beginTransaction(); rsBindings.open(db, TblBinding); rsBindings.remove(exchange.getPersistenceId(), - queue.getName(), + queue.getPersistenceId(), key, args); db->commitTransaction(); @@ -888,12 +896,13 @@ MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer, void MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer, - const ExchangeMap& exchangeMap) + const ExchangeMap& exchangeMap, + const QueueMap& queueMap) { DatabaseConnection *db = initConnection(); BindingRecordset rsBindings; rsBindings.open(db, TblBinding); - rsBindings.recover(recoverer, exchangeMap); + rsBindings.recover(recoverer, exchangeMap, queueMap); } void @@ -947,7 +956,7 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) " fieldTableBlob varbinary(MAX) NOT NULL)"; const std::string bindingSpecs = " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL," - " queueName varchar(255) NOT NULL," + " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL," " routingKey varchar(255)," " fieldTableBlob varbinary(MAX))"; const std::string messageMapSpecs = |