diff options
author | Alan Conway <aconway@apache.org> | 2009-05-01 17:05:00 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-05-01 17:05:00 +0000 |
commit | 38dc8e231d6136dd6ae0cfa28f4f9dcb90677c77 (patch) | |
tree | 9321363049f8675518c83a2b53947143526dc5be /cpp/src | |
parent | 4ed7e726a3ee1f3a86acbd1e2c89598b60b8c70a (diff) | |
download | qpid-python-38dc8e231d6136dd6ae0cfa28f4f9dcb90677c77.tar.gz |
Apply PIMPL pattern to qpid::client::LocalQueue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@770756 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/Completion.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.cpp | 51 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 22 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueueImpl.cpp | 78 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueueImpl.h | 108 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 9 |
7 files changed, 218 insertions, 53 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index c1b68a8f50..89a7c49067 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -451,6 +451,8 @@ libqpidclient_la_SOURCES = \ qpid/client/FutureResult.cpp \ qpid/client/LoadPlugins.cpp \ qpid/client/LocalQueue.cpp \ + qpid/client/LocalQueueImpl.cpp \ + qpid/client/LocalQueueImpl.h \ qpid/client/Message.cpp \ qpid/client/MessageImpl.cpp \ qpid/client/MessageImpl.h \ diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h index d28cfac52a..1e383bb0b5 100644 --- a/cpp/src/qpid/client/Completion.h +++ b/cpp/src/qpid/client/Completion.h @@ -62,7 +62,6 @@ public: private: typedef CompletionImpl Impl; - Impl* impl; friend class PrivateImplRef<Completion>; }; diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp index dfd8405a7f..eac4b458af 100644 --- a/cpp/src/qpid/client/LocalQueue.cpp +++ b/cpp/src/qpid/client/LocalQueue.cpp @@ -19,6 +19,7 @@ * */ #include "LocalQueue.h" +#include "LocalQueueImpl.h" #include "MessageImpl.h" #include "qpid/Exception.h" #include "qpid/framing/FrameSet.h" @@ -26,56 +27,26 @@ #include "qpid/framing/reply_exceptions.h" #include "PrivateImplRef.h" #include "SubscriptionImpl.h" -#include "CompletionImpl.h" namespace qpid { namespace client { using namespace framing; -LocalQueue::LocalQueue() {} -LocalQueue::~LocalQueue() {} +typedef PrivateImplRef<LocalQueue> PI; -Message LocalQueue::pop(sys::Duration timeout) { return get(timeout); } +LocalQueue::LocalQueue() { PI::ctor(*this, new LocalQueueImpl()); } +LocalQueue::LocalQueue(const LocalQueue& x) : Handle<LocalQueueImpl>() { PI::copy(*this, x); } +LocalQueue::~LocalQueue() { PI::dtor(*this); } +LocalQueue& LocalQueue::operator=(const LocalQueue& x) { return PI::assign(*this, x); } -Message LocalQueue::get(sys::Duration timeout) { - Message result; - bool ok = get(result, timeout); - if (!ok) throw Exception("Timed out waiting for a message"); - return result; -} +Message LocalQueue::pop(sys::Duration timeout) { return impl->pop(timeout); } -bool LocalQueue::get(Message& result, sys::Duration timeout) { - if (!queue) - throw ClosedException(); - FrameSet::shared_ptr content; - bool ok = queue->pop(content, timeout); - if (!ok) return false; - if (content->isA<MessageTransferBody>()) { +Message LocalQueue::get(sys::Duration timeout) { return impl->get(timeout); } - *MessageImpl::get(result) = MessageImpl(*content); - boost::intrusive_ptr<SubscriptionImpl> si = PrivateImplRef<Subscription>::get(subscription); - assert(si); - if (si) si->received(result); - return true; - } - else - throw CommandInvalidException( - QPID_MSG("Unexpected method: " << content->getMethod())); -} +bool LocalQueue::get(Message& result, sys::Duration timeout) { return impl->get(result, timeout); } -bool LocalQueue::empty() const -{ - if (!queue) - throw ClosedException(); - return queue->empty(); -} - -size_t LocalQueue::size() const -{ - if (!queue) - throw ClosedException(); - return queue->size(); -} +bool LocalQueue::empty() const { return impl->empty(); } +size_t LocalQueue::size() const { return impl->size(); } }} // namespace qpid::client diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index 5b739d4303..4aa14ce004 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -23,14 +23,16 @@ */ #include "ClientImportExport.h" -#include "qpid/client/Message.h" -#include "qpid/client/Subscription.h" -#include "qpid/client/Demux.h" +#include "Handle.h" +#include "Message.h" #include "qpid/sys/Time.h" namespace qpid { namespace client { +class LocalQueueImpl; +template <class T> class PrivateImplRef; + /** * A local queue to collect messages retrieved from a remote broker * queue. Create a queue and subscribe it using the SubscriptionManager. @@ -69,7 +71,7 @@ namespace client { * </ul> */ -class LocalQueue { +class LocalQueue : public Handle<LocalQueueImpl> { public: /** Create a local queue. Subscribe the local queue to a remote broker * queue with a SubscriptionManager. @@ -77,8 +79,9 @@ class LocalQueue { * LocalQueue is an alternative to implementing a MessageListener. */ QPID_CLIENT_EXTERN LocalQueue(); - + QPID_CLIENT_EXTERN LocalQueue(const LocalQueue&); QPID_CLIENT_EXTERN ~LocalQueue(); + QPID_CLIENT_EXTERN LocalQueue& operator=(const LocalQueue&); /** Wait up to timeout for the next message from the local queue. *@param result Set to the message from the queue. @@ -104,11 +107,12 @@ class LocalQueue { /** Number of messages on the local queue */ QPID_CLIENT_EXTERN size_t size() const; - private: - Demux::QueuePtr queue; - Subscription subscription; + QPID_CLIENT_EXTERN LocalQueue(LocalQueueImpl*); ///<@internal - friend class SubscriptionManager; + + private: + typedef LocalQueueImpl Impl; + friend class PrivateImplRef<LocalQueue>; }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/LocalQueueImpl.cpp b/cpp/src/qpid/client/LocalQueueImpl.cpp new file mode 100644 index 0000000000..21ae768848 --- /dev/null +++ b/cpp/src/qpid/client/LocalQueueImpl.cpp @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "LocalQueueImpl.h" +#include "MessageImpl.h" +#include "qpid/Exception.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" +#include "PrivateImplRef.h" +#include "SubscriptionImpl.h" +#include "CompletionImpl.h" + +namespace qpid { +namespace client { + +using namespace framing; + +Message LocalQueueImpl::pop(sys::Duration timeout) { return get(timeout); } + +Message LocalQueueImpl::get(sys::Duration timeout) { + Message result; + bool ok = get(result, timeout); + if (!ok) throw Exception("Timed out waiting for a message"); + return result; +} + +bool LocalQueueImpl::get(Message& result, sys::Duration timeout) { + if (!queue) + throw ClosedException(); + FrameSet::shared_ptr content; + bool ok = queue->pop(content, timeout); + if (!ok) return false; + if (content->isA<MessageTransferBody>()) { + + *MessageImpl::get(result) = MessageImpl(*content); + boost::intrusive_ptr<SubscriptionImpl> si = PrivateImplRef<Subscription>::get(subscription); + assert(si); + if (si) si->received(result); + return true; + } + else + throw CommandInvalidException( + QPID_MSG("Unexpected method: " << content->getMethod())); +} + +bool LocalQueueImpl::empty() const +{ + if (!queue) + throw ClosedException(); + return queue->empty(); +} + +size_t LocalQueueImpl::size() const +{ + if (!queue) + throw ClosedException(); + return queue->size(); +} + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/LocalQueueImpl.h b/cpp/src/qpid/client/LocalQueueImpl.h new file mode 100644 index 0000000000..64da8675d8 --- /dev/null +++ b/cpp/src/qpid/client/LocalQueueImpl.h @@ -0,0 +1,108 @@ +#ifndef QPID_CLIENT_LOCALQUEUEIMPL_H +#define QPID_CLIENT_LOCALQUEUEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ClientImportExport.h" +#include "Handle.h" +#include "qpid/client/Message.h" +#include "qpid/client/Subscription.h" +#include "qpid/client/Demux.h" +#include "qpid/sys/Time.h" +#include "qpid/RefCounted.h" + +namespace qpid { +namespace client { + +/** + * A local queue to collect messages retrieved from a remote broker + * queue. Create a queue and subscribe it using the SubscriptionManager. + * Messages from the remote queue on the broker will be stored in the + * local queue until you retrieve them. + * + * \ingroup clientapi + * + * \details Using a Local Queue + * + * <pre> + * LocalQueue local_queue; + * subscriptions.subscribe(local_queue, string("message_queue")); + * for (int i=0; i<10; i++) { + * Message message = local_queue.get(); + * std::cout << message.getData() << std::endl; + * } + * </pre> + * + * <h2>Getting Messages</h2> + * + * <ul><li> + * <p>get()</p> + * <pre>Message message = local_queue.get();</pre> + * <pre>// Specifying timeouts (TIME_SEC, TIME_MSEC, TIME_USEC, TIME_NSEC) + *#include <qpid/sys/Time.h> + *Message message; + *local_queue.get(message, 5*sys::TIME_SEC);</pre></li></ul> + * + * <h2>Checking size</h2> + * <ul><li> + * <p>empty()</p> + * <pre>if (local_queue.empty()) { ... }</pre></li> + * <li><p>size()</p> + * <pre>std::cout << local_queue.size();</pre></li> + * </ul> + */ + +class LocalQueueImpl : public RefCounted { + public: + /** Wait up to timeout for the next message from the local queue. + *@param result Set to the message from the queue. + *@param timeout wait up this timeout for a message to appear. + *@return true if result was set, false if queue was empty after timeout. + */ + bool get(Message& result, sys::Duration timeout=0); + + /** Get the next message off the local queue, or wait up to the timeout + * for message from the broker queue. + *@param timeout wait up this timeout for a message to appear. + *@return message from the queue. + *@throw ClosedException if subscription is closed or timeout exceeded. + */ + Message get(sys::Duration timeout=sys::TIME_INFINITE); + + /** Synonym for get() */ + Message pop(sys::Duration timeout=sys::TIME_INFINITE); + + /** Return true if local queue is empty. */ + bool empty() const; + + /** Number of messages on the local queue */ + size_t size() const; + + private: + Demux::QueuePtr queue; + Subscription subscription; + friend class SubscriptionManager; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_LOCALQUEUEIMPL_H*/ diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index b016109ead..999b9c6ba7 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -23,6 +23,8 @@ #include "SubscriptionManager.h" #include "SubscriptionImpl.h" +#include "LocalQueueImpl.h" +#include "PrivateImplRef.h" #include <qpid/client/Dispatcher.h> #include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> @@ -56,10 +58,11 @@ Subscription SubscriptionManager::subscribe( sys::Mutex::ScopedLock l(lock); std::string name=n.empty() ? q:n; boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0); - lq.queue=si->divert(); + boost::intrusive_ptr<LocalQueueImpl> lqi = PrivateImplRef<LocalQueue>::get(lq); + lqi->queue=si->divert(); si->subscribe(); - lq.subscription = Subscription(si.get()); - return subscriptions[name] = lq.subscription; + lqi->subscription = Subscription(si.get()); + return subscriptions[name] = lqi->subscription; } Subscription SubscriptionManager::subscribe( |