summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-11-15 10:28:11 +0000
committerGordon Sim <gsim@apache.org>2006-11-15 10:28:11 +0000
commit8690d6d8c01335523a8a4b1677979ee1ce51dec0 (patch)
tree8df5afa2dcd96a57b9cbf869a7a3a672af4ba134 /cpp/src
parent71ae30ea0b7d3cb4b848ad84fb90c782894cf606 (diff)
downloadqpid-python-8690d6d8c01335523a8a4b1677979ee1ce51dec0.tar.gz
Added ability for broker to load a message store implementation from a library.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@475181 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--cpp/src/qpid/broker/Configuration.cpp6
-rw-r--r--cpp/src/qpid/broker/Configuration.h3
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp79
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h55
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp6
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.h2
-rw-r--r--cpp/src/qpid/sys/Module.h161
8 files changed, 311 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 74ee4df952..3f0f00377d 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -29,7 +29,8 @@ using namespace qpid::sys;
Broker::Broker(const Configuration& config) :
acceptor(Acceptor::create(config.getPort(),
config.getConnectionBacklog(),
- config.getWorkerThreads()))
+ config.getWorkerThreads())),
+ factory(config.getStore())
{ }
diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp
index 4cc9cd3a0c..39f5c23ee6 100644
--- a/cpp/src/qpid/broker/Configuration.cpp
+++ b/cpp/src/qpid/broker/Configuration.cpp
@@ -30,6 +30,7 @@ Configuration::Configuration() :
workerThreads("worker-threads", "Sets the number of worker threads to use (default=5).", 5),
maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500).", 500),
connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10),
+ store('s', "store", "Sets the message store module to use (default='' which implies no store)", ""),
help("help", "Prints usage information", false)
{
options.push_back(&trace);
@@ -37,6 +38,7 @@ Configuration::Configuration() :
options.push_back(&workerThreads);
options.push_back(&maxConnections);
options.push_back(&connectionBacklog);
+ options.push_back(&store);
options.push_back(&help);
}
@@ -86,6 +88,10 @@ int Configuration::getConnectionBacklog() const {
return connectionBacklog.getValue();
}
+const std::string& Configuration::getStore() const {
+ return store.getValue();
+}
+
Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
diff --git a/cpp/src/qpid/broker/Configuration.h b/cpp/src/qpid/broker/Configuration.h
index 54b604ac22..1a081764bf 100644
--- a/cpp/src/qpid/broker/Configuration.h
+++ b/cpp/src/qpid/broker/Configuration.h
@@ -95,6 +95,7 @@ namespace qpid {
IntOption workerThreads;
IntOption maxConnections;
IntOption connectionBacklog;
+ StringOption store;
BoolOption help;
typedef std::vector<Option*>::iterator op_iterator;
@@ -118,6 +119,7 @@ namespace qpid {
int getWorkerThreads() const;
int getMaxConnections() const;
int getConnectionBacklog() const;
+ const std::string& getStore() const;
void setHelp(bool b) { help.setValue(b); }
void setTrace(bool b) { trace.setValue(b); }
@@ -125,6 +127,7 @@ namespace qpid {
void setWorkerThreads(int i) { workerThreads.setValue(i); }
void setMaxConnections(int i) { maxConnections.setValue(i); }
void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
+ void setStore(const std::string& s) { store.setValue(s); }
void usage();
};
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
new file mode 100644
index 0000000000..f5c27ca6bc
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/MessageStoreModule.h>
+#include <iostream>
+
+using namespace qpid::broker;
+
+MessageStoreModule::MessageStoreModule(const std::string& name) : store(name)
+{
+}
+
+void MessageStoreModule::create(const Queue& queue)
+{
+ store->create(queue);
+}
+
+void MessageStoreModule::destroy(const Queue& queue)
+{
+ store->destroy(queue);
+}
+
+void MessageStoreModule::recover(QueueRegistry& registry)
+{
+ store->recover(registry);
+}
+
+void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid)
+{
+ store->enqueue(ctxt, msg, queue, xid);
+}
+
+void MessageStoreModule::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid)
+{
+ store->dequeue(ctxt, msg, queue, xid);
+}
+
+void MessageStoreModule::committed(const string * const xid)
+{
+ store->committed(xid);
+}
+
+void MessageStoreModule::aborted(const string * const xid)
+{
+ store->aborted(xid);
+}
+
+std::auto_ptr<TransactionContext> MessageStoreModule::begin()
+{
+ return store->begin();
+}
+
+void MessageStoreModule::commit(TransactionContext* ctxt)
+{
+ store->commit(ctxt);
+}
+
+void MessageStoreModule::abort(TransactionContext* ctxt)
+{
+ store->abort(ctxt);
+}
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
new file mode 100644
index 0000000000..490cfbb5c7
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MessageStoreModule_
+#define _MessageStoreModule_
+
+#include <qpid/broker/Message.h>
+#include <qpid/broker/MessageStore.h>
+#include <qpid/broker/Queue.h>
+#include <qpid/broker/QueueRegistry.h>
+#include <qpid/sys/Module.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * A null implementation of the MessageStore interface
+ */
+ class MessageStoreModule : public MessageStore{
+ qpid::sys::Module<MessageStore> store;
+ public:
+ MessageStoreModule(const std::string& name);
+ void create(const Queue& queue);
+ void destroy(const Queue& queue);
+ void recover(QueueRegistry& queues);
+ void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ void committed(const string * const xid);
+ void aborted(const string * const xid);
+ std::auto_ptr<TransactionContext> begin();
+ void commit(TransactionContext* ctxt);
+ void abort(TransactionContext* ctxt);
+ ~MessageStoreModule(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
index c5f17c006a..5b7bb1ff5e 100644
--- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
@@ -23,6 +23,7 @@
#include <qpid/broker/DirectExchange.h>
#include <qpid/broker/FanOutExchange.h>
#include <qpid/broker/HeadersExchange.h>
+#include <qpid/broker/MessageStoreModule.h>
#include <qpid/broker/NullMessageStore.h>
#include <qpid/broker/SessionHandlerImpl.h>
@@ -38,8 +39,9 @@ const std::string amq_fanout("amq.fanout");
const std::string amq_match("amq.match");
}
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) :
- store(new NullMessageStore()), queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int32_t _timeout) :
+ store(_store.empty() ? (MessageStore*) new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)),
+ queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
{
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
exchanges.declare(amq_direct, DirectExchange::typeName);
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
index 9575656018..3703efcf89 100644
--- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
@@ -44,7 +44,7 @@ namespace qpid {
const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
AutoDelete cleaner;
public:
- SessionHandlerFactoryImpl(u_int32_t timeout = 30000);
+ SessionHandlerFactoryImpl(const std::string& store = "", u_int32_t timeout = 30000);
void recover();
virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
virtual ~SessionHandlerFactoryImpl();
diff --git a/cpp/src/qpid/sys/Module.h b/cpp/src/qpid/sys/Module.h
new file mode 100644
index 0000000000..3093eef074
--- /dev/null
+++ b/cpp/src/qpid/sys/Module.h
@@ -0,0 +1,161 @@
+#ifndef _sys_Module_h
+#define _sys_Module_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include <iostream>
+#include <qpid/QpidError.h>
+
+namespace qpid {
+namespace sys {
+#if USE_APR
+#include <apr-1/apr_dso.h>
+ typedef apr_dso_handle_t* dso_handle_t;
+#else
+ typedef void* dso_handle_t;
+#endif
+
+ template <class T> class Module : private boost::noncopyable
+ {
+ typedef T* create_t();
+ typedef void destroy_t(T*);
+
+ dso_handle_t handle;
+ destroy_t* destroy;
+ T* ptr;
+
+ void load(const std::string& name);
+ void unload();
+ void* getSymbol(const std::string& name);
+
+ public:
+ Module(const std::string& name);
+ T* operator->();
+ T* get();
+ ~Module() throw();
+ };
+
+}
+}
+
+using namespace qpid::sys;
+
+template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0)
+{
+ load(module);
+ //TODO: need a better strategy for symbol names to allow multiple
+ //modules to be loaded without clashes...
+
+ //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic
+ create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create")));
+ destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy")));
+ ptr = create();
+}
+
+template <class T> T* Module<T>::operator->()
+{
+ return ptr;
+}
+
+template <class T> T* Module<T>::get()
+{
+ return ptr;
+}
+
+template <class T> Module<T>::~Module() throw()
+{
+ try {
+ if (handle && ptr) {
+ destroy(ptr);
+ }
+ if (handle) unload();
+ } catch (std::exception& e) {
+ std::cout << "Error while destroying module: " << e.what() << std::endl;
+ }
+ destroy = 0;
+ handle = 0;
+ ptr = 0;
+}
+
+// APR ================================================================
+#if USE_APR
+
+#include <qpid/apr/APRBase.h>
+#include <qpid/apr/APRPool.h>
+
+template <class T> void Module<T>::load(const std::string& name)
+{
+ CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get()));
+}
+
+template <class T> void Module<T>::unload()
+{
+ CHECK_APR_SUCCESS(apr_dso_unload(handle));
+}
+
+template <class T> void* Module<T>::getSymbol(const std::string& name)
+{
+ apr_dso_handle_sym_t symbol;
+ CHECK_APR_SUCCESS(apr_dso_sym(&symbol, handle, name.c_str()));
+ return (void*) symbol;
+}
+
+// POSIX================================================================
+#else
+
+#include <dlfcn.h>
+
+template <class T> void Module<T>::load(const std::string& name)
+{
+ dlerror();
+ handle = dlopen(name.c_str(), RTLD_NOW);
+ const char* error = dlerror();
+ if (error) {
+ THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ }
+}
+
+template <class T> void Module<T>::unload()
+{
+ dlerror();
+ dlclose(handle);
+ const char* error = dlerror();
+ if (error) {
+ THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ }
+}
+
+template <class T> void* Module<T>::getSymbol(const std::string& name)
+{
+ dlerror();
+ void* sym = dlsym(handle, name.c_str());
+ const char* error = dlerror();
+ if (error) {
+ THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ }
+ return sym;
+}
+
+#endif //if USE_APR
+
+#endif //ifndef _sys_Module_h
+