summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-12-07 19:13:09 +0000
committerAlan Conway <aconway@apache.org>2007-12-07 19:13:09 +0000
commit7bc8f20e59e8f18926119a4bc5fdb5be262c500c (patch)
tree5112c5428872273dd26092cb5c8bdc3af3beb00e /cpp/src/qpid
parent237c3437a5a4b68c483af77c5d1346104ca404a0 (diff)
downloadqpid-python-7bc8f20e59e8f18926119a4bc5fdb5be262c500c.tar.gz
Summary:
- Replaced InProcessBroker with BrokerFixture, uses a full loopback broker for more realistic tests. - Extracted non-generated parts of Session_0_10 into SessionBase. - Sundry small fixes. src/tests/BrokerFixture.h - in process broker with loopback connections. - tests can force a disorderly disconnect. src/qpid/client/Connector.h - back door to private members for BrokerFixture. - close() in destructor to avoid leaks. src/qpid/client/ConnectionImpl.h,cpp: - close() in destructor, to fix hang when destroyed without being closed. src/qpid/client/CompletionTracker.h,.cpp: - Fixed race in close/add. src/qpid/client/SessionBase.h,cpp: - Extracted all non-generated code from Session_0_10 into SessionBase - Added sync() src/tests/exception_test.cpp: Converted to boost & BrokerFixture src/tests/ClientChannelTest.cpp, ClientSessionTest.cpp: Use BrokerFixture git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@602182 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/client/Channel.cpp4
-rw-r--r--cpp/src/qpid/client/CompletionTracker.cpp9
-rw-r--r--cpp/src/qpid/client/CompletionTracker.h3
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp2
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h2
-rw-r--r--cpp/src/qpid/client/Connector.cpp4
-rw-r--r--cpp/src/qpid/client/Connector.h2
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp4
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp2
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h1
-rw-r--r--cpp/src/qpid/client/Message.h4
-rw-r--r--cpp/src/qpid/client/SessionBase.cpp37
-rw-r--r--cpp/src/qpid/client/SessionBase.h101
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp6
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp19
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h30
16 files changed, 196 insertions, 34 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index 3551946d1e..a6875fcb63 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -174,7 +174,7 @@ void Channel::cancel(const std::string& tag, bool synch) {
bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
string tag = "get-handler";
- ScopedDivert handler(tag, session.execution().getDemux());
+ ScopedDivert handler(tag, session.getExecution().getDemux());
Demux::QueuePtr incoming = handler.getQueue();
session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1));
@@ -243,7 +243,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination)
bool send = i->second.ackMode == AUTO_ACK
|| (prefetch && ++(i->second.count) > (prefetch / 2));
if (send) i->second.count = 0;
- session.execution().completed(content.getId(), true, send);
+ session.getExecution().completed(content.getId(), true, send);
}
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp
index 46a7384ac2..4c6e59f1b8 100644
--- a/cpp/src/qpid/client/CompletionTracker.cpp
+++ b/cpp/src/qpid/client/CompletionTracker.cpp
@@ -31,12 +31,13 @@ namespace
const std::string empty;
}
-CompletionTracker::CompletionTracker() {}
+CompletionTracker::CompletionTracker() : closed(false) {}
CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {}
void CompletionTracker::close()
{
sys::Mutex::ScopedLock l(lock);
+ closed=true;
while (!listeners.empty()) {
Record r(listeners.front());
{
@@ -47,17 +48,18 @@ void CompletionTracker::close()
}
}
+
void CompletionTracker::completed(const SequenceNumber& _mark)
{
sys::Mutex::ScopedLock l(lock);
mark = _mark;
while (!listeners.empty() && !(listeners.front().id > mark)) {
Record r(listeners.front());
+ listeners.pop_front();
{
sys::Mutex::ScopedUnlock u(lock);
r.completed();
}
- listeners.pop_front();
}
}
@@ -88,14 +90,13 @@ void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListe
bool CompletionTracker::add(const Record& record)
{
sys::Mutex::ScopedLock l(lock);
- if (record.id < mark) {
+ if (record.id < mark || closed) {
return false;
} else {
//insert at the correct position
Listeners::iterator i = seek(record.id);
if (i == listeners.end()) i = listeners.begin();
listeners.insert(i, record);
-
return true;
}
}
diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h
index b2697f399f..55f7ff7531 100644
--- a/cpp/src/qpid/client/CompletionTracker.h
+++ b/cpp/src/qpid/client/CompletionTracker.h
@@ -60,7 +60,8 @@ private:
};
typedef std::list<Record> Listeners;
-
+ bool closed;
+
sys::Mutex lock;
framing::SequenceNumber mark;
Listeners listeners;
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 1ae8a25d6b..cf00b2b296 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -44,6 +44,8 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
connector->setShutdownHandler(this);
}
+ConnectionImpl::~ConnectionImpl() { close(); }
+
void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
{
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index 46bd5b685d..6cc20a15f6 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -62,6 +62,8 @@ class ConnectionImpl : public framing::FrameHandler,
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
ConnectionImpl(boost::shared_ptr<Connector> c);
+ ~ConnectionImpl();
+
void addSession(const boost::shared_ptr<SessionCore>&);
void open(const std::string& host, int port = 5672,
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index f32d0470ec..23d2c3ff8d 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -51,7 +51,8 @@ Connector::Connector(
{
}
-Connector::~Connector(){
+Connector::~Connector() {
+ close();
if (receiver.id() && receiver.id() != Thread::current().id())
receiver.join();
}
@@ -76,7 +77,6 @@ void Connector::init(){
receiver = Thread(this);
}
-// Call with closedLock held
bool Connector::closeInternal() {
Mutex::ScopedLock l(closedLock);
if (!closed) {
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index af6badd6e0..946289efdc 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -88,6 +88,8 @@ class Connector : public framing::OutputHandler,
void eof(qpid::sys::AsynchIO&);
friend class Channel;
+ friend class TestConnector;
+
public:
Connector(framing::ProtocolVersion pVersion,
bool debug = false, uint32_t buffer_size = 1024);
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index 7ffcd676a3..6b6a76b222 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -52,8 +52,8 @@ Dispatcher::Dispatcher(Session_0_10& s, const std::string& q)
: session(s), running(false)
{
queue = q.empty() ?
- session.execution().getDemux().getDefault() :
- session.execution().getDemux().get(q);
+ session.getExecution().getDemux().getDefault() :
+ session.getExecution().getDemux().get(q);
}
void Dispatcher::start()
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 7b8fb8d01f..f30a35aad0 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -167,6 +167,8 @@ void ExecutionHandler::sendCompletion()
out(frame);
}
+SequenceNumber ExecutionHandler::lastSent() const { return outgoingCounter; }
+
SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener)
{
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index 0999540909..6286ccd591 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -80,6 +80,7 @@ public:
framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content,
ResultListener=ResultListener());
+ framing::SequenceNumber lastSent() const;
void sendSyncRequest();
void sendFlushRequest();
void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h
index 081384974a..86404ac792 100644
--- a/cpp/src/qpid/client/Message.h
+++ b/cpp/src/qpid/client/Message.h
@@ -65,12 +65,12 @@ public:
void acknowledge(Session_0_10& session, bool cumulative = true, bool send = true) const
{
- session.execution().completed(id, cumulative, send);
+ session.getExecution().completed(id, cumulative, send);
}
void acknowledge(bool cumulative = true, bool send = true) const
{
- const_cast<Session_0_10&>(session).execution().completed(id, cumulative, send);
+ const_cast<Session_0_10&>(session).getExecution().completed(id, cumulative, send);
}
/**@internal for incoming messages */
diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp
new file mode 100644
index 0000000000..06266ded91
--- /dev/null
+++ b/cpp/src/qpid/client/SessionBase.cpp
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionBase.h"
+
+namespace qpid {
+namespace client {
+using namespace framing;
+
+SessionBase::SessionBase() {}
+SessionBase::~SessionBase() {}
+SessionBase::SessionBase(shared_ptr<SessionCore> core) : impl(core) {}
+void SessionBase::suspend() { impl->suspend(); }
+void SessionBase::close() { impl->close(); }
+void SessionBase::setSynchronous(bool isSync) { impl->setSync(isSync); }
+bool SessionBase::isSynchronous() const { return impl->isSync(); }
+Execution& SessionBase::getExecution() { return impl->getExecution(); }
+Uuid SessionBase::getId() const { return impl->getId(); }
+framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h
new file mode 100644
index 0000000000..890dbd269b
--- /dev/null
+++ b/cpp/src/qpid/client/SessionBase.h
@@ -0,0 +1,101 @@
+#ifndef QPID_CLIENT_SESSIONBASE_H
+#define QPID_CLIENT_SESSIONBASE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/amqp_structs.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Completion.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Response.h"
+#include "qpid/client/SessionCore.h"
+#include "qpid/client/TypedResult.h"
+#include "qpid/shared_ptr.h"
+#include <string>
+
+namespace qpid {
+namespace client {
+
+using std::string;
+using framing::Content;
+using framing::FieldTable;
+using framing::MethodContent;
+using framing::SequenceNumberSet;
+using framing::Uuid;
+
+/**
+ * Basic session operations that are not derived from AMQP XML methods.
+ */
+class SessionBase
+{
+ public:
+ SessionBase();
+ ~SessionBase();
+
+ /** Get the next message frame-set from the session. */
+ framing::FrameSet::shared_ptr get();
+
+ /** Get the session ID */
+ Uuid getId() const;
+
+ /**
+ * In synchronous mode, the session sets the sync bit on every
+ * command and waits for the broker's response before returning.
+ * Note this gives lower throughput than non-synchronous mode.
+ *
+ * In non-synchronous mode commands are sent without waiting
+ * for a respose (you can use the returned Completion object
+ * to wait for completion.)
+ *
+ *@param if true set the session to synchronous mode, else
+ * set it to non-synchronous mode.
+ */
+ void setSynchronous(bool isSync);
+
+ bool isSynchronous() const;
+
+ /**
+ * Suspend the session, can be resumed on a different connection.
+ * @see Connection::resume()
+ */
+ void suspend();
+
+ /** Close the session */
+ void close();
+
+ Execution& getExecution();
+
+ typedef framing::TransferContent DefaultContent;
+
+ protected:
+ shared_ptr<SessionCore> impl;
+ framing::ProtocolVersion version;
+ friend class Connection;
+ SessionBase(shared_ptr<SessionCore>);
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SESSIONBASE_H*/
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index ee9e9570ed..07a791bef3 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -203,8 +203,10 @@ void SessionCore::resume(shared_ptr<ConnectionImpl> c) {
// user thread
{
Lock l(state);
- if (state==OPEN)
- doSuspend(REPLY_SUCCESS, OK);
+ if (state==SUSPENDED) { // Clear error that caused suspend
+ code=REPLY_SUCCESS;
+ text=OK;
+ }
check(state==SUSPENDED, COMMAND_INVALID, CANNOT_RESUME_SESSION);
SequenceNumber sendAck=session->resuming();
attaching(c);
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index a758dc1341..ec2f7000ef 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -38,29 +38,30 @@ SubscriptionManager::SubscriptionManager(Session_0_10& s)
confirmMode(true), acquireMode(false)
{}
-void SubscriptionManager::subscribeInternal(
+Completion SubscriptionManager::subscribeInternal(
const std::string& q, const std::string& dest)
{
- session.messageSubscribe(arg::queue=q, arg::destination=dest,
+ Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest,
arg::confirmMode=confirmMode, arg::acquireMode=acquireMode);
setFlowControl(dest, messages, bytes, window);
+ return c;
}
-void SubscriptionManager::subscribe(
+Completion SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& d)
{
std::string dest=d.empty() ? q:d;
dispatcher.listen(dest, &listener, autoAck);
- subscribeInternal(q, dest);
+ return subscribeInternal(q, dest);
}
-void SubscriptionManager::subscribe(
+Completion SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const std::string& d)
{
std::string dest=d.empty() ? q:d;
lq.session=session;
- lq.queue=session.execution().getDemux().add(dest, ByTransferDest(dest));
- subscribeInternal(q, dest);
+ lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
+ return subscribeInternal(q, dest);
}
void SubscriptionManager::setFlowControl(
@@ -91,7 +92,9 @@ void SubscriptionManager::cancel(const std::string dest)
session.messageCancel(dest);
}
-void SubscriptionManager::run(bool autoStop)
+void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }
+
+void SubscriptionManager::run()
{
dispatcher.setAutoStop(autoStop);
dispatcher.run();
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index c9066fe1de..1205478bf8 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -23,21 +23,24 @@
*/
#include "qpid/sys/Mutex.h"
#include <qpid/client/Dispatcher.h>
+#include <qpid/client/Completion.h>
#include <qpid/client/Session_0_10.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/LocalQueue.h>
+#include <qpid/sys/Runnable.h>
+
#include <set>
#include <sstream>
namespace qpid {
namespace client {
-class SubscriptionManager
+class SubscriptionManager : public sys::Runnable
{
typedef sys::Mutex::ScopedLock Lock;
typedef sys::Mutex::ScopedUnlock Unlock;
- void subscribeInternal(const std::string& q, const std::string& dest);
+ Completion subscribeInternal(const std::string& q, const std::string& dest);
qpid::client::Dispatcher dispatcher;
qpid::client::Session_0_10& session;
@@ -47,8 +50,9 @@ class SubscriptionManager
AckPolicy autoAck;
bool confirmMode;
bool acquireMode;
-
-public:
+ bool autoStop;
+
+ public:
SubscriptionManager(Session_0_10& session);
/**
@@ -59,9 +63,9 @@ public:
*@param tag Unique destination tag for the listener.
* If not specified, the queue name is used.
*/
- void subscribe(MessageListener& listener,
- const std::string& queue,
- const std::string& tag=std::string());
+ Completion subscribe(MessageListener& listener,
+ const std::string& queue,
+ const std::string& tag=std::string());
/**
* Subscribe a LocalQueue to receive messages from queue.
@@ -70,17 +74,21 @@ public:
*@param tag Unique destination tag for the listener.
* If not specified, the queue name is used.
*/
- void subscribe(LocalQueue& localQueue,
+ Completion subscribe(LocalQueue& localQueue,
const std::string& queue,
const std::string& tag=std::string());
/** Cancel a subscription. */
void cancel(const std::string tag);
- /** Deliver messages until stop() is called.
- *@param autoStop If true, return when all listeners are cancelled.
+ /** Deliver messages until stop() is called. */
+ void run();
+
+ /** If set true, run() will stop when all subscriptions
+ * are cancelled. If false, run will only stop when stop()
+ * is called. True by default.
*/
- void run(bool autoStop=true);
+ void setAutoStop(bool set=true);
/** Cause run() to return */
void stop();