summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/store
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/store')
-rw-r--r--cpp/src/qpid/store/MessageStorePlugin.cpp2
-rw-r--r--cpp/src/qpid/store/StorageProvider.h3
-rw-r--r--cpp/src/qpid/store/ms-sql/BindingRecordset.cpp89
-rw-r--r--cpp/src/qpid/store/ms-sql/BindingRecordset.h20
-rw-r--r--cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp25
5 files changed, 84 insertions, 55 deletions
diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp
index 6720fd13f3..05b6ef4465 100644
--- a/cpp/src/qpid/store/MessageStorePlugin.cpp
+++ b/cpp/src/qpid/store/MessageStorePlugin.cpp
@@ -409,7 +409,7 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
provider->second->recoverConfigs(recoverer);
provider->second->recoverExchanges(recoverer, exchanges);
provider->second->recoverQueues(recoverer, queues);
- provider->second->recoverBindings(recoverer, exchanges);
+ provider->second->recoverBindings(recoverer, exchanges, queues);
provider->second->recoverMessages(recoverer, messages, messageQueueMap);
// Enqueue msgs where needed.
for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
diff --git a/cpp/src/qpid/store/StorageProvider.h b/cpp/src/qpid/store/StorageProvider.h
index 5dc10ecf4a..1f257e7416 100644
--- a/cpp/src/qpid/store/StorageProvider.h
+++ b/cpp/src/qpid/store/StorageProvider.h
@@ -311,7 +311,8 @@ public:
virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
QueueMap& queueMap) = 0;
virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
- const ExchangeMap& exchangeMap) = 0;
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap) = 0;
virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
MessageMap& messageMap,
MessageQueueMap& messageQueueMap) = 0;
diff --git a/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp b/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
index 737d93b142..1dc4370312 100644
--- a/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
+++ b/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
@@ -32,17 +32,32 @@ namespace store {
namespace ms_sql {
void
+BindingRecordset::removeFilter(const std::string& filter)
+{
+ rs->PutFilter (VariantHelper<std::string>(filter));
+ long recs = rs->GetRecordCount();
+ if (recs == 0)
+ return; // Nothing to do
+ while (recs > 0) {
+ // Deleting adAffectAll doesn't work as documented; go one by one.
+ rs->Delete(adAffectCurrent);
+ if (--recs > 0)
+ rs->MoveNext();
+ }
+ rs->Update();
+}
+
+void
BindingRecordset::add(uint64_t exchangeId,
- const std::string& queueName,
+ uint64_t queueId,
const std::string& routingKey,
const qpid::framing::FieldTable& args)
{
- VariantHelper<std::string> queueNameStr(queueName);
VariantHelper<std::string> routingKeyStr(routingKey);
BlobEncoder blob (args); // Marshall field table to a blob
rs->AddNew();
rs->Fields->GetItem("exchangeId")->Value = exchangeId;
- rs->Fields->GetItem("queueName")->Value = queueNameStr;
+ rs->Fields->GetItem("queueId")->Value = queueId;
rs->Fields->GetItem("routingKey")->Value = routingKeyStr;
rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
rs->Update();
@@ -50,57 +65,40 @@ BindingRecordset::add(uint64_t exchangeId,
void
BindingRecordset::remove(uint64_t exchangeId,
- const std::string& queueName,
+ uint64_t queueId,
const std::string& routingKey,
const qpid::framing::FieldTable& /*args*/)
{
// Look up the affected binding.
std::ostringstream filter;
filter << "exchangeId = " << exchangeId
- << " AND queueName = '" << queueName << "'"
+ << " AND queueId = " << queueId
<< " AND routingKey = '" << routingKey << "'" << std::ends;
- rs->PutFilter (VariantHelper<std::string>(filter.str()));
- if (rs->RecordCount != 0) {
- // Delete the records
- rs->Delete(adAffectGroup);
- rs->Update();
- }
- requery();
+ removeFilter(filter.str());
}
void
-BindingRecordset::remove(uint64_t exchangeId)
+BindingRecordset::removeForExchange(uint64_t exchangeId)
{
// Look up the affected bindings by the exchange ID
std::ostringstream filter;
filter << "exchangeId = " << exchangeId << std::ends;
- rs->PutFilter (VariantHelper<std::string>(filter.str()));
- if (rs->RecordCount != 0) {
- // Delete the records
- rs->Delete(adAffectGroup);
- rs->Update();
- }
- requery();
+ removeFilter(filter.str());
}
void
-BindingRecordset::remove(const std::string& queueName)
+BindingRecordset::removeForQueue(uint64_t queueId)
{
- // Look up the affected bindings by the exchange ID
+ // Look up the affected bindings by the queue ID
std::ostringstream filter;
- filter << "queueName = '" << queueName << "'" << std::ends;
- rs->PutFilter (VariantHelper<std::string>(filter.str()));
- if (rs->RecordCount != 0) {
- // Delete the records
- rs->Delete(adAffectGroup);
- rs->Update();
- }
- requery();
+ filter << "queueId = " << queueId << std::ends;
+ removeFilter(filter.str());
}
void
-BindingRecordset::recover(qpid::broker::RecoveryManager& recoverer,
- std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap)
+BindingRecordset::recover(broker::RecoveryManager& recoverer,
+ const store::ExchangeMap& exchMap,
+ const store::QueueMap& queueMap)
{
if (rs->BOF && rs->EndOfFile)
return; // Nothing to do
@@ -114,9 +112,26 @@ BindingRecordset::recover(qpid::broker::RecoveryManager& recoverer,
long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
BlobAdapter blob(blobSize);
blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
- broker::RecoverableExchange::shared_ptr exch = exchMap[b.exchangeId];
- std::string q(b.queueName), k(b.routingKey);
- exch->bind(q, k, blob);
+ store::ExchangeMap::const_iterator exch = exchMap.find(b.exchangeId);
+ if (exch == exchMap.end()) {
+ std::ostringstream msg;
+ msg << "Error recovering bindings; exchange ID " << b.exchangeId
+ << " not found in exchange map";
+ throw qpid::Exception(msg.str());
+ }
+ broker::RecoverableExchange::shared_ptr exchPtr = exch->second;
+ store::QueueMap::const_iterator q = queueMap.find(b.queueId);
+ if (q == queueMap.end()) {
+ std::ostringstream msg;
+ msg << "Error recovering bindings; queue ID " << b.queueId
+ << " not found in queue map";
+ throw qpid::Exception(msg.str());
+ }
+ broker::RecoverableQueue::shared_ptr qPtr = q->second;
+ // The recovery manager wants the queue name, so get it from the
+ // RecoverableQueue.
+ std::string key(b.routingKey);
+ exchPtr->bind(qPtr->getName(), key, blob);
rs->MoveNext();
}
@@ -138,8 +153,8 @@ BindingRecordset::dump()
piAdoRecordBinding->BindToRecordset(&b);
while (VARIANT_FALSE == rs->EndOfFile) {
- QPID_LOG(notice, "exch " << b.exchangeId
- << ", q: " << b.queueName
+ QPID_LOG(notice, "exch Id " << b.exchangeId
+ << ", q Id " << b.queueId
<< ", k: " << b.routingKey);
rs->MoveNext();
}
diff --git a/cpp/src/qpid/store/ms-sql/BindingRecordset.h b/cpp/src/qpid/store/ms-sql/BindingRecordset.h
index 5c51636f4f..3cb732de75 100644
--- a/cpp/src/qpid/store/ms-sql/BindingRecordset.h
+++ b/cpp/src/qpid/store/ms-sql/BindingRecordset.h
@@ -24,6 +24,7 @@
#include <icrsint.h>
#include "Recordset.h"
+#include <qpid/store/StorageProvider.h>
#include <qpid/broker/RecoveryManager.h>
namespace qpid {
@@ -40,40 +41,43 @@ class BindingRecordset : public Recordset {
class Binding : public CADORecordBinding {
BEGIN_ADO_BINDING(Binding)
ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, exchangeId, FALSE)
- ADO_VARIABLE_LENGTH_ENTRY4(2, adVarChar, queueName,
- sizeof(queueName), FALSE)
+ ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE)
ADO_VARIABLE_LENGTH_ENTRY4(3, adVarChar, routingKey,
sizeof(routingKey), FALSE)
END_ADO_BINDING()
public:
uint64_t exchangeId;
- char queueName[256];
+ uint64_t queueId;
char routingKey[256];
};
+ // Remove all records matching the specified filter/query.
+ void removeFilter(const std::string& filter);
+
public:
// Add a new binding
void add(uint64_t exchangeId,
- const std::string& queueName,
+ uint64_t queueId,
const std::string& routingKey,
const qpid::framing::FieldTable& args);
// Remove a specific binding
void remove(uint64_t exchangeId,
- const std::string& queueName,
+ uint64_t queueId,
const std::string& routingKey,
const qpid::framing::FieldTable& args);
// Remove all bindings for the specified exchange
- void remove(uint64_t exchangeId);
+ void removeForExchange(uint64_t exchangeId);
// Remove all bindings for the specified queue
- void remove(const std::string& queueName);
+ void removeForQueue(uint64_t queueId);
// Recover bindings set using exchMap to get from Id to RecoverableExchange.
void recover(qpid::broker::RecoveryManager& recoverer,
- std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap);
+ const qpid::store::ExchangeMap& exchMap,
+ const qpid::store::QueueMap& queueMap);
// Dump table contents; useful for debugging.
void dump();
diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
index b8c020fabe..4c2b9892ab 100644
--- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
+++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
@@ -267,7 +267,8 @@ public:
virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
QueueMap& queueMap);
virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
- const ExchangeMap& exchangeMap);
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap);
virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
MessageMap& messageMap,
MessageQueueMap& messageQueueMap);
@@ -424,8 +425,10 @@ MSSqlProvider::destroy(PersistableQueue& queue)
db->beginTransaction();
rsQueues.open(db, TblQueue);
rsBindings.open(db, TblBinding);
+ // Remove bindings first; the queue IDs can't be ripped out from
+ // under the references in the bindings table.
+ rsBindings.removeForQueue(queue.getPersistenceId());
rsQueues.remove(queue);
- rsBindings.remove(queue.getName());
db->commitTransaction();
}
catch(_com_error &e) {
@@ -468,8 +471,10 @@ MSSqlProvider::destroy(const PersistableExchange& exchange)
db->beginTransaction();
rsExchanges.open(db, TblExchange);
rsBindings.open(db, TblBinding);
+ // Remove bindings first; the exchange IDs can't be ripped out from
+ // under the references in the bindings table.
+ rsBindings.removeForExchange(exchange.getPersistenceId());
rsExchanges.remove(exchange);
- rsBindings.remove(exchange.getPersistenceId());
db->commitTransaction();
}
catch(_com_error &e) {
@@ -492,7 +497,10 @@ MSSqlProvider::bind(const PersistableExchange& exchange,
BindingRecordset rsBindings;
db->beginTransaction();
rsBindings.open(db, TblBinding);
- rsBindings.add(exchange.getPersistenceId(), queue.getName(), key, args);
+ rsBindings.add(exchange.getPersistenceId(),
+ queue.getPersistenceId(),
+ key,
+ args);
db->commitTransaction();
}
catch(_com_error &e) {
@@ -517,7 +525,7 @@ MSSqlProvider::unbind(const PersistableExchange& exchange,
db->beginTransaction();
rsBindings.open(db, TblBinding);
rsBindings.remove(exchange.getPersistenceId(),
- queue.getName(),
+ queue.getPersistenceId(),
key,
args);
db->commitTransaction();
@@ -888,12 +896,13 @@ MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer,
void
MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer,
- const ExchangeMap& exchangeMap)
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap)
{
DatabaseConnection *db = initConnection();
BindingRecordset rsBindings;
rsBindings.open(db, TblBinding);
- rsBindings.recover(recoverer, exchangeMap);
+ rsBindings.recover(recoverer, exchangeMap, queueMap);
}
void
@@ -947,7 +956,7 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name)
" fieldTableBlob varbinary(MAX) NOT NULL)";
const std::string bindingSpecs =
" (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL,"
- " queueName varchar(255) NOT NULL,"
+ " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
" routingKey varchar(255),"
" fieldTableBlob varbinary(MAX))";
const std::string messageMapSpecs =