diff options
author | Gordon Sim <gsim@apache.org> | 2006-11-15 10:28:11 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-11-15 10:28:11 +0000 |
commit | 8690d6d8c01335523a8a4b1677979ee1ce51dec0 (patch) | |
tree | 8df5afa2dcd96a57b9cbf869a7a3a672af4ba134 /cpp/src | |
parent | 71ae30ea0b7d3cb4b848ad84fb90c782894cf606 (diff) | |
download | qpid-python-8690d6d8c01335523a8a4b1677979ee1ce51dec0.tar.gz |
Added ability for broker to load a message store implementation from a library.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@475181 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Configuration.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Configuration.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.cpp | 79 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.h | 55 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerFactoryImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Module.h | 161 |
8 files changed, 311 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 74ee4df952..3f0f00377d 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -29,7 +29,8 @@ using namespace qpid::sys; Broker::Broker(const Configuration& config) : acceptor(Acceptor::create(config.getPort(), config.getConnectionBacklog(), - config.getWorkerThreads())) + config.getWorkerThreads())), + factory(config.getStore()) { } diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp index 4cc9cd3a0c..39f5c23ee6 100644 --- a/cpp/src/qpid/broker/Configuration.cpp +++ b/cpp/src/qpid/broker/Configuration.cpp @@ -30,6 +30,7 @@ Configuration::Configuration() : workerThreads("worker-threads", "Sets the number of worker threads to use (default=5).", 5), maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500).", 500), connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10), + store('s', "store", "Sets the message store module to use (default='' which implies no store)", ""), help("help", "Prints usage information", false) { options.push_back(&trace); @@ -37,6 +38,7 @@ Configuration::Configuration() : options.push_back(&workerThreads); options.push_back(&maxConnections); options.push_back(&connectionBacklog); + options.push_back(&store); options.push_back(&help); } @@ -86,6 +88,10 @@ int Configuration::getConnectionBacklog() const { return connectionBacklog.getValue(); } +const std::string& Configuration::getStore() const { + return store.getValue(); +} + Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : flag(string("-") + _flag), name("--" +_name), desc(_desc) {} diff --git a/cpp/src/qpid/broker/Configuration.h b/cpp/src/qpid/broker/Configuration.h index 54b604ac22..1a081764bf 100644 --- a/cpp/src/qpid/broker/Configuration.h +++ b/cpp/src/qpid/broker/Configuration.h @@ -95,6 +95,7 @@ namespace qpid { IntOption workerThreads; IntOption maxConnections; IntOption connectionBacklog; + StringOption store; BoolOption help; typedef std::vector<Option*>::iterator op_iterator; @@ -118,6 +119,7 @@ namespace qpid { int getWorkerThreads() const; int getMaxConnections() const; int getConnectionBacklog() const; + const std::string& getStore() const; void setHelp(bool b) { help.setValue(b); } void setTrace(bool b) { trace.setValue(b); } @@ -125,6 +127,7 @@ namespace qpid { void setWorkerThreads(int i) { workerThreads.setValue(i); } void setMaxConnections(int i) { maxConnections.setValue(i); } void setConnectionBacklog(int i) { connectionBacklog.setValue(i); } + void setStore(const std::string& s) { store.setValue(s); } void usage(); }; diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp new file mode 100644 index 0000000000..f5c27ca6bc --- /dev/null +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/broker/MessageStoreModule.h> +#include <iostream> + +using namespace qpid::broker; + +MessageStoreModule::MessageStoreModule(const std::string& name) : store(name) +{ +} + +void MessageStoreModule::create(const Queue& queue) +{ + store->create(queue); +} + +void MessageStoreModule::destroy(const Queue& queue) +{ + store->destroy(queue); +} + +void MessageStoreModule::recover(QueueRegistry& registry) +{ + store->recover(registry); +} + +void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) +{ + store->enqueue(ctxt, msg, queue, xid); +} + +void MessageStoreModule::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) +{ + store->dequeue(ctxt, msg, queue, xid); +} + +void MessageStoreModule::committed(const string * const xid) +{ + store->committed(xid); +} + +void MessageStoreModule::aborted(const string * const xid) +{ + store->aborted(xid); +} + +std::auto_ptr<TransactionContext> MessageStoreModule::begin() +{ + return store->begin(); +} + +void MessageStoreModule::commit(TransactionContext* ctxt) +{ + store->commit(ctxt); +} + +void MessageStoreModule::abort(TransactionContext* ctxt) +{ + store->abort(ctxt); +} diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h new file mode 100644 index 0000000000..490cfbb5c7 --- /dev/null +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _MessageStoreModule_ +#define _MessageStoreModule_ + +#include <qpid/broker/Message.h> +#include <qpid/broker/MessageStore.h> +#include <qpid/broker/Queue.h> +#include <qpid/broker/QueueRegistry.h> +#include <qpid/sys/Module.h> + +namespace qpid { + namespace broker { + /** + * A null implementation of the MessageStore interface + */ + class MessageStoreModule : public MessageStore{ + qpid::sys::Module<MessageStore> store; + public: + MessageStoreModule(const std::string& name); + void create(const Queue& queue); + void destroy(const Queue& queue); + void recover(QueueRegistry& queues); + void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + void committed(const string * const xid); + void aborted(const string * const xid); + std::auto_ptr<TransactionContext> begin(); + void commit(TransactionContext* ctxt); + void abort(TransactionContext* ctxt); + ~MessageStoreModule(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp index c5f17c006a..5b7bb1ff5e 100644 --- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp @@ -23,6 +23,7 @@ #include <qpid/broker/DirectExchange.h> #include <qpid/broker/FanOutExchange.h> #include <qpid/broker/HeadersExchange.h> +#include <qpid/broker/MessageStoreModule.h> #include <qpid/broker/NullMessageStore.h> #include <qpid/broker/SessionHandlerImpl.h> @@ -38,8 +39,9 @@ const std::string amq_fanout("amq.fanout"); const std::string amq_match("amq.match"); } -SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : - store(new NullMessageStore()), queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10) +SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int32_t _timeout) : + store(_store.empty() ? (MessageStore*) new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)), + queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10) { exchanges.declare(empty, DirectExchange::typeName); // Default exchange. exchanges.declare(amq_direct, DirectExchange::typeName); diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h index 9575656018..3703efcf89 100644 --- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h +++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h @@ -44,7 +44,7 @@ namespace qpid { const u_int32_t timeout;//timeout for auto-deleted queues (in ms) AutoDelete cleaner; public: - SessionHandlerFactoryImpl(u_int32_t timeout = 30000); + SessionHandlerFactoryImpl(const std::string& store = "", u_int32_t timeout = 30000); void recover(); virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt); virtual ~SessionHandlerFactoryImpl(); diff --git a/cpp/src/qpid/sys/Module.h b/cpp/src/qpid/sys/Module.h new file mode 100644 index 0000000000..3093eef074 --- /dev/null +++ b/cpp/src/qpid/sys/Module.h @@ -0,0 +1,161 @@ +#ifndef _sys_Module_h +#define _sys_Module_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/noncopyable.hpp> +#include <iostream> +#include <qpid/QpidError.h> + +namespace qpid { +namespace sys { +#if USE_APR +#include <apr-1/apr_dso.h> + typedef apr_dso_handle_t* dso_handle_t; +#else + typedef void* dso_handle_t; +#endif + + template <class T> class Module : private boost::noncopyable + { + typedef T* create_t(); + typedef void destroy_t(T*); + + dso_handle_t handle; + destroy_t* destroy; + T* ptr; + + void load(const std::string& name); + void unload(); + void* getSymbol(const std::string& name); + + public: + Module(const std::string& name); + T* operator->(); + T* get(); + ~Module() throw(); + }; + +} +} + +using namespace qpid::sys; + +template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0) +{ + load(module); + //TODO: need a better strategy for symbol names to allow multiple + //modules to be loaded without clashes... + + //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic + create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create"))); + destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy"))); + ptr = create(); +} + +template <class T> T* Module<T>::operator->() +{ + return ptr; +} + +template <class T> T* Module<T>::get() +{ + return ptr; +} + +template <class T> Module<T>::~Module() throw() +{ + try { + if (handle && ptr) { + destroy(ptr); + } + if (handle) unload(); + } catch (std::exception& e) { + std::cout << "Error while destroying module: " << e.what() << std::endl; + } + destroy = 0; + handle = 0; + ptr = 0; +} + +// APR ================================================================ +#if USE_APR + +#include <qpid/apr/APRBase.h> +#include <qpid/apr/APRPool.h> + +template <class T> void Module<T>::load(const std::string& name) +{ + CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get())); +} + +template <class T> void Module<T>::unload() +{ + CHECK_APR_SUCCESS(apr_dso_unload(handle)); +} + +template <class T> void* Module<T>::getSymbol(const std::string& name) +{ + apr_dso_handle_sym_t symbol; + CHECK_APR_SUCCESS(apr_dso_sym(&symbol, handle, name.c_str())); + return (void*) symbol; +} + +// POSIX================================================================ +#else + +#include <dlfcn.h> + +template <class T> void Module<T>::load(const std::string& name) +{ + dlerror(); + handle = dlopen(name.c_str(), RTLD_NOW); + const char* error = dlerror(); + if (error) { + THROW_QPID_ERROR(INTERNAL_ERROR, error); + } +} + +template <class T> void Module<T>::unload() +{ + dlerror(); + dlclose(handle); + const char* error = dlerror(); + if (error) { + THROW_QPID_ERROR(INTERNAL_ERROR, error); + } +} + +template <class T> void* Module<T>::getSymbol(const std::string& name) +{ + dlerror(); + void* sym = dlsym(handle, name.c_str()); + const char* error = dlerror(); + if (error) { + THROW_QPID_ERROR(INTERNAL_ERROR, error); + } + return sym; +} + +#endif //if USE_APR + +#endif //ifndef _sys_Module_h + |