summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-11-07 16:06:31 +0000
committerAlan Conway <aconway@apache.org>2007-11-07 16:06:31 +0000
commitd19657d82321b2b5e2cac386c49aa99f82b976fb (patch)
treeeceb3790181d8b058e207a0847c35024856325b0
parent5bec8a4a978290a8dd710870a04bd09e9f493c26 (diff)
downloadqpid-python-d19657d82321b2b5e2cac386c49aa99f82b976fb.tar.gz
Added LocalQueue subscriptions. LocalQueue::pop() provides a "pull"
alternative to the MessageListener::received() "push" API. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592803 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/Exception.cpp2
-rw-r--r--cpp/src/qpid/Exception.h7
-rw-r--r--cpp/src/qpid/client/Channel.cpp6
-rw-r--r--cpp/src/qpid/client/Demux.cpp20
-rw-r--r--cpp/src/qpid/client/Demux.h20
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp4
-rw-r--r--cpp/src/qpid/client/LocalQueue.h52
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp2
-rw-r--r--cpp/src/qpid/sys/BlockingQueue.h6
-rw-r--r--cpp/src/tests/InProcessBroker.h2
10 files changed, 90 insertions, 31 deletions
diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp
index b25a6dc8f8..07f157cfc3 100644
--- a/cpp/src/qpid/Exception.cpp
+++ b/cpp/src/qpid/Exception.cpp
@@ -46,4 +46,6 @@ std::string Exception::str() const throw() {
const char* Exception::what() const throw() { return str().c_str(); }
+const std::string ClosedException::CLOSED_MESSAGE("Closed");
+
} // namespace qpid
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h
index 4ca15b37fc..57c7a21234 100644
--- a/cpp/src/qpid/Exception.h
+++ b/cpp/src/qpid/Exception.h
@@ -61,6 +61,11 @@ struct ConnectionException : public Exception {
: Exception(message), code(code_) {}
};
-} // namespace qpid
+struct ClosedException : public Exception {
+ static const std::string CLOSED_MESSAGE;
+ ClosedException(const std::string& msg=CLOSED_MESSAGE) : Exception(msg) {}
+};
+} // namespace qpid
+
#endif /*!_Exception_*/
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index 9246ead44e..2d43d300dd 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -179,7 +179,7 @@ void Channel::cancel(const std::string& tag, bool synch) {
bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
string tag = "get-handler";
ScopedDivert handler(tag, session.execution().getDemux());
- Demux::Queue& incoming = handler.getQueue();
+ Demux::QueuePtr incoming = handler.getQueue();
session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1));
session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
@@ -189,7 +189,7 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
session.messageCancel(tag);
FrameSet::shared_ptr p;
- if (incoming.tryPop(p)) {
+ if (incoming->tryPop(p)) {
msg.populate(*p);
if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true);
return true;
@@ -265,7 +265,7 @@ void Channel::run() {
QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod());
}
}
- } catch (const sys::QueueClosed&) {}
+ } catch (const ClosedException&) {}
}
}}
diff --git a/cpp/src/qpid/client/Demux.cpp b/cpp/src/qpid/client/Demux.cpp
index d85ad92003..bd1dda0ae9 100644
--- a/cpp/src/qpid/client/Demux.cpp
+++ b/cpp/src/qpid/client/Demux.cpp
@@ -37,7 +37,7 @@ bool ByTransferDest::operator()(const framing::FrameSet& frameset) const
ScopedDivert::ScopedDivert(const std::string& _dest, Demux& _demuxer) : dest(_dest), demuxer(_demuxer)
{
- queue = &(demuxer.add(dest, ByTransferDest(dest)));
+ queue = demuxer.add(dest, ByTransferDest(dest));
}
ScopedDivert::~ScopedDivert()
@@ -45,9 +45,9 @@ ScopedDivert::~ScopedDivert()
demuxer.remove(dest);
}
-Demux::Queue& ScopedDivert::getQueue()
+Demux::QueuePtr ScopedDivert::getQueue()
{
- return *queue;
+ return queue;
}
void Demux::handle(framing::FrameSet::shared_ptr frameset)
@@ -61,7 +61,7 @@ void Demux::handle(framing::FrameSet::shared_ptr frameset)
}
}
if (!matched) {
- defaultQueue.push(frameset);
+ defaultQueue->push(frameset);
}
}
@@ -71,17 +71,17 @@ void Demux::close()
for (iterator i = records.begin(); i != records.end(); i++) {
i->queue->close();
}
- defaultQueue.close();
+ defaultQueue->close();
}
-Demux::Queue& Demux::add(const std::string& name, Condition condition)
+Demux::QueuePtr Demux::add(const std::string& name, Condition condition)
{
sys::Mutex::ScopedLock l(lock);
iterator i = std::find_if(records.begin(), records.end(), Find(name));
if (i == records.end()) {
Record r(name, condition);
records.push_back(r);
- return *(r.queue);
+ return r.queue;
} else {
throw Exception("Queue already exists for " + name);
}
@@ -93,17 +93,17 @@ void Demux::remove(const std::string& name)
records.remove_if(Find(name));
}
-Demux::Queue& Demux::get(const std::string& name)
+Demux::QueuePtr Demux::get(const std::string& name)
{
sys::Mutex::ScopedLock l(lock);
iterator i = std::find_if(records.begin(), records.end(), Find(name));
if (i == records.end()) {
throw Exception("No queue for " + name);
}
- return *(i->queue);
+ return i->queue;
}
-Demux::Queue& Demux::getDefault()
+Demux::QueuePtr Demux::getDefault()
{
return defaultQueue;
}
diff --git a/cpp/src/qpid/client/Demux.h b/cpp/src/qpid/client/Demux.h
index 3c9d4a4857..5aaf75db44 100644
--- a/cpp/src/qpid/client/Demux.h
+++ b/cpp/src/qpid/client/Demux.h
@@ -45,16 +45,19 @@ class Demux
public:
typedef boost::function<bool(const framing::FrameSet&)> Condition;
typedef sys::BlockingQueue<framing::FrameSet::shared_ptr> Queue;
+ typedef boost::shared_ptr<Queue> QueuePtr;
+ Demux() : defaultQueue(new Queue()) {}
+
void handle(framing::FrameSet::shared_ptr);
void close();
- Queue& add(const std::string& name, Condition);
+ QueuePtr add(const std::string& name, Condition);
void remove(const std::string& name);
- Queue& get(const std::string& name);
- Queue& getDefault();
+ QueuePtr get(const std::string& name);
+ QueuePtr getDefault();
+
private:
- typedef boost::shared_ptr<Queue> QueuePtr;
struct Record
{
const std::string name;
@@ -66,7 +69,7 @@ private:
sys::Mutex lock;
std::list<Record> records;
- Queue defaultQueue;
+ QueuePtr defaultQueue;
typedef std::list<Record>::iterator iterator;
@@ -82,15 +85,14 @@ class ScopedDivert
{
const std::string dest;
Demux& demuxer;
- Demux::Queue* queue;
+ Demux::QueuePtr queue;
public:
ScopedDivert(const std::string& dest, Demux& demuxer);
~ScopedDivert();
- Demux::Queue& getQueue();
+ Demux::QueuePtr getQueue();
};
-}
-}
+}} // namespace qpid::client
#endif
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index 37f3941022..65756f6404 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -62,14 +62,14 @@ void Dispatcher::start()
void Dispatcher::run()
{
- sys::BlockingQueue<FrameSet::shared_ptr>& q = queue.empty() ?
+ Demux::QueuePtr q = queue.empty() ?
session.execution().getDemux().getDefault() :
session.execution().getDemux().get(queue);
startRunning();
stopped = false;
while (!isStopped()) {
- FrameSet::shared_ptr content = q.pop();
+ FrameSet::shared_ptr content = q->pop();
if (content->isA<MessageTransferBody>()) {
Message msg(*content);
Subscriber::shared_ptr listener = find(msg.getDestination());
diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h
new file mode 100644
index 0000000000..048b4c7b4d
--- /dev/null
+++ b/cpp/src/qpid/client/LocalQueue.h
@@ -0,0 +1,52 @@
+#ifndef QPID_CLIENT_LOCALQUEUE_H
+#define QPID_CLIENT_LOCALQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Message.h"
+#include "qpid/Exception.h"
+#include "qpid/sys/BlockingQueue.h"
+
+namespace qpid {
+namespace client {
+
+/**
+ * Local representation of a remote queue.
+ */
+class LocalQueue
+{
+ public:
+ LocalQueue(BlockingQueue& q) : queue(q) {}
+ ~LocalQueue();
+
+ /** Pop the next message off the queue.
+ *@exception ClosedException if subscription has been closed.
+ */
+ Message pop() { reurn queue->pop(); }
+
+ private:
+ BlockingQueue& queue;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_LOCALQUEUE_H*/
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index f7f0f52dba..02ea57fecc 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -162,7 +162,7 @@ bool SessionCore::isSync() { // user thread
FrameSet::shared_ptr SessionCore::get() { // user thread
// No lock here: pop does a blocking wait.
- return l3.getDemux().getDefault().pop();
+ return l3.getDemux().getDefault()->pop();
}
void SessionCore::open(uint32_t detachedLifetime) { // user thread
diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h
index 65196dbd9c..c53949ad6f 100644
--- a/cpp/src/qpid/sys/BlockingQueue.h
+++ b/cpp/src/qpid/sys/BlockingQueue.h
@@ -29,8 +29,6 @@
namespace qpid {
namespace sys {
-struct QueueClosed {};
-
template <class T>
class BlockingQueue
{
@@ -46,7 +44,7 @@ public:
T pop()
{
Waitable::ScopedLock l(lock);
- if (!queueWait()) throw QueueClosed();
+ if (!queueWait()) throw ClosedException();
return popInternal();
}
@@ -78,7 +76,7 @@ public:
}
/**
- * Close the queue. Throws QueueClosed in threads waiting in pop().
+ * Close the queue. Throws ClosedException in threads waiting in pop().
* Blocks till all waiting threads have been notified.
*/
void close()
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h
index 3f6ff0936e..c893e6906a 100644
--- a/cpp/src/tests/InProcessBroker.h
+++ b/cpp/src/tests/InProcessBroker.h
@@ -83,7 +83,7 @@ class InProcessConnector :
QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
}
}
- catch (const sys::QueueClosed&) {
+ catch (const ClosedException&) {
return;
}
}