diff options
author | Gordon Sim <gsim@apache.org> | 2007-09-18 14:45:33 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-09-18 14:45:33 +0000 |
commit | 9f6c3db207f27e91cb7e76f82f8257a9c5007719 (patch) | |
tree | eea611c55725503413a5427b1e4d6d762e8d369f /cpp | |
parent | 174c235915e94fe9b27493f85b91b6ad6eab9271 (diff) | |
download | qpid-python-9f6c3db207f27e91cb7e76f82f8257a9c5007719.tar.gz |
Added Dispatcher class (plus test). This converts incoming MessageTransfer framesets to Messages and pumps them to registered listeners.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@576935 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ClientMessage.h | 31 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 150 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.h | 87 | ||||
-rw-r--r-- | cpp/src/qpid/framing/TransferContent.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 63 |
6 files changed, 324 insertions, 11 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 49535babff..a4e98c5b68 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -214,6 +214,7 @@ libqpidclient_la_SOURCES = \ qpid/client/ConnectionImpl.cpp \ qpid/client/Connector.cpp \ qpid/client/Demux.cpp \ + qpid/client/Dispatcher.cpp \ qpid/client/MessageListener.cpp \ qpid/client/Correlator.cpp \ qpid/client/CompletionTracker.cpp \ @@ -301,6 +302,7 @@ nobase_include_HEADERS = \ qpid/client/Connector.h \ qpid/client/Completion.h \ qpid/client/Demux.h \ + qpid/client/Dispatcher.h \ qpid/client/MessageListener.h \ qpid/client/MessageQueue.h \ qpid/client/BlockingQueue.h \ diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h index 5c4eb4e5aa..a573e17940 100644 --- a/cpp/src/qpid/client/ClientMessage.h +++ b/cpp/src/qpid/client/ClientMessage.h @@ -22,6 +22,8 @@ * */ #include <string> +#include "qpid/client/Session.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/TransferContent.h" namespace qpid { @@ -40,12 +42,7 @@ public: std::string getDestination() const { - return destination; - } - - void setDestination(const std::string& dest) - { - destination = dest; + return method.getDestination(); } bool isRedelivered() const @@ -53,7 +50,8 @@ public: return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); } - void setRedelivered(bool redelivered) { + void setRedelivered(bool redelivered) + { getDeliveryProperties().setRedelivered(redelivered); } @@ -62,8 +60,25 @@ public: return getMessageProperties().getApplicationHeaders(); } + void acknowledge(Session& session, bool cumulative = true, bool send = true) const + { + session.execution().completed(id, cumulative, send); + } + + Message(const framing::FrameSet& frameset) : method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()) + { + populate(frameset); + } + + const framing::MessageTransferBody& getMethod() const + { + return method; + } + private: - std::string destination; + //method and id are only set for received messages: + const framing::MessageTransferBody method; + const framing::SequenceNumber id; }; }} diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp new file mode 100644 index 0000000000..8f3ed8bcbe --- /dev/null +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -0,0 +1,150 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Dispatcher.h" + +#include "qpid/client/Session.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" +#include "BlockingQueue.h" +#include "ClientMessage.h" + +using qpid::framing::FrameSet; +using qpid::framing::MessageTransferBody; +using qpid::sys::Mutex; +using qpid::sys::ScopedLock; +using qpid::sys::Thread; + +namespace qpid { +namespace client { + + Subscriber::Subscriber(Session& s, MessageListener* l, bool a, uint f) : session(s), listener(l), autoAck(a), ackFrequency(f), count(0) {} + +void Subscriber::received(Message& msg) +{ + if (listener) { + listener->received(msg); + if (autoAck) { + bool send = (++count >= ackFrequency); + msg.acknowledge(session, true, send); + if (send) count = 0; + } + } +} + + + Dispatcher::Dispatcher(Session& s, const std::string& q) : session(s), queue(q), running(false), stopped(false) +{ +} + +void Dispatcher::start() +{ + worker = Thread(this); +} + +void Dispatcher::run() +{ + BlockingQueue<FrameSet::shared_ptr>& q = queue.empty() ? + session.execution().getDemux().getDefault() : + session.execution().getDemux().get(queue); + + startRunning(); + stopped = false; + while (!isStopped()) { + FrameSet::shared_ptr content = q.pop(); + if (content->isA<MessageTransferBody>()) { + Message msg(*content); + Subscriber::shared_ptr listener = find(msg.getDestination()); + if (!listener) { + QPID_LOG(error, "No message listener set: " << content->getMethod()); + } else { + listener->received(msg); + } + } else { + if (handler.get()) { + handler->handle(*content); + } else { + QPID_LOG(error, "Unhandled method: " << content->getMethod()); + } + } + } + stopRunning(); +} + +void Dispatcher::stop() +{ + ScopedLock<Mutex> l(lock); + stopped = true; +} + +bool Dispatcher::isStopped() +{ + ScopedLock<Mutex> l(lock); + return stopped; +} + +/** + * Prevent concurrent threads invoking run. + */ +void Dispatcher::startRunning() +{ + ScopedLock<Mutex> l(lock); + if (running) { + throw Exception("Dispatcher is already running."); + } + running = true; +} + +void Dispatcher::stopRunning() +{ + ScopedLock<Mutex> l(lock); + running = false; +} + +Subscriber::shared_ptr Dispatcher::find(const std::string& name) +{ + ScopedLock<Mutex> l(lock); + Listeners::iterator i = listeners.find(name); + if (i == listeners.end()) { + return defaultListener; + } + return i->second; +} + +void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackFrequency) +{ + ScopedLock<Mutex> l(lock); + defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackFrequency)); +} + +void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool autoAck, uint ackFrequency) +{ + ScopedLock<Mutex> l(lock); + listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackFrequency)); +} + +void Dispatcher::cancel(const std::string& destination) +{ + ScopedLock<Mutex> l(lock); + listeners.erase(destination); +} + +}} diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h new file mode 100644 index 0000000000..e4a4cec4a6 --- /dev/null +++ b/cpp/src/qpid/client/Dispatcher.h @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Dispatcher_ +#define _Dispatcher_ + +#include <map> +#include <memory> +#include <string> +#include <boost/shared_ptr.hpp> +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include "MessageListener.h" + +namespace qpid { +namespace client { + +class Session; + +class Subscriber : public MessageListener +{ + Session& session; + MessageListener* const listener; + const bool autoAck; + const uint ackFrequency; + uint count; + +public: + typedef boost::shared_ptr<Subscriber> shared_ptr; + Subscriber(Session& session, MessageListener* listener, bool autoAck = true, uint ackFrequency = 1); + void received(Message& msg); + +}; + +typedef framing::Handler<framing::FrameSet> FrameSetHandler; + +class Dispatcher : public sys::Runnable +{ + typedef std::map<std::string, Subscriber::shared_ptr> Listeners; + sys::Mutex lock; + sys::Thread worker; + Session& session; + const std::string queue; + bool running; + bool stopped; + Listeners listeners; + Subscriber::shared_ptr defaultListener; + std::auto_ptr<FrameSetHandler> handler; + + Subscriber::shared_ptr find(const std::string& name); + void startRunning(); + void stopRunning(); + bool isStopped(); + +public: + Dispatcher(Session& session, const std::string& queue = ""); + + void start(); + void run(); + void stop(); + + void listen(MessageListener* listener, bool autoAck = true, uint ackFrequency = 1); + void listen(const std::string& destination, MessageListener* listener, bool autoAck = true, uint ackFrequency = 1); + void cancel(const std::string& destination); +}; + +}} + +#endif diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h index c5fc395c94..6fd96f3587 100644 --- a/cpp/src/qpid/framing/TransferContent.h +++ b/cpp/src/qpid/framing/TransferContent.h @@ -37,7 +37,7 @@ class TransferContent : public MethodContent AMQHeaderBody header; std::string data; public: - TransferContent(const std::string& data); + TransferContent(const std::string& data = ""); AMQHeaderBody getHeader() const; void setData(const std::string&); void appendData(const std::string&); diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 7c58708974..12b50485e4 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -18,27 +18,55 @@ * under the License. * */ -#include <vector> +#include <list> #include "qpid_test_plugin.h" #include "InProcessBroker.h" +#include "qpid/client/Dispatcher.h" #include "qpid/client/Session.h" #include "qpid/framing/TransferContent.h" using namespace qpid::client; using namespace qpid::framing; +struct DummyListener : public MessageListener +{ + std::list<Message> messages; + std::string name; + uint expected; + uint count; + Dispatcher dispatcher; + + DummyListener(Session& session, const std::string& _name, uint _expected) : name(_name), expected(_expected), count(0), + dispatcher(session) {} + + void listen() + { + dispatcher.listen(name, this, true, 1); + dispatcher.run(); + } + + void received(Message& msg) + { + messages.push_back(msg); + if (++count == expected) { + dispatcher.stop(); + } + } +}; + class ClientSessionTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(ClientSessionTest); CPPUNIT_TEST(testQueueQuery); CPPUNIT_TEST(testTransfer); + CPPUNIT_TEST(testDispatcher); CPPUNIT_TEST_SUITE_END(); boost::shared_ptr<Connector> broker; Connection connection; Session session; - public: +public: ClientSessionTest() : broker(new qpid::broker::InProcessBroker()), connection(broker) { @@ -78,6 +106,37 @@ class ClientSessionTest : public CppUnit::TestCase //confirm receipt: session.execution().completed(msg->getId(), true, true); } + + void testDispatcher() + { + session.queueDeclare_(queue="my-queue", exclusive=true, autoDelete=true); + + TransferContent msg1("One"); + msg1.getDeliveryProperties().setRoutingKey("my-queue"); + session.messageTransfer_(content=msg1); + + TransferContent msg2("Two"); + msg2.getDeliveryProperties().setRoutingKey("my-queue"); + session.messageTransfer_(content=msg2); + + TransferContent msg3("Three"); + msg3.getDeliveryProperties().setRoutingKey("my-queue"); + session.messageTransfer_(content=msg3); + + session.messageSubscribe_(queue="my-queue", destination="my-dest", acquireMode=1); + session.messageFlow((destination="my-dest", unit=0, value=1));//messages + session.messageFlow((destination="my-dest", unit=1, value=0xFFFFFFFF));//bytes + DummyListener listener(session, "my-dest", 3); + listener.listen(); + CPPUNIT_ASSERT_EQUAL((size_t) 3, listener.messages.size()); + CPPUNIT_ASSERT_EQUAL(std::string("One"), listener.messages.front().getData()); + listener.messages.pop_front(); + CPPUNIT_ASSERT_EQUAL(std::string("Two"), listener.messages.front().getData()); + listener.messages.pop_front(); + CPPUNIT_ASSERT_EQUAL(std::string("Three"), listener.messages.front().getData()); + listener.messages.pop_front(); + + } }; // Make this test suite a plugin. |