diff options
author | Gordon Sim <gsim@apache.org> | 2006-11-20 11:26:45 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-11-20 11:26:45 +0000 |
commit | aa0e730b43ede5532a238ad209d627e5b9e6abc9 (patch) | |
tree | 6e62920f30466d581bb5cae16299c01a23268589 | |
parent | eedd0b960ed0b727ea1f4fe35ff0ea947982aa3d (diff) | |
download | qpid-python-aa0e730b43ede5532a238ad209d627e5b9e6abc9.tar.gz |
Modifications to recovery process to allow for recovery of default bindings.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@477148 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/MessageStore.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManager.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManager.h | 45 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp | 5 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/TxAckTest.cpp | 4 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/TxPublishTest.cpp | 4 |
11 files changed, 103 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index d477cd6c38..3acada0f82 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -22,14 +22,11 @@ #define _MessageStore_ #include <qpid/broker/Message.h> -#include <qpid/broker/Queue.h> +#include <qpid/broker/RecoveryManager.h> #include <qpid/broker/TransactionalStore.h> namespace qpid { namespace broker { - class Queue; - class QueueRegistry; - /** * An abstraction of the persistent storage for messages. */ @@ -47,7 +44,7 @@ namespace qpid { /** * Request recovery of queue and message state from store */ - virtual void recover(QueueRegistry& queues) = 0; + virtual void recover(RecoveryManager& queues) = 0; /** * Enqueues a message, storing the message if it has not diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index f5c27ca6bc..1f26807f54 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -38,7 +38,7 @@ void MessageStoreModule::destroy(const Queue& queue) store->destroy(queue); } -void MessageStoreModule::recover(QueueRegistry& registry) +void MessageStoreModule::recover(RecoveryManager& registry) { store->recover(registry); } diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 490cfbb5c7..29b62ccfa2 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -24,7 +24,7 @@ #include <qpid/broker/Message.h> #include <qpid/broker/MessageStore.h> #include <qpid/broker/Queue.h> -#include <qpid/broker/QueueRegistry.h> +#include <qpid/broker/RecoveryManager.h> #include <qpid/sys/Module.h> namespace qpid { @@ -38,7 +38,7 @@ namespace qpid { MessageStoreModule(const std::string& name); void create(const Queue& queue); void destroy(const Queue& queue); - void recover(QueueRegistry& queues); + void recover(RecoveryManager& queues); void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); void committed(const string * const xid); diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index aa58849b34..7bc95225b4 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -22,7 +22,7 @@ #include <qpid/broker/NullMessageStore.h> #include <qpid/broker/Queue.h> -#include <qpid/broker/QueueRegistry.h> +#include <qpid/broker/RecoveryManager.h> #include <iostream> @@ -34,7 +34,7 @@ void NullMessageStore::create(const Queue& queue){ void NullMessageStore::destroy(const Queue& queue){ std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; } -void NullMessageStore::recover(QueueRegistry&){ +void NullMessageStore::recover(RecoveryManager&){ std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl; } void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const){ diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index 0268a5419d..7916467091 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -27,8 +27,6 @@ namespace qpid { namespace broker { - class Queue; - class QueueRegistry; /** * A null implementation of the MessageStore interface @@ -37,7 +35,7 @@ namespace qpid { public: void create(const Queue& queue); void destroy(const Queue& queue); - void recover(QueueRegistry& queues); + void recover(RecoveryManager& queues); void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); void committed(const string * const xid); diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 2ad56b0405..f95015c0fd 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -28,8 +28,6 @@ namespace qpid { namespace broker { -class SessionHandlerImpl; - /** * A registry of queues indexed by queue name. * diff --git a/cpp/src/qpid/broker/RecoveryManager.cpp b/cpp/src/qpid/broker/RecoveryManager.cpp new file mode 100644 index 0000000000..40275477b7 --- /dev/null +++ b/cpp/src/qpid/broker/RecoveryManager.cpp @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <qpid/broker/RecoveryManager.h> + +using namespace qpid::broker; + +RecoveryManager::RecoveryManager(QueueRegistry& _queues, ExchangeRegistry& _exchanges) : queues(_queues), exchanges(_exchanges) {} + +RecoveryManager::~RecoveryManager() {} + +Queue::shared_ptr RecoveryManager::recoverQueue(const string& name) +{ + std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true); + Exchange::shared_ptr exchange = exchanges.getDefault(); + if (exchange) { + exchange->bind(result.first, result.first->getName(), 0); + } + return result.first; +} + +Exchange::shared_ptr RecoveryManager::recoverExchange(const string& name, const string& type) +{ + return exchanges.declare(name, type).first; +} diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h new file mode 100644 index 0000000000..64fb11fe11 --- /dev/null +++ b/cpp/src/qpid/broker/RecoveryManager.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _RecoveryManager_ +#define _RecoveryManager_ + +#include <qpid/broker/ExchangeRegistry.h> +#include <qpid/broker/QueueRegistry.h> + +namespace qpid { +namespace broker { + + class RecoveryManager{ + QueueRegistry& queues; + ExchangeRegistry& exchanges; + public: + RecoveryManager(QueueRegistry& queues, ExchangeRegistry& exchanges); + ~RecoveryManager(); + Queue::shared_ptr recoverQueue(const string& name); + Exchange::shared_ptr recoverExchange(const string& name, const string& type); + }; + + +} +} + + +#endif diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp index 57fff37653..75912c6560 100644 --- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp @@ -49,7 +49,10 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, exchanges.declare(amq_fanout, FanOutExchange::typeName); exchanges.declare(amq_match, HeadersExchange::typeName); - if(store.get()) store->recover(queues); + if(store.get()) { + RecoveryManager recoverer(queues, exchanges); + store->recover(recoverer); + } cleaner.start(); } diff --git a/cpp/test/unit/qpid/broker/TxAckTest.cpp b/cpp/test/unit/qpid/broker/TxAckTest.cpp index 47527c27ce..91f0bd1498 100644 --- a/cpp/test/unit/qpid/broker/TxAckTest.cpp +++ b/cpp/test/unit/qpid/broker/TxAckTest.cpp @@ -19,7 +19,7 @@ * */ #include <qpid/broker/MessageStore.h> -#include <qpid/broker/QueueRegistry.h> +#include <qpid/broker/RecoveryManager.h> #include <qpid/broker/TxAck.h> #include <qpid_test_plugin.h> #include <iostream> @@ -47,7 +47,7 @@ class TxAckTest : public CppUnit::TestCase //dont care about any of the other methods: void create(const Queue&){} void destroy(const Queue&){} - void recover(QueueRegistry&){} + void recover(RecoveryManager&){} void enqueue(TransactionContext*, Message::shared_ptr&, const Queue&, const string * const){} void committed(const string * const){} void aborted(const string * const){} diff --git a/cpp/test/unit/qpid/broker/TxPublishTest.cpp b/cpp/test/unit/qpid/broker/TxPublishTest.cpp index 89287e6df3..a28d1127de 100644 --- a/cpp/test/unit/qpid/broker/TxPublishTest.cpp +++ b/cpp/test/unit/qpid/broker/TxPublishTest.cpp @@ -19,7 +19,7 @@ * */ #include <qpid/broker/MessageStore.h> -#include <qpid/broker/QueueRegistry.h> +#include <qpid/broker/RecoveryManager.h> #include <qpid/broker/TxPublish.h> #include <qpid_test_plugin.h> #include <iostream> @@ -48,7 +48,7 @@ class TxPublishTest : public CppUnit::TestCase //dont care about any of the other methods: void create(const Queue&){} void destroy(const Queue&){} - void recover(QueueRegistry&){} + void recover(RecoveryManager&){} void dequeue(TransactionContext*, Message::shared_ptr&, const Queue&, const string * const){} void committed(const string * const){} void aborted(const string * const){} |