diff options
author | Alan Conway <aconway@apache.org> | 2007-11-07 16:06:31 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-11-07 16:06:31 +0000 |
commit | d19657d82321b2b5e2cac386c49aa99f82b976fb (patch) | |
tree | eceb3790181d8b058e207a0847c35024856325b0 | |
parent | 5bec8a4a978290a8dd710870a04bd09e9f493c26 (diff) | |
download | qpid-python-d19657d82321b2b5e2cac386c49aa99f82b976fb.tar.gz |
Added LocalQueue subscriptions. LocalQueue::pop() provides a "pull"
alternative to the MessageListener::received() "push" API.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592803 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/Exception.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/Exception.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/Channel.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/Demux.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/client/Demux.h | 20 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 52 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/BlockingQueue.h | 6 | ||||
-rw-r--r-- | cpp/src/tests/InProcessBroker.h | 2 |
10 files changed, 90 insertions, 31 deletions
diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp index b25a6dc8f8..07f157cfc3 100644 --- a/cpp/src/qpid/Exception.cpp +++ b/cpp/src/qpid/Exception.cpp @@ -46,4 +46,6 @@ std::string Exception::str() const throw() { const char* Exception::what() const throw() { return str().c_str(); } +const std::string ClosedException::CLOSED_MESSAGE("Closed"); + } // namespace qpid diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h index 4ca15b37fc..57c7a21234 100644 --- a/cpp/src/qpid/Exception.h +++ b/cpp/src/qpid/Exception.h @@ -61,6 +61,11 @@ struct ConnectionException : public Exception { : Exception(message), code(code_) {} }; -} // namespace qpid +struct ClosedException : public Exception { + static const std::string CLOSED_MESSAGE; + ClosedException(const std::string& msg=CLOSED_MESSAGE) : Exception(msg) {} +}; +} // namespace qpid + #endif /*!_Exception_*/ diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index 9246ead44e..2d43d300dd 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -179,7 +179,7 @@ void Channel::cancel(const std::string& tag, bool synch) { bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { string tag = "get-handler"; ScopedDivert handler(tag, session.execution().getDemux()); - Demux::Queue& incoming = handler.getQueue(); + Demux::QueuePtr incoming = handler.getQueue(); session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); @@ -189,7 +189,7 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { session.messageCancel(tag); FrameSet::shared_ptr p; - if (incoming.tryPop(p)) { + if (incoming->tryPop(p)) { msg.populate(*p); if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true); return true; @@ -265,7 +265,7 @@ void Channel::run() { QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod()); } } - } catch (const sys::QueueClosed&) {} + } catch (const ClosedException&) {} } }} diff --git a/cpp/src/qpid/client/Demux.cpp b/cpp/src/qpid/client/Demux.cpp index d85ad92003..bd1dda0ae9 100644 --- a/cpp/src/qpid/client/Demux.cpp +++ b/cpp/src/qpid/client/Demux.cpp @@ -37,7 +37,7 @@ bool ByTransferDest::operator()(const framing::FrameSet& frameset) const ScopedDivert::ScopedDivert(const std::string& _dest, Demux& _demuxer) : dest(_dest), demuxer(_demuxer) { - queue = &(demuxer.add(dest, ByTransferDest(dest))); + queue = demuxer.add(dest, ByTransferDest(dest)); } ScopedDivert::~ScopedDivert() @@ -45,9 +45,9 @@ ScopedDivert::~ScopedDivert() demuxer.remove(dest); } -Demux::Queue& ScopedDivert::getQueue() +Demux::QueuePtr ScopedDivert::getQueue() { - return *queue; + return queue; } void Demux::handle(framing::FrameSet::shared_ptr frameset) @@ -61,7 +61,7 @@ void Demux::handle(framing::FrameSet::shared_ptr frameset) } } if (!matched) { - defaultQueue.push(frameset); + defaultQueue->push(frameset); } } @@ -71,17 +71,17 @@ void Demux::close() for (iterator i = records.begin(); i != records.end(); i++) { i->queue->close(); } - defaultQueue.close(); + defaultQueue->close(); } -Demux::Queue& Demux::add(const std::string& name, Condition condition) +Demux::QueuePtr Demux::add(const std::string& name, Condition condition) { sys::Mutex::ScopedLock l(lock); iterator i = std::find_if(records.begin(), records.end(), Find(name)); if (i == records.end()) { Record r(name, condition); records.push_back(r); - return *(r.queue); + return r.queue; } else { throw Exception("Queue already exists for " + name); } @@ -93,17 +93,17 @@ void Demux::remove(const std::string& name) records.remove_if(Find(name)); } -Demux::Queue& Demux::get(const std::string& name) +Demux::QueuePtr Demux::get(const std::string& name) { sys::Mutex::ScopedLock l(lock); iterator i = std::find_if(records.begin(), records.end(), Find(name)); if (i == records.end()) { throw Exception("No queue for " + name); } - return *(i->queue); + return i->queue; } -Demux::Queue& Demux::getDefault() +Demux::QueuePtr Demux::getDefault() { return defaultQueue; } diff --git a/cpp/src/qpid/client/Demux.h b/cpp/src/qpid/client/Demux.h index 3c9d4a4857..5aaf75db44 100644 --- a/cpp/src/qpid/client/Demux.h +++ b/cpp/src/qpid/client/Demux.h @@ -45,16 +45,19 @@ class Demux public: typedef boost::function<bool(const framing::FrameSet&)> Condition; typedef sys::BlockingQueue<framing::FrameSet::shared_ptr> Queue; + typedef boost::shared_ptr<Queue> QueuePtr; + Demux() : defaultQueue(new Queue()) {} + void handle(framing::FrameSet::shared_ptr); void close(); - Queue& add(const std::string& name, Condition); + QueuePtr add(const std::string& name, Condition); void remove(const std::string& name); - Queue& get(const std::string& name); - Queue& getDefault(); + QueuePtr get(const std::string& name); + QueuePtr getDefault(); + private: - typedef boost::shared_ptr<Queue> QueuePtr; struct Record { const std::string name; @@ -66,7 +69,7 @@ private: sys::Mutex lock; std::list<Record> records; - Queue defaultQueue; + QueuePtr defaultQueue; typedef std::list<Record>::iterator iterator; @@ -82,15 +85,14 @@ class ScopedDivert { const std::string dest; Demux& demuxer; - Demux::Queue* queue; + Demux::QueuePtr queue; public: ScopedDivert(const std::string& dest, Demux& demuxer); ~ScopedDivert(); - Demux::Queue& getQueue(); + Demux::QueuePtr getQueue(); }; -} -} +}} // namespace qpid::client #endif diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 37f3941022..65756f6404 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -62,14 +62,14 @@ void Dispatcher::start() void Dispatcher::run() { - sys::BlockingQueue<FrameSet::shared_ptr>& q = queue.empty() ? + Demux::QueuePtr q = queue.empty() ? session.execution().getDemux().getDefault() : session.execution().getDemux().get(queue); startRunning(); stopped = false; while (!isStopped()) { - FrameSet::shared_ptr content = q.pop(); + FrameSet::shared_ptr content = q->pop(); if (content->isA<MessageTransferBody>()) { Message msg(*content); Subscriber::shared_ptr listener = find(msg.getDestination()); diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h new file mode 100644 index 0000000000..048b4c7b4d --- /dev/null +++ b/cpp/src/qpid/client/LocalQueue.h @@ -0,0 +1,52 @@ +#ifndef QPID_CLIENT_LOCALQUEUE_H +#define QPID_CLIENT_LOCALQUEUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/Message.h" +#include "qpid/Exception.h" +#include "qpid/sys/BlockingQueue.h" + +namespace qpid { +namespace client { + +/** + * Local representation of a remote queue. + */ +class LocalQueue +{ + public: + LocalQueue(BlockingQueue& q) : queue(q) {} + ~LocalQueue(); + + /** Pop the next message off the queue. + *@exception ClosedException if subscription has been closed. + */ + Message pop() { reurn queue->pop(); } + + private: + BlockingQueue& queue; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_LOCALQUEUE_H*/ diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index f7f0f52dba..02ea57fecc 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -162,7 +162,7 @@ bool SessionCore::isSync() { // user thread FrameSet::shared_ptr SessionCore::get() { // user thread // No lock here: pop does a blocking wait. - return l3.getDemux().getDefault().pop(); + return l3.getDemux().getDefault()->pop(); } void SessionCore::open(uint32_t detachedLifetime) { // user thread diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index 65196dbd9c..c53949ad6f 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -29,8 +29,6 @@ namespace qpid { namespace sys { -struct QueueClosed {}; - template <class T> class BlockingQueue { @@ -46,7 +44,7 @@ public: T pop() { Waitable::ScopedLock l(lock); - if (!queueWait()) throw QueueClosed(); + if (!queueWait()) throw ClosedException(); return popInternal(); } @@ -78,7 +76,7 @@ public: } /** - * Close the queue. Throws QueueClosed in threads waiting in pop(). + * Close the queue. Throws ClosedException in threads waiting in pop(). * Blocks till all waiting threads have been notified. */ void close() diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h index 3f6ff0936e..c893e6906a 100644 --- a/cpp/src/tests/InProcessBroker.h +++ b/cpp/src/tests/InProcessBroker.h @@ -83,7 +83,7 @@ class InProcessConnector : QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f)); } } - catch (const sys::QueueClosed&) { + catch (const ClosedException&) { return; } } |