diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/client/AckPolicy.h | 54 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 80 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.h | 18 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/client/Message.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 61 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 61 | ||||
-rw-r--r-- | cpp/src/qpid/sys/BlockingQueue.h | 7 |
11 files changed, 240 insertions, 123 deletions
diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/AckPolicy.h new file mode 100644 index 0000000000..2a52a8c746 --- /dev/null +++ b/cpp/src/qpid/client/AckPolicy.h @@ -0,0 +1,54 @@ +#ifndef QPID_CLIENT_ACKPOLICY_H +#define QPID_CLIENT_ACKPOLICY_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qpid { +namespace client { + +/** + * Policy for automatic acknowledgement of messages. + */ +class AckPolicy +{ + size_t interval; + size_t count; + + public: + /** + *@param n: acknowledge every n messages. + *n==0 means no automatick acknowledgement. + */ + AckPolicy(size_t n=1) : interval(n), count(n) {} + + void ack(const Message& msg) { + if (!interval) return; + bool send=(--count==0); + msg.acknowledge(true, send); + if (send) count = interval; + } +}; + +}} // namespace qpid::client + + + +#endif /*!QPID_CLIENT_ACKPOLICY_H*/ diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index ba11ea5569..80d97b10aa 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -52,9 +52,8 @@ Connector::Connector( } Connector::~Connector(){ - if (receiver.id()) { + if (receiver.id() && receiver.id() != Thread::current().id()) receiver.join(); - } } void Connector::connect(const std::string& host, int port){ diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 65756f6404..fd6a18b349 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -27,6 +27,8 @@ #include "qpid/sys/BlockingQueue.h" #include "Message.h" +#include <boost/state_saver.hpp> + using qpid::framing::FrameSet; using qpid::framing::MessageTransferBody; using qpid::sys::Mutex; @@ -36,23 +38,22 @@ using qpid::sys::Thread; namespace qpid { namespace client { - Subscriber::Subscriber(Session_0_10& s, MessageListener* l, bool a, uint f) : session(s), listener(l), autoAck(a), ackBatchSize(f), count(0) {} +Subscriber::Subscriber(Session_0_10& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {} void Subscriber::received(Message& msg) { if (listener) { listener->received(msg); - if (autoAck) { - bool send = (++count >= ackBatchSize); - msg.acknowledge(session, true, send); - if (send) count = 0; - } + autoAck.ack(msg); } } - - Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) : session(s), queue(q), running(false), stopped(false) +Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) + : session(s), running(false) { + queue = q.empty() ? + session.execution().getDemux().getDefault() : + session.execution().getDemux().get(q); } void Dispatcher::start() @@ -62,19 +63,22 @@ void Dispatcher::start() void Dispatcher::run() { - Demux::QueuePtr q = queue.empty() ? - session.execution().getDemux().getDefault() : - session.execution().getDemux().get(queue); - - startRunning(); - stopped = false; - while (!isStopped()) { - FrameSet::shared_ptr content = q->pop(); + Mutex::ScopedLock l(lock); + if (running) + throw Exception("Dispatcher is already running."); + boost::state_saver<bool> reset(running); // Reset to false on exit. + running = true; + queue->open(); + while (!queue->isClosed()) { + Mutex::ScopedUnlock u(lock); + FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { - Message msg(*content); + Message msg(*content, session); Subscriber::shared_ptr listener = find(msg.getDestination()); if (!listener) { - QPID_LOG(error, "No message listener set: " << content->getMethod()); + // FIXME aconway 2007-11-07: Should close session & throw here? + QPID_LOG(error, "No message listener for " + << content->getMethod()); } else { listener->received(msg); } @@ -82,41 +86,23 @@ void Dispatcher::run() if (handler.get()) { handler->handle(*content); } else { + // FIXME aconway 2007-11-07: Should close session & throw here? QPID_LOG(error, "Unhandled method: " << content->getMethod()); } } } - stopRunning(); } void Dispatcher::stop() { ScopedLock<Mutex> l(lock); - stopped = true; -} - -bool Dispatcher::isStopped() -{ - ScopedLock<Mutex> l(lock); - return stopped; -} - -/** - * Prevent concurrent threads invoking run. - */ -void Dispatcher::startRunning() -{ - ScopedLock<Mutex> l(lock); - if (running) { - throw Exception("Dispatcher is already running."); - } - running = true; + queue->close(); // Will interrupt thread blocked in pop() } -void Dispatcher::stopRunning() +void Dispatcher::setAutoStop(bool b) { ScopedLock<Mutex> l(lock); - running = false; + autoStop = b; } Subscriber::shared_ptr Dispatcher::find(const std::string& name) @@ -129,22 +115,28 @@ Subscriber::shared_ptr Dispatcher::find(const std::string& name) return i->second; } -void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackBatchSize) +void Dispatcher::listen( + MessageListener* listener, AckPolicy autoAck +) { ScopedLock<Mutex> l(lock); - defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize)); + defaultListener = Subscriber::shared_ptr( + new Subscriber(session, listener, autoAck)); } -void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool autoAck, uint ackBatchSize) +void Dispatcher::listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck) { ScopedLock<Mutex> l(lock); - listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize)); + listeners[destination] = Subscriber::shared_ptr( + new Subscriber(session, listener, autoAck)); } void Dispatcher::cancel(const std::string& destination) { ScopedLock<Mutex> l(lock); listeners.erase(destination); + if (autoStop && listeners.empty()) + queue->close(); } }} diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h index 550ba36ef5..ae67e61299 100644 --- a/cpp/src/qpid/client/Dispatcher.h +++ b/cpp/src/qpid/client/Dispatcher.h @@ -29,6 +29,7 @@ #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "MessageListener.h" +#include "AckPolicy.h" namespace qpid { namespace client { @@ -39,13 +40,11 @@ class Subscriber : public MessageListener { Session_0_10& session; MessageListener* const listener; - const bool autoAck; - const uint ackBatchSize; - uint count; + AckPolicy autoAck; public: typedef boost::shared_ptr<Subscriber> shared_ptr; - Subscriber(Session_0_10& session, MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1); + Subscriber(Session_0_10& session, MessageListener* listener, AckPolicy); void received(Message& msg); }; @@ -58,16 +57,14 @@ class Dispatcher : public sys::Runnable sys::Mutex lock; sys::Thread worker; Session_0_10& session; - const std::string queue; + Demux::QueuePtr queue; bool running; - bool stopped; + bool autoStop; Listeners listeners; Subscriber::shared_ptr defaultListener; std::auto_ptr<FrameSetHandler> handler; Subscriber::shared_ptr find(const std::string& name); - void startRunning(); - void stopRunning(); bool isStopped(); public: @@ -76,9 +73,10 @@ public: void start(); void run(); void stop(); + void setAutoStop(bool b); - void listen(MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1); - void listen(const std::string& destination, MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1); + void listen(MessageListener* listener, AckPolicy autoAck=AckPolicy()); + void listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck=AckPolicy()); void cancel(const std::string& destination); }; diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp new file mode 100644 index 0000000000..09bf1e055a --- /dev/null +++ b/cpp/src/qpid/client/LocalQueue.cpp @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "LocalQueue.h" +#include "qpid/Exception.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace client { + +using namespace framing; + +LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {} +LocalQueue::~LocalQueue() {} + +Message LocalQueue::pop() { + if (!queue) + throw ClosedException(); + FrameSet::shared_ptr content = queue->pop(); + if (content->isA<MessageTransferBody>()) + return Message(*content, session); + else + throw CommandInvalidException( + QPID_MSG("Unexpected method: " << content->getMethod())); +} + +void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; } + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index 048b4c7b4d..1c910bd3ee 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -23,8 +23,8 @@ */ #include "qpid/client/Message.h" -#include "qpid/Exception.h" -#include "qpid/sys/BlockingQueue.h" +#include "qpid/client/Demux.h" +#include "qpid/client/AckPolicy.h" namespace qpid { namespace client { @@ -35,16 +35,21 @@ namespace client { class LocalQueue { public: - LocalQueue(BlockingQueue& q) : queue(q) {} + LocalQueue(AckPolicy=AckPolicy()); ~LocalQueue(); /** Pop the next message off the queue. *@exception ClosedException if subscription has been closed. */ - Message pop() { reurn queue->pop(); } + Message pop(); + + void setAckPolicy(AckPolicy); private: - BlockingQueue& queue; + friend class SubscriptionManager; + Session_0_10 session; + Demux::QueuePtr queue; + AckPolicy autoAck; }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h index b2ae273813..081384974a 100644 --- a/cpp/src/qpid/client/Message.h +++ b/cpp/src/qpid/client/Message.h @@ -68,7 +68,14 @@ public: session.execution().completed(id, cumulative, send); } - Message(const framing::FrameSet& frameset) : method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()) + void acknowledge(bool cumulative = true, bool send = true) const + { + const_cast<Session_0_10&>(session).execution().completed(id, cumulative, send); + } + + /**@internal for incoming messages */ + Message(const framing::FrameSet& frameset, Session_0_10 s) : + method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s) { populate(frameset); } @@ -83,10 +90,13 @@ public: return id; } + /**@internal use for incoming messages. */ + void setSession(Session_0_10 s) { session=s; } private: //method and id are only set for received messages: - const framing::MessageTransferBody method; - const framing::SequenceNumber id; + framing::MessageTransferBody method; + framing::SequenceNumber id; + Session_0_10 session; }; }} diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 02ea57fecc..14b9020282 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -116,7 +116,6 @@ void SessionCore::attaching(shared_ptr<ConnectionImpl> c) { SessionCore::~SessionCore() { Lock l(state); - invariant(); detach(COMMAND_INVALID, "Session deleted"); state.waitWaiters(); } diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index bf5191e8a0..fc65843643 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -33,57 +33,54 @@ namespace qpid { namespace client { SubscriptionManager::SubscriptionManager(Session_0_10& s) - : dispatcher(s), session(s), messages(1), bytes(UNLIMITED), autoStop(true) + : dispatcher(s), session(s), + messages(UNLIMITED), bytes(UNLIMITED), window(true) {} -std::string SubscriptionManager::uniqueTag(const std::string& tag) { - // Make unique tag. - int count=1; - std::string unique=tag; - while (subscriptions.find(tag) != subscriptions.end()) { - std::ostringstream s; - s << tag << "-" << count++; - unique=s.str(); - } - subscriptions.insert(unique); - return tag; -} - -std::string SubscriptionManager::subscribe( +void SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& t) { - std::string tag=uniqueTag(t); - using namespace arg; + std::string tag=t.empty() ? q:t; session.messageSubscribe(arg::queue=q, arg::destination=tag); - flowLimits(tag, messages, bytes); dispatcher.listen(tag, &listener); - return tag; + setFlowControl(tag, messages, bytes, window); +} + +void SubscriptionManager::subscribe( + LocalQueue& lq, const std::string& q, const std::string& t) +{ + std::string tag=t.empty() ? q:t; + lq.session=session; + lq.queue=session.execution().getDemux().add(tag, ByTransferDest(tag)); + session.messageSubscribe(arg::queue=q, arg::destination=tag); + setFlowControl(tag, messages, bytes, window); } -void SubscriptionManager::flowLimits( - const std::string& tag, uint32_t messages, uint32_t bytes) { +void SubscriptionManager::setFlowControl( + const std::string& tag, uint32_t messages, uint32_t bytes, bool window) +{ + session.messageFlowMode(tag, window); session.messageFlow(tag, 0, messages); session.messageFlow(tag, 1, bytes); } -void SubscriptionManager::flowLimits(uint32_t m, uint32_t b) { - messages=m; - bytes=b; +void SubscriptionManager::setFlowControl( + uint32_t messages_, uint32_t bytes_, bool window_) +{ + messages=messages_; + bytes=bytes_; + window=window_; } void SubscriptionManager::cancel(const std::string tag) { - if (subscriptions.erase(tag)) { - dispatcher.cancel(tag); - session.messageCancel(tag); - if (autoStop && subscriptions.empty()) stop(); - } + dispatcher.cancel(tag); + session.messageCancel(tag); } -void SubscriptionManager::run(bool autoStop_) +void SubscriptionManager::run(bool autoStop) { - autoStop=autoStop_; - if (autoStop && subscriptions.empty()) return; + dispatcher.setAutoStop(autoStop); dispatcher.run(); } diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 985b6ce222..1a03c1a47b 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -21,55 +21,59 @@ * under the License. * */ - +#include "qpid/sys/Mutex.h" #include <qpid/client/Dispatcher.h> #include <qpid/client/Session_0_10.h> #include <qpid/client/MessageListener.h> +#include <qpid/client/LocalQueue.h> #include <set> #include <sstream> namespace qpid { namespace client { -struct TagNotUniqueException : public qpid::Exception { - TagNotUniqueException() {} -}; - class SubscriptionManager { - std::set<std::string> subscriptions; + typedef sys::Mutex::ScopedLock Lock; + typedef sys::Mutex::ScopedUnlock Unlock; + qpid::client::Dispatcher dispatcher; qpid::client::Session_0_10& session; - std::string uniqueTag(const std::string&); uint32_t messages; uint32_t bytes; - bool autoStop; + bool window; public: SubscriptionManager(Session_0_10& session); /** - * Subscribe listener to receive messages from queue. + * Subscribe a MessagesListener to receive messages from queue. + * *@param listener Listener object to receive messages. *@param queue Name of the queue to subscribe to. *@param tag Unique destination tag for the listener. - * If not specified a unique tag will be generted based on the queue name. - *@return Destination tag. - *@exception TagNotUniqueException if there is already a subscription - * with the same tag. + * If not specified, the queue name is used. */ - std::string subscribe(MessageListener& listener, - const std::string& queue, - const std::string& tag=std::string()); + void subscribe(MessageListener& listener, + const std::string& queue, + const std::string& tag=std::string()); + + /** + * Subscribe a LocalQueue to receive messages from queue. + * + *@param queue Name of the queue to subscribe to. + *@param tag Unique destination tag for the listener. + * If not specified, the queue name is used. + */ + void subscribe(LocalQueue& localQueue, + const std::string& queue, + const std::string& tag=std::string()); /** Cancel a subscription. */ void cancel(const std::string tag); - - qpid::client::Dispatcher& getDispatcher() { return dispatcher; } - size_t size() { return subscriptions.size(); } /** Deliver messages until stop() is called. - *@param autoStop If true, return when all subscriptions are cancelled. + *@param autoStop If true, return when all listeners are cancelled. */ void run(bool autoStop=true); @@ -78,13 +82,20 @@ public: static const uint32_t UNLIMITED=0xFFFFFFFF; - /** Set the flow control limits for subscriber with tag. - * UNLIMITED means no limit. + /** Set the flow control for destination tag. + *@param tag: name of the destination. + *@param messages: message credit. + *@param bytes: byte credit. + *@param window: if true use window-based flow control. */ - void flowLimits(const std::string& tag, uint32_t messages, uint32_t bytes); + void setFlowControl(const std::string& tag, uint32_t messages, uint32_t bytes, bool window=true); - /** Set the initial flow control limits for new subscribers */ - void flowLimits(uint32_t messages, uint32_t bytes); + /** Set the initial flow control settings to be applied to each new subscribtion. + *@param messages: message credit. + *@param bytes: byte credit. + *@param window: if true use window-based flow control. + */ + void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true); }; diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index c53949ad6f..44c95b225d 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -32,7 +32,7 @@ namespace sys { template <class T> class BlockingQueue { - sys::Waitable lock; + mutable sys::Waitable lock; std::queue<T> queue; bool closed; @@ -95,6 +95,11 @@ public: closed=false; } + bool isClosed() const { + Waitable::ScopedLock l(lock); + return closed; + } + private: void queueNotify(size_t ignore) { |