summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp')
-rw-r--r--cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp58
1 files changed, 39 insertions, 19 deletions
diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
index e06f8a750f..a26df59df7 100644
--- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
+++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp
@@ -399,8 +399,8 @@ MSSqlProvider::create(PersistableQueue& queue,
const qpid::framing::FieldTable& /*args needed for jrnl*/)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
try {
- BlobRecordset rsQueues;
db->beginTransaction();
rsQueues.open(db, TblQueue);
rsQueues.add(queue);
@@ -418,16 +418,36 @@ MSSqlProvider::create(PersistableQueue& queue,
void
MSSqlProvider::destroy(PersistableQueue& queue)
{
+ // MessageDeleter class for use with for_each, below.
+ class MessageDeleter {
+ BlobRecordset& msgs;
+ public:
+ explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {}
+ void operator()(uint64_t msgId) { msgs.remove(msgId); }
+ };
+
DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ BindingRecordset rsBindings;
+ MessageRecordset rsMessages;
+ MessageMapRecordset rsMessageMaps;
try {
- BlobRecordset rsQueues;
- BindingRecordset rsBindings;
db->beginTransaction();
rsQueues.open(db, TblQueue);
rsBindings.open(db, TblBinding);
+ rsMessages.open(db, TblMessage);
+ rsMessageMaps.open(db, TblMessageMap);
// Remove bindings first; the queue IDs can't be ripped out from
- // under the references in the bindings table.
+ // under the references in the bindings table. Then remove the
+ // message->queue entries for the queue, also because the queue can't
+ // be deleted while there are references to it. If there are messages
+ // orphaned by removing the queue references, those messages can
+ // also be deleted. Lastly, the queue record can be removed.
rsBindings.removeForQueue(queue.getPersistenceId());
+ std::vector<uint64_t> orphans;
+ rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans);
+ std::for_each(orphans.begin(), orphans.end(),
+ MessageDeleter(rsMessages));
rsQueues.remove(queue);
db->commitTransaction();
}
@@ -445,8 +465,8 @@ MSSqlProvider::create(const PersistableExchange& exchange,
const qpid::framing::FieldTable& args)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
try {
- BlobRecordset rsExchanges;
db->beginTransaction();
rsExchanges.open(db, TblExchange);
rsExchanges.add(exchange);
@@ -465,9 +485,9 @@ void
MSSqlProvider::destroy(const PersistableExchange& exchange)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ BindingRecordset rsBindings;
try {
- BlobRecordset rsExchanges;
- BindingRecordset rsBindings;
db->beginTransaction();
rsExchanges.open(db, TblExchange);
rsBindings.open(db, TblBinding);
@@ -493,8 +513,8 @@ MSSqlProvider::bind(const PersistableExchange& exchange,
const qpid::framing::FieldTable& args)
{
DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
try {
- BindingRecordset rsBindings;
db->beginTransaction();
rsBindings.open(db, TblBinding);
rsBindings.add(exchange.getPersistenceId(),
@@ -520,8 +540,8 @@ MSSqlProvider::unbind(const PersistableExchange& exchange,
const qpid::framing::FieldTable& args)
{
DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
try {
- BindingRecordset rsBindings;
db->beginTransaction();
rsBindings.open(db, TblBinding);
rsBindings.remove(exchange.getPersistenceId(),
@@ -544,8 +564,8 @@ void
MSSqlProvider::create(const PersistableConfig& config)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
try {
- BlobRecordset rsConfigs;
db->beginTransaction();
rsConfigs.open(db, TblConfig);
rsConfigs.add(config);
@@ -564,8 +584,8 @@ void
MSSqlProvider::destroy(const PersistableConfig& config)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
try {
- BlobRecordset rsConfigs;
db->beginTransaction();
rsConfigs.open(db, TblConfig);
rsConfigs.remove(config);
@@ -590,8 +610,8 @@ void
MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
{
DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
try {
- MessageRecordset rsMessages;
db->beginTransaction();
rsMessages.open(db, TblMessage);
rsMessages.add(msg);
@@ -613,8 +633,8 @@ void
MSSqlProvider::destroy(PersistableMessage& msg)
{
DatabaseConnection *db = initConnection();
+ BlobRecordset rsMessages;
try {
- BlobRecordset rsMessages;
db->beginTransaction();
rsMessages.open(db, TblMessage);
rsMessages.remove(msg);
@@ -634,8 +654,8 @@ MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage
const std::string& data)
{
DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
try {
- MessageRecordset rsMessages;
db->beginTransaction();
rsMessages.open(db, TblMessage);
rsMessages.append(msg, data);
@@ -665,8 +685,8 @@ MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
// SQL store keeps all messages in one table, so we don't need the
// queue reference.
DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
try {
- MessageRecordset rsMessages;
rsMessages.open(db, TblMessage);
rsMessages.loadContent(msg, data, offset, length);
}
@@ -710,13 +730,13 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
}
}
+ MessageRecordset rsMessages;
+ MessageMapRecordset rsMap;
try {
if (msg->getPersistenceId() == 0) { // Message itself not yet saved
- MessageRecordset rsMessages;
rsMessages.open(db, TblMessage);
rsMessages.add(msg);
}
- MessageMapRecordset rsMap;
rsMap.open(db, TblMessageMap);
rsMap.add(msg->getPersistenceId(), queue.getPersistenceId());
if (atxn)
@@ -769,13 +789,13 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt,
}
}
+ MessageMapRecordset rsMap;
+ MessageRecordset rsMessages;
try {
- MessageMapRecordset rsMap;
rsMap.open(db, TblMessageMap);
bool more = rsMap.remove(msg->getPersistenceId(),
queue.getPersistenceId());
if (!more) {
- MessageRecordset rsMessages;
rsMessages.open(db, TblMessage);
rsMessages.remove(msg);
}