summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/MessageStore.h7
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp2
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h4
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp4
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h4
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h2
-rw-r--r--cpp/src/qpid/broker/RecoveryManager.cpp42
-rw-r--r--cpp/src/qpid/broker/RecoveryManager.h45
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp5
-rw-r--r--cpp/test/unit/qpid/broker/TxAckTest.cpp4
-rw-r--r--cpp/test/unit/qpid/broker/TxPublishTest.cpp4
11 files changed, 103 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index d477cd6c38..3acada0f82 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -22,14 +22,11 @@
#define _MessageStore_
#include <qpid/broker/Message.h>
-#include <qpid/broker/Queue.h>
+#include <qpid/broker/RecoveryManager.h>
#include <qpid/broker/TransactionalStore.h>
namespace qpid {
namespace broker {
- class Queue;
- class QueueRegistry;
-
/**
* An abstraction of the persistent storage for messages.
*/
@@ -47,7 +44,7 @@ namespace qpid {
/**
* Request recovery of queue and message state from store
*/
- virtual void recover(QueueRegistry& queues) = 0;
+ virtual void recover(RecoveryManager& queues) = 0;
/**
* Enqueues a message, storing the message if it has not
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index f5c27ca6bc..1f26807f54 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -38,7 +38,7 @@ void MessageStoreModule::destroy(const Queue& queue)
store->destroy(queue);
}
-void MessageStoreModule::recover(QueueRegistry& registry)
+void MessageStoreModule::recover(RecoveryManager& registry)
{
store->recover(registry);
}
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index 490cfbb5c7..29b62ccfa2 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -24,7 +24,7 @@
#include <qpid/broker/Message.h>
#include <qpid/broker/MessageStore.h>
#include <qpid/broker/Queue.h>
-#include <qpid/broker/QueueRegistry.h>
+#include <qpid/broker/RecoveryManager.h>
#include <qpid/sys/Module.h>
namespace qpid {
@@ -38,7 +38,7 @@ namespace qpid {
MessageStoreModule(const std::string& name);
void create(const Queue& queue);
void destroy(const Queue& queue);
- void recover(QueueRegistry& queues);
+ void recover(RecoveryManager& queues);
void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
void committed(const string * const xid);
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index aa58849b34..7bc95225b4 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -22,7 +22,7 @@
#include <qpid/broker/NullMessageStore.h>
#include <qpid/broker/Queue.h>
-#include <qpid/broker/QueueRegistry.h>
+#include <qpid/broker/RecoveryManager.h>
#include <iostream>
@@ -34,7 +34,7 @@ void NullMessageStore::create(const Queue& queue){
void NullMessageStore::destroy(const Queue& queue){
std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::recover(QueueRegistry&){
+void NullMessageStore::recover(RecoveryManager&){
std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
}
void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const){
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index 0268a5419d..7916467091 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -27,8 +27,6 @@
namespace qpid {
namespace broker {
- class Queue;
- class QueueRegistry;
/**
* A null implementation of the MessageStore interface
@@ -37,7 +35,7 @@ namespace qpid {
public:
void create(const Queue& queue);
void destroy(const Queue& queue);
- void recover(QueueRegistry& queues);
+ void recover(RecoveryManager& queues);
void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
void committed(const string * const xid);
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index 2ad56b0405..f95015c0fd 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -28,8 +28,6 @@
namespace qpid {
namespace broker {
-class SessionHandlerImpl;
-
/**
* A registry of queues indexed by queue name.
*
diff --git a/cpp/src/qpid/broker/RecoveryManager.cpp b/cpp/src/qpid/broker/RecoveryManager.cpp
new file mode 100644
index 0000000000..40275477b7
--- /dev/null
+++ b/cpp/src/qpid/broker/RecoveryManager.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/broker/RecoveryManager.h>
+
+using namespace qpid::broker;
+
+RecoveryManager::RecoveryManager(QueueRegistry& _queues, ExchangeRegistry& _exchanges) : queues(_queues), exchanges(_exchanges) {}
+
+RecoveryManager::~RecoveryManager() {}
+
+Queue::shared_ptr RecoveryManager::recoverQueue(const string& name)
+{
+ std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
+ Exchange::shared_ptr exchange = exchanges.getDefault();
+ if (exchange) {
+ exchange->bind(result.first, result.first->getName(), 0);
+ }
+ return result.first;
+}
+
+Exchange::shared_ptr RecoveryManager::recoverExchange(const string& name, const string& type)
+{
+ return exchanges.declare(name, type).first;
+}
diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h
new file mode 100644
index 0000000000..64fb11fe11
--- /dev/null
+++ b/cpp/src/qpid/broker/RecoveryManager.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _RecoveryManager_
+#define _RecoveryManager_
+
+#include <qpid/broker/ExchangeRegistry.h>
+#include <qpid/broker/QueueRegistry.h>
+
+namespace qpid {
+namespace broker {
+
+ class RecoveryManager{
+ QueueRegistry& queues;
+ ExchangeRegistry& exchanges;
+ public:
+ RecoveryManager(QueueRegistry& queues, ExchangeRegistry& exchanges);
+ ~RecoveryManager();
+ Queue::shared_ptr recoverQueue(const string& name);
+ Exchange::shared_ptr recoverExchange(const string& name, const string& type);
+ };
+
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
index 57fff37653..75912c6560 100644
--- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
@@ -49,7 +49,10 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store,
exchanges.declare(amq_fanout, FanOutExchange::typeName);
exchanges.declare(amq_match, HeadersExchange::typeName);
- if(store.get()) store->recover(queues);
+ if(store.get()) {
+ RecoveryManager recoverer(queues, exchanges);
+ store->recover(recoverer);
+ }
cleaner.start();
}
diff --git a/cpp/test/unit/qpid/broker/TxAckTest.cpp b/cpp/test/unit/qpid/broker/TxAckTest.cpp
index 47527c27ce..91f0bd1498 100644
--- a/cpp/test/unit/qpid/broker/TxAckTest.cpp
+++ b/cpp/test/unit/qpid/broker/TxAckTest.cpp
@@ -19,7 +19,7 @@
*
*/
#include <qpid/broker/MessageStore.h>
-#include <qpid/broker/QueueRegistry.h>
+#include <qpid/broker/RecoveryManager.h>
#include <qpid/broker/TxAck.h>
#include <qpid_test_plugin.h>
#include <iostream>
@@ -47,7 +47,7 @@ class TxAckTest : public CppUnit::TestCase
//dont care about any of the other methods:
void create(const Queue&){}
void destroy(const Queue&){}
- void recover(QueueRegistry&){}
+ void recover(RecoveryManager&){}
void enqueue(TransactionContext*, Message::shared_ptr&, const Queue&, const string * const){}
void committed(const string * const){}
void aborted(const string * const){}
diff --git a/cpp/test/unit/qpid/broker/TxPublishTest.cpp b/cpp/test/unit/qpid/broker/TxPublishTest.cpp
index 89287e6df3..a28d1127de 100644
--- a/cpp/test/unit/qpid/broker/TxPublishTest.cpp
+++ b/cpp/test/unit/qpid/broker/TxPublishTest.cpp
@@ -19,7 +19,7 @@
*
*/
#include <qpid/broker/MessageStore.h>
-#include <qpid/broker/QueueRegistry.h>
+#include <qpid/broker/RecoveryManager.h>
#include <qpid/broker/TxPublish.h>
#include <qpid_test_plugin.h>
#include <iostream>
@@ -48,7 +48,7 @@ class TxPublishTest : public CppUnit::TestCase
//dont care about any of the other methods:
void create(const Queue&){}
void destroy(const Queue&){}
- void recover(QueueRegistry&){}
+ void recover(RecoveryManager&){}
void dequeue(TransactionContext*, Message::shared_ptr&, const Queue&, const string * const){}
void committed(const string * const){}
void aborted(const string * const){}