summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/store
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2009-10-30 22:30:34 +0000
committerStephen D. Huston <shuston@apache.org>2009-10-30 22:30:34 +0000
commitd2268eb77c2ed16f22be333ce2f7cf11027f9a9b (patch)
tree5f90d9dfa5e040ada4e48e26d1f73728af02e54b /cpp/src/qpid/store
parent6fa6fd99a004c98533f533dc52933c3cbc3c2674 (diff)
downloadqpid-python-d2268eb77c2ed16f22be333ce2f7cf11027f9a9b.tar.gz
In broker, change RecoverableExchange[Impl]::bind() strings to const; they are never changed, and are only passed on to other methods that expect const strings. Makes it easier to work with from store.
In SQL store, fix the deletion of multiple bindings at once. Also, change the tblBinding table to store queue persistenceIds instead of names. Helps keep referential integrity with the queue table. Restoring bindings then just needs to look up the queue name from its ID before restoring each binding. Fixes QPID-2169, and probably QPID-2170. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@831476 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/store')
-rw-r--r--cpp/src/qpid/store/MessageStorePlugin.cpp2
-rw-r--r--cpp/src/qpid/store/StorageProvider.h3
-rw-r--r--cpp/src/qpid/store/ms-sql/BindingRecordset.cpp89
-rw-r--r--cpp/src/qpid/store/ms-sql/BindingRecordset.h20
-rw-r--r--cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp25
5 files changed, 84 insertions, 55 deletions
diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp
index 6720fd13f3..05b6ef4465 100644
--- a/cpp/src/qpid/store/MessageStorePlugin.cpp
+++ b/cpp/src/qpid/store/MessageStorePlugin.cpp
@@ -409,7 +409,7 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
provider->second->recoverConfigs(recoverer);
provider->second->recoverExchanges(recoverer, exchanges);
provider->second->recoverQueues(recoverer, queues);
- provider->second->recoverBindings(recoverer, exchanges);
+ provider->second->recoverBindings(recoverer, exchanges, queues);
provider->second->recoverMessages(recoverer, messages, messageQueueMap);
// Enqueue msgs where needed.
for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
diff --git a/cpp/src/qpid/store/StorageProvider.h b/cpp/src/qpid/store/StorageProvider.h
index 5dc10ecf4a..1f257e7416 100644
--- a/cpp/src/qpid/store/StorageProvider.h
+++ b/cpp/src/qpid/store/StorageProvider.h
@@ -311,7 +311,8 @@ public:
virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
QueueMap& queueMap) = 0;
virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
- const ExchangeMap& exchangeMap) = 0;
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap) = 0;
virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
MessageMap& messageMap,
MessageQueueMap& messageQueueMap) = 0;
diff --git a/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp b/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
index 737d93b142..1dc4370312 100644
--- a/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
+++ b/cpp/src/qpid/store/ms-sql/BindingRecordset.cpp
@@ -32,17 +32,32 @@ namespace store {
namespace ms_sql {
void
+BindingRecordset::removeFilter(const std::string& filter)
+{
+ rs->PutFilter (VariantHelper<std::string>(filter));
+ long recs = rs->GetRecordCount();
+ if (recs == 0)
+ return; // Nothing to do
+ while (recs > 0) {
+ // Deleting adAffectAll doesn't work as documented; go one by one.
+ rs->Delete(adAffectCurrent);
+ if (--recs > 0)
+ rs->MoveNext();
+ }
+ rs->Update();
+}
+
+void
BindingRecordset::add(uint64_t exchangeId,
- const std::string& queueName,
+ uint64_t queueId,
const std::string& routingKey,
const qpid::framing::FieldTable& args)
{
- VariantHelper<std::string> queueNameStr(queueName);
VariantHelper<std::string> routingKeyStr(routingKey);
BlobEncoder blob (args); // Marshall field table to a blob
rs->AddNew();
rs->Fields->GetItem("exchangeId")->Value = exchangeId;
- rs->Fields->GetItem("queueName")->Value = queueNameStr;
+ rs->Fields->GetItem("queueId")->Value = queueId;
rs->Fields->GetItem("routingKey")->Value = routingKeyStr;
rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
rs->Update();
@@ -50,57 +65,40 @@ BindingRecordset::add(uint64_t exchangeId,
void
BindingRecordset::remove(uint64_t exchangeId,
- const std::string& queueName,
+ uint64_t queueId,
const std::string& routingKey,
const qpid::framing::FieldTable& /*args*/)
{
// Look up the affected binding.
std::ostringstream filter;
filter << "exchangeId = " << exchangeId
- << " AND queueName = '" << queueName << "'"
+ << " AND queueId = " << queueId
<< " AND routingKey = '" << routingKey << "'" << std::ends;
- rs->PutFilter (VariantHelper<std::string>(filter.str()));
- if (rs->RecordCount != 0) {
- // Delete the records
- rs->Delete(adAffectGroup);
- rs->Update();
- }
- requery();
+ removeFilter(filter.str());
}
void
-BindingRecordset::remove(uint64_t exchangeId)
+BindingRecordset::removeForExchange(uint64_t exchangeId)
{
// Look up the affected bindings by the exchange ID
std::ostringstream filter;
filter << "exchangeId = " << exchangeId << std::ends;
- rs->PutFilter (VariantHelper<std::string>(filter.str()));
- if (rs->RecordCount != 0) {
- // Delete the records
- rs->Delete(adAffectGroup);
- rs->Update();
- }
- requery();
+ removeFilter(filter.str());
}
void
-BindingRecordset::remove(const std::string& queueName)
+BindingRecordset::removeForQueue(uint64_t queueId)
{
- // Look up the affected bindings by the exchange ID
+ // Look up the affected bindings by the queue ID
std::ostringstream filter;
- filter << "queueName = '" << queueName << "'" << std::ends;
- rs->PutFilter (VariantHelper<std::string>(filter.str()));
- if (rs->RecordCount != 0) {
- // Delete the records
- rs->Delete(adAffectGroup);
- rs->Update();
- }
- requery();
+ filter << "queueId = " << queueId << std::ends;
+ removeFilter(filter.str());
}
void
-BindingRecordset::recover(qpid::broker::RecoveryManager& recoverer,
- std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap)
+BindingRecordset::recover(broker::RecoveryManager& recoverer,
+ const store::ExchangeMap& exchMap,
+ const store::QueueMap& queueMap)
{
if (rs->BOF && rs->EndOfFile)
return; // Nothing to do
@@ -114,9 +112,26 @@ BindingRecordset::recover(qpid::broker::RecoveryManager& recoverer,
long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
BlobAdapter blob(blobSize);
blob = rs->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
- broker::RecoverableExchange::shared_ptr exch = exchMap[b.exchangeId];
- std::string q(b.queueName), k(b.routingKey);
- exch->bind(q, k, blob);
+ store::ExchangeMap::const_iterator exch = exchMap.find(b.exchangeId);
+ if (exch == exchMap.end()) {
+ std::ostringstream msg;
+ msg << "Error recovering bindings; exchange ID " << b.exchangeId
+ << " not found in exchange map";
+ throw qpid::Exception(msg.str());
+ }
+ broker::RecoverableExchange::shared_ptr exchPtr = exch->second;
+ store::QueueMap::const_iterator q = queueMap.find(b.queueId);
+ if (q == queueMap.end()) {
+ std::ostringstream msg;
+ msg << "Error recovering bindings; queue ID " << b.queueId
+ << " not found in queue map";
+ throw qpid::Exception(msg.str());
+ }
+ broker::RecoverableQueue::shared_ptr qPtr = q->second;
+ // The recovery manager wants the queue name, so get it from the
+ // RecoverableQueue.
+ std::string key(b.routingKey);
+ exchPtr->bind(qPtr->getName(), key, blob);
rs->MoveNext();
}
@@ -138,8 +153,8 @@ BindingRecordset::dump()
piAdoRecordBinding->BindToRecordset(&b);
while (VARIANT_FALSE == rs->EndOfFile) {
- QPID_LOG(notice, "exch " << b.exchangeId
- << ", q: " << b.queueName
+ QPID_LOG(notice, "exch Id " << b.exchangeId
+ << ", q Id " << b.queueId
<< ", k: " << b.routingKey);
rs->MoveNext();
}
diff --git a/cpp/src/qpid/store/ms-sql/BindingRecordset.h b/cpp/src/qpid/store/ms-sql/BindingRecordset.h
index 5c51636f4f..3cb732de75 100644
--- a/cpp/src/qpid/store/ms-sql/BindingRecordset.h
+++ b/cpp/src/qpid/store/ms-sql/BindingRecordset.h
@@ -24,6 +24,7 @@
#include <icrsint.h>
#include "Recordset.h"
+#include <qpid/store/StorageProvider.h>
#include <qpid/broker/RecoveryManager.h>
namespace qpid {
@@ -40,40 +41,43 @@ class BindingRecordset : public Recordset {
class Binding : public CADORecordBinding {
BEGIN_ADO_BINDING(Binding)
ADO_FIXED_LENGTH_ENTRY2(1, adBigInt, exchangeId, FALSE)
- ADO_VARIABLE_LENGTH_ENTRY4(2, adVarChar, queueName,
- sizeof(queueName), FALSE)
+ ADO_FIXED_LENGTH_ENTRY2(2, adBigInt, queueId, FALSE)
ADO_VARIABLE_LENGTH_ENTRY4(3, adVarChar, routingKey,
sizeof(routingKey), FALSE)
END_ADO_BINDING()
public:
uint64_t exchangeId;
- char queueName[256];
+ uint64_t queueId;
char routingKey[256];
};
+ // Remove all records matching the specified filter/query.
+ void removeFilter(const std::string& filter);
+
public:
// Add a new binding
void add(uint64_t exchangeId,
- const std::string& queueName,
+ uint64_t queueId,
const std::string& routingKey,
const qpid::framing::FieldTable& args);
// Remove a specific binding
void remove(uint64_t exchangeId,
- const std::string& queueName,
+ uint64_t queueId,
const std::string& routingKey,
const qpid::framing::FieldTable& args);
// Remove all bindings for the specified exchange
- void remove(uint64_t exchangeId);
+ void removeForExchange(uint64_t exchangeId);
// Remove all bindings for the specified queue
- void remove(const std::string& queueName);
+ void removeForQueue(uint64_t queueId);
// Recover bindings set using exchMap to get from Id to RecoverableExchange.
void recover(qpid::broker::RecoveryManager& recoverer,
- std::map<uint64_t, broker::RecoverableExchange::shared_ptr> exchMap);
+ const qpid::store::ExchangeMap& exchMap,
+ const qpid::store::QueueMap& queueMap);
// Dump table contents; useful for debugging.
void dump();
diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
index b8c020fabe..4c2b9892ab 100644
--- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
+++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
@@ -267,7 +267,8 @@ public:
virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
QueueMap& queueMap);
virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
- const ExchangeMap& exchangeMap);
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap);
virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
MessageMap& messageMap,
MessageQueueMap& messageQueueMap);
@@ -424,8 +425,10 @@ MSSqlProvider::destroy(PersistableQueue& queue)
db->beginTransaction();
rsQueues.open(db, TblQueue);
rsBindings.open(db, TblBinding);
+ // Remove bindings first; the queue IDs can't be ripped out from
+ // under the references in the bindings table.
+ rsBindings.removeForQueue(queue.getPersistenceId());
rsQueues.remove(queue);
- rsBindings.remove(queue.getName());
db->commitTransaction();
}
catch(_com_error &e) {
@@ -468,8 +471,10 @@ MSSqlProvider::destroy(const PersistableExchange& exchange)
db->beginTransaction();
rsExchanges.open(db, TblExchange);
rsBindings.open(db, TblBinding);
+ // Remove bindings first; the exchange IDs can't be ripped out from
+ // under the references in the bindings table.
+ rsBindings.removeForExchange(exchange.getPersistenceId());
rsExchanges.remove(exchange);
- rsBindings.remove(exchange.getPersistenceId());
db->commitTransaction();
}
catch(_com_error &e) {
@@ -492,7 +497,10 @@ MSSqlProvider::bind(const PersistableExchange& exchange,
BindingRecordset rsBindings;
db->beginTransaction();
rsBindings.open(db, TblBinding);
- rsBindings.add(exchange.getPersistenceId(), queue.getName(), key, args);
+ rsBindings.add(exchange.getPersistenceId(),
+ queue.getPersistenceId(),
+ key,
+ args);
db->commitTransaction();
}
catch(_com_error &e) {
@@ -517,7 +525,7 @@ MSSqlProvider::unbind(const PersistableExchange& exchange,
db->beginTransaction();
rsBindings.open(db, TblBinding);
rsBindings.remove(exchange.getPersistenceId(),
- queue.getName(),
+ queue.getPersistenceId(),
key,
args);
db->commitTransaction();
@@ -888,12 +896,13 @@ MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer,
void
MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer,
- const ExchangeMap& exchangeMap)
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap)
{
DatabaseConnection *db = initConnection();
BindingRecordset rsBindings;
rsBindings.open(db, TblBinding);
- rsBindings.recover(recoverer, exchangeMap);
+ rsBindings.recover(recoverer, exchangeMap, queueMap);
}
void
@@ -947,7 +956,7 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name)
" fieldTableBlob varbinary(MAX) NOT NULL)";
const std::string bindingSpecs =
" (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL,"
- " queueName varchar(255) NOT NULL,"
+ " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
" routingKey varchar(255),"
" fieldTableBlob varbinary(MAX))";
const std::string messageMapSpecs =