summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-12-07 19:13:09 +0000
committerAlan Conway <aconway@apache.org>2007-12-07 19:13:09 +0000
commit7bc8f20e59e8f18926119a4bc5fdb5be262c500c (patch)
tree5112c5428872273dd26092cb5c8bdc3af3beb00e /cpp/src
parent237c3437a5a4b68c483af77c5d1346104ca404a0 (diff)
downloadqpid-python-7bc8f20e59e8f18926119a4bc5fdb5be262c500c.tar.gz
Summary:
- Replaced InProcessBroker with BrokerFixture, uses a full loopback broker for more realistic tests. - Extracted non-generated parts of Session_0_10 into SessionBase. - Sundry small fixes. src/tests/BrokerFixture.h - in process broker with loopback connections. - tests can force a disorderly disconnect. src/qpid/client/Connector.h - back door to private members for BrokerFixture. - close() in destructor to avoid leaks. src/qpid/client/ConnectionImpl.h,cpp: - close() in destructor, to fix hang when destroyed without being closed. src/qpid/client/CompletionTracker.h,.cpp: - Fixed race in close/add. src/qpid/client/SessionBase.h,cpp: - Extracted all non-generated code from Session_0_10 into SessionBase - Added sync() src/tests/exception_test.cpp: Converted to boost & BrokerFixture src/tests/ClientChannelTest.cpp, ClientSessionTest.cpp: Use BrokerFixture git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@602182 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am3
-rw-r--r--cpp/src/qpid/client/Channel.cpp4
-rw-r--r--cpp/src/qpid/client/CompletionTracker.cpp9
-rw-r--r--cpp/src/qpid/client/CompletionTracker.h3
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp2
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h2
-rw-r--r--cpp/src/qpid/client/Connector.cpp4
-rw-r--r--cpp/src/qpid/client/Connector.h2
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp4
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp2
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h1
-rw-r--r--cpp/src/qpid/client/Message.h4
-rw-r--r--cpp/src/qpid/client/SessionBase.cpp37
-rw-r--r--cpp/src/qpid/client/SessionBase.h101
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp6
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp19
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h30
-rw-r--r--cpp/src/tests/BrokerFixture.h101
-rw-r--r--cpp/src/tests/ClientChannelTest.cpp8
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp44
-rw-r--r--cpp/src/tests/exception_test.cpp112
-rw-r--r--cpp/src/tests/perftest.cpp12
22 files changed, 380 insertions, 130 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index e87c2de35b..9a025cdb44 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -208,7 +208,7 @@ libqpidbroker_la_SOURCES = \
libqpidclient_la_LIBADD = libqpidcommon.la
libqpidclient_la_SOURCES = \
$(rgen_client_cpp) \
- qpid/client/Session.h \
+ qpid/client/SessionBase.cpp \
qpid/client/Connection.cpp \
qpid/client/Channel.cpp \
qpid/client/Exchange.cpp \
@@ -335,6 +335,7 @@ nobase_include_HEADERS = \
qpid/client/MessageListener.h \
qpid/client/MessageQueue.h \
qpid/client/Response.h \
+ qpid/client/SessionBase.h \
qpid/client/Session.h \
qpid/client/SessionCore.h \
qpid/client/StateManager.h \
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index 3551946d1e..a6875fcb63 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -174,7 +174,7 @@ void Channel::cancel(const std::string& tag, bool synch) {
bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
string tag = "get-handler";
- ScopedDivert handler(tag, session.execution().getDemux());
+ ScopedDivert handler(tag, session.getExecution().getDemux());
Demux::QueuePtr incoming = handler.getQueue();
session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1));
@@ -243,7 +243,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination)
bool send = i->second.ackMode == AUTO_ACK
|| (prefetch && ++(i->second.count) > (prefetch / 2));
if (send) i->second.count = 0;
- session.execution().completed(content.getId(), true, send);
+ session.getExecution().completed(content.getId(), true, send);
}
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp
index 46a7384ac2..4c6e59f1b8 100644
--- a/cpp/src/qpid/client/CompletionTracker.cpp
+++ b/cpp/src/qpid/client/CompletionTracker.cpp
@@ -31,12 +31,13 @@ namespace
const std::string empty;
}
-CompletionTracker::CompletionTracker() {}
+CompletionTracker::CompletionTracker() : closed(false) {}
CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {}
void CompletionTracker::close()
{
sys::Mutex::ScopedLock l(lock);
+ closed=true;
while (!listeners.empty()) {
Record r(listeners.front());
{
@@ -47,17 +48,18 @@ void CompletionTracker::close()
}
}
+
void CompletionTracker::completed(const SequenceNumber& _mark)
{
sys::Mutex::ScopedLock l(lock);
mark = _mark;
while (!listeners.empty() && !(listeners.front().id > mark)) {
Record r(listeners.front());
+ listeners.pop_front();
{
sys::Mutex::ScopedUnlock u(lock);
r.completed();
}
- listeners.pop_front();
}
}
@@ -88,14 +90,13 @@ void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListe
bool CompletionTracker::add(const Record& record)
{
sys::Mutex::ScopedLock l(lock);
- if (record.id < mark) {
+ if (record.id < mark || closed) {
return false;
} else {
//insert at the correct position
Listeners::iterator i = seek(record.id);
if (i == listeners.end()) i = listeners.begin();
listeners.insert(i, record);
-
return true;
}
}
diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h
index b2697f399f..55f7ff7531 100644
--- a/cpp/src/qpid/client/CompletionTracker.h
+++ b/cpp/src/qpid/client/CompletionTracker.h
@@ -60,7 +60,8 @@ private:
};
typedef std::list<Record> Listeners;
-
+ bool closed;
+
sys::Mutex lock;
framing::SequenceNumber mark;
Listeners listeners;
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 1ae8a25d6b..cf00b2b296 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -44,6 +44,8 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
connector->setShutdownHandler(this);
}
+ConnectionImpl::~ConnectionImpl() { close(); }
+
void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
{
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index 46bd5b685d..6cc20a15f6 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -62,6 +62,8 @@ class ConnectionImpl : public framing::FrameHandler,
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
ConnectionImpl(boost::shared_ptr<Connector> c);
+ ~ConnectionImpl();
+
void addSession(const boost::shared_ptr<SessionCore>&);
void open(const std::string& host, int port = 5672,
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index f32d0470ec..23d2c3ff8d 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -51,7 +51,8 @@ Connector::Connector(
{
}
-Connector::~Connector(){
+Connector::~Connector() {
+ close();
if (receiver.id() && receiver.id() != Thread::current().id())
receiver.join();
}
@@ -76,7 +77,6 @@ void Connector::init(){
receiver = Thread(this);
}
-// Call with closedLock held
bool Connector::closeInternal() {
Mutex::ScopedLock l(closedLock);
if (!closed) {
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index af6badd6e0..946289efdc 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -88,6 +88,8 @@ class Connector : public framing::OutputHandler,
void eof(qpid::sys::AsynchIO&);
friend class Channel;
+ friend class TestConnector;
+
public:
Connector(framing::ProtocolVersion pVersion,
bool debug = false, uint32_t buffer_size = 1024);
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index 7ffcd676a3..6b6a76b222 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -52,8 +52,8 @@ Dispatcher::Dispatcher(Session_0_10& s, const std::string& q)
: session(s), running(false)
{
queue = q.empty() ?
- session.execution().getDemux().getDefault() :
- session.execution().getDemux().get(q);
+ session.getExecution().getDemux().getDefault() :
+ session.getExecution().getDemux().get(q);
}
void Dispatcher::start()
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 7b8fb8d01f..f30a35aad0 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -167,6 +167,8 @@ void ExecutionHandler::sendCompletion()
out(frame);
}
+SequenceNumber ExecutionHandler::lastSent() const { return outgoingCounter; }
+
SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener)
{
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index 0999540909..6286ccd591 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -80,6 +80,7 @@ public:
framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content,
ResultListener=ResultListener());
+ framing::SequenceNumber lastSent() const;
void sendSyncRequest();
void sendFlushRequest();
void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h
index 081384974a..86404ac792 100644
--- a/cpp/src/qpid/client/Message.h
+++ b/cpp/src/qpid/client/Message.h
@@ -65,12 +65,12 @@ public:
void acknowledge(Session_0_10& session, bool cumulative = true, bool send = true) const
{
- session.execution().completed(id, cumulative, send);
+ session.getExecution().completed(id, cumulative, send);
}
void acknowledge(bool cumulative = true, bool send = true) const
{
- const_cast<Session_0_10&>(session).execution().completed(id, cumulative, send);
+ const_cast<Session_0_10&>(session).getExecution().completed(id, cumulative, send);
}
/**@internal for incoming messages */
diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp
new file mode 100644
index 0000000000..06266ded91
--- /dev/null
+++ b/cpp/src/qpid/client/SessionBase.cpp
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionBase.h"
+
+namespace qpid {
+namespace client {
+using namespace framing;
+
+SessionBase::SessionBase() {}
+SessionBase::~SessionBase() {}
+SessionBase::SessionBase(shared_ptr<SessionCore> core) : impl(core) {}
+void SessionBase::suspend() { impl->suspend(); }
+void SessionBase::close() { impl->close(); }
+void SessionBase::setSynchronous(bool isSync) { impl->setSync(isSync); }
+bool SessionBase::isSynchronous() const { return impl->isSync(); }
+Execution& SessionBase::getExecution() { return impl->getExecution(); }
+Uuid SessionBase::getId() const { return impl->getId(); }
+framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h
new file mode 100644
index 0000000000..890dbd269b
--- /dev/null
+++ b/cpp/src/qpid/client/SessionBase.h
@@ -0,0 +1,101 @@
+#ifndef QPID_CLIENT_SESSIONBASE_H
+#define QPID_CLIENT_SESSIONBASE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/amqp_structs.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Completion.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Response.h"
+#include "qpid/client/SessionCore.h"
+#include "qpid/client/TypedResult.h"
+#include "qpid/shared_ptr.h"
+#include <string>
+
+namespace qpid {
+namespace client {
+
+using std::string;
+using framing::Content;
+using framing::FieldTable;
+using framing::MethodContent;
+using framing::SequenceNumberSet;
+using framing::Uuid;
+
+/**
+ * Basic session operations that are not derived from AMQP XML methods.
+ */
+class SessionBase
+{
+ public:
+ SessionBase();
+ ~SessionBase();
+
+ /** Get the next message frame-set from the session. */
+ framing::FrameSet::shared_ptr get();
+
+ /** Get the session ID */
+ Uuid getId() const;
+
+ /**
+ * In synchronous mode, the session sets the sync bit on every
+ * command and waits for the broker's response before returning.
+ * Note this gives lower throughput than non-synchronous mode.
+ *
+ * In non-synchronous mode commands are sent without waiting
+ * for a respose (you can use the returned Completion object
+ * to wait for completion.)
+ *
+ *@param if true set the session to synchronous mode, else
+ * set it to non-synchronous mode.
+ */
+ void setSynchronous(bool isSync);
+
+ bool isSynchronous() const;
+
+ /**
+ * Suspend the session, can be resumed on a different connection.
+ * @see Connection::resume()
+ */
+ void suspend();
+
+ /** Close the session */
+ void close();
+
+ Execution& getExecution();
+
+ typedef framing::TransferContent DefaultContent;
+
+ protected:
+ shared_ptr<SessionCore> impl;
+ framing::ProtocolVersion version;
+ friend class Connection;
+ SessionBase(shared_ptr<SessionCore>);
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SESSIONBASE_H*/
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index ee9e9570ed..07a791bef3 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -203,8 +203,10 @@ void SessionCore::resume(shared_ptr<ConnectionImpl> c) {
// user thread
{
Lock l(state);
- if (state==OPEN)
- doSuspend(REPLY_SUCCESS, OK);
+ if (state==SUSPENDED) { // Clear error that caused suspend
+ code=REPLY_SUCCESS;
+ text=OK;
+ }
check(state==SUSPENDED, COMMAND_INVALID, CANNOT_RESUME_SESSION);
SequenceNumber sendAck=session->resuming();
attaching(c);
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index a758dc1341..ec2f7000ef 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -38,29 +38,30 @@ SubscriptionManager::SubscriptionManager(Session_0_10& s)
confirmMode(true), acquireMode(false)
{}
-void SubscriptionManager::subscribeInternal(
+Completion SubscriptionManager::subscribeInternal(
const std::string& q, const std::string& dest)
{
- session.messageSubscribe(arg::queue=q, arg::destination=dest,
+ Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest,
arg::confirmMode=confirmMode, arg::acquireMode=acquireMode);
setFlowControl(dest, messages, bytes, window);
+ return c;
}
-void SubscriptionManager::subscribe(
+Completion SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& d)
{
std::string dest=d.empty() ? q:d;
dispatcher.listen(dest, &listener, autoAck);
- subscribeInternal(q, dest);
+ return subscribeInternal(q, dest);
}
-void SubscriptionManager::subscribe(
+Completion SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const std::string& d)
{
std::string dest=d.empty() ? q:d;
lq.session=session;
- lq.queue=session.execution().getDemux().add(dest, ByTransferDest(dest));
- subscribeInternal(q, dest);
+ lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
+ return subscribeInternal(q, dest);
}
void SubscriptionManager::setFlowControl(
@@ -91,7 +92,9 @@ void SubscriptionManager::cancel(const std::string dest)
session.messageCancel(dest);
}
-void SubscriptionManager::run(bool autoStop)
+void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }
+
+void SubscriptionManager::run()
{
dispatcher.setAutoStop(autoStop);
dispatcher.run();
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index c9066fe1de..1205478bf8 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -23,21 +23,24 @@
*/
#include "qpid/sys/Mutex.h"
#include <qpid/client/Dispatcher.h>
+#include <qpid/client/Completion.h>
#include <qpid/client/Session_0_10.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/LocalQueue.h>
+#include <qpid/sys/Runnable.h>
+
#include <set>
#include <sstream>
namespace qpid {
namespace client {
-class SubscriptionManager
+class SubscriptionManager : public sys::Runnable
{
typedef sys::Mutex::ScopedLock Lock;
typedef sys::Mutex::ScopedUnlock Unlock;
- void subscribeInternal(const std::string& q, const std::string& dest);
+ Completion subscribeInternal(const std::string& q, const std::string& dest);
qpid::client::Dispatcher dispatcher;
qpid::client::Session_0_10& session;
@@ -47,8 +50,9 @@ class SubscriptionManager
AckPolicy autoAck;
bool confirmMode;
bool acquireMode;
-
-public:
+ bool autoStop;
+
+ public:
SubscriptionManager(Session_0_10& session);
/**
@@ -59,9 +63,9 @@ public:
*@param tag Unique destination tag for the listener.
* If not specified, the queue name is used.
*/
- void subscribe(MessageListener& listener,
- const std::string& queue,
- const std::string& tag=std::string());
+ Completion subscribe(MessageListener& listener,
+ const std::string& queue,
+ const std::string& tag=std::string());
/**
* Subscribe a LocalQueue to receive messages from queue.
@@ -70,17 +74,21 @@ public:
*@param tag Unique destination tag for the listener.
* If not specified, the queue name is used.
*/
- void subscribe(LocalQueue& localQueue,
+ Completion subscribe(LocalQueue& localQueue,
const std::string& queue,
const std::string& tag=std::string());
/** Cancel a subscription. */
void cancel(const std::string tag);
- /** Deliver messages until stop() is called.
- *@param autoStop If true, return when all listeners are cancelled.
+ /** Deliver messages until stop() is called. */
+ void run();
+
+ /** If set true, run() will stop when all subscriptions
+ * are cancelled. If false, run will only stop when stop()
+ * is called. True by default.
*/
- void run(bool autoStop=true);
+ void setAutoStop(bool set=true);
/** Cause run() to return */
void stop();
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
new file mode 100644
index 0000000000..ceb8ecb4e6
--- /dev/null
+++ b/cpp/src/tests/BrokerFixture.h
@@ -0,0 +1,101 @@
+#ifndef TESTS_BROKERFIXTURE_H
+#define TESTS_BROKERFIXTURE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Thread.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/client/SubscriptionManager.h"
+
+namespace qpid { namespace client {
+/** Back door into private Connector stuff */
+struct TestConnector {
+ static void disconnect(qpid::client::Connector& c) {
+ c.socket.close();
+ c.handleClosed();
+ }
+};
+}}
+
+/**
+ * A fixture to create an in-process broker and connect to it for tests.
+ */
+struct BrokerFixture {
+ typedef qpid::broker::Broker Broker;
+ typedef boost::shared_ptr<Broker> BrokerPtr;
+
+ struct OpenConnection : public qpid::client::Connection {
+ OpenConnection(int port) { open("localhost", port); }
+ };
+
+ BrokerPtr broker;
+ qpid::sys::Thread brokerThread;
+ OpenConnection connection;
+ qpid::client::Session_0_10 session;
+ qpid::client::SubscriptionManager subs;
+ qpid::client::LocalQueue lq;
+
+ BrokerPtr newBroker() {
+ Broker::Options opts;
+ opts.port=0;
+ opts.workerThreads=1;
+ BrokerPtr b=Broker::create(opts);
+ // TODO aconway 2007-12-05: Without the following line
+ // the test can hang in the connection ctor. This is
+ // a race condition that should be fixed.
+ b->getPort();
+ return b;
+ };
+
+ BrokerFixture() : broker(newBroker()),
+ brokerThread(*broker),
+ connection(broker->getPort()),
+ session(connection.newSession()),
+ subs(session)
+ {}
+
+ ~BrokerFixture() {
+ connection.close();
+ broker->shutdown();
+ brokerThread.join();
+ }
+
+ /** Open a connection to the local broker */
+ void open(qpid::client::Connection& c) {
+ c.open("localhost", broker->getPort());
+ }
+
+ /** Close a connection's socket */
+ static void disconnect(qpid::client::Connection& c) {
+ struct Expose : public qpid::client::Connection {
+ void disconnect() {
+ qpid::client::TestConnector::disconnect(*impl->getConnector());
+ }
+ };
+ static_cast<Expose&>(c).disconnect();
+ }
+};
+
+#endif /*!TESTS_BROKERFIXTURE_H*/
diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp
index 454b9ca56d..bd8f5af6be 100644
--- a/cpp/src/tests/ClientChannelTest.cpp
+++ b/cpp/src/tests/ClientChannelTest.cpp
@@ -20,7 +20,7 @@
*/
#include <vector>
#include "qpid_test_plugin.h"
-#include "InProcessBroker.h"
+#include "BrokerFixture.h"
#include "qpid/client/Channel.h"
#include "qpid/client/Message.h"
#include "qpid/client/Queue.h"
@@ -44,7 +44,7 @@ const size_t FRAME_MAX = 256;
* The test base defines the tests methods, derived classes
* instantiate the channel in Basic or Message mode.
*/
-class ChannelTestBase : public CppUnit::TestCase
+class ChannelTestBase : public CppUnit::TestCase, public BrokerFixture
{
struct Listener: public qpid::client::MessageListener {
vector<Message> messages;
@@ -56,7 +56,6 @@ class ChannelTestBase : public CppUnit::TestCase
}
};
- qpid::InProcessBrokerClient connection;
const std::string qname;
const std::string data;
Queue queue;
@@ -69,8 +68,7 @@ class ChannelTestBase : public CppUnit::TestCase
public:
ChannelTestBase()
- : connection(FRAME_MAX),
- qname("testq"), data("hello"),
+ : qname("testq"), data("hello"),
queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
{}
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index ac2cf155f4..70e8a41074 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -19,7 +19,7 @@
*
*/
#include "qpid_test_plugin.h"
-#include "InProcessBroker.h"
+#include "BrokerFixture.h"
#include "qpid/client/Dispatcher.h"
#include "qpid/client/Session_0_10.h"
#include "qpid/framing/TransferContent.h"
@@ -61,7 +61,7 @@ struct DummyListener : public MessageListener
}
};
-class ClientSessionTest : public CppUnit::TestCase
+class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture
{
CPPUNIT_TEST_SUITE(ClientSessionTest);
CPPUNIT_TEST(testQueueQuery);
@@ -74,23 +74,14 @@ class ClientSessionTest : public CppUnit::TestCase
CPPUNIT_TEST_SUITE_END();
shared_ptr<broker::Broker> broker;
- Session_0_10 session;
- // Defer construction & thread creation to setUp
- boost::optional<InProcessConnection> c;
- boost::optional<InProcessConnection> c2;
-public:
+ public:
void setUp() {
broker = broker::Broker::create();
- c=boost::in_place<InProcessConnection>(broker);
- c2=boost::in_place<InProcessConnection>(broker);
}
void tearDown() {
- c2.reset();
- c.reset();
- broker.reset();
}
void declareSubscribe(const std::string& q="my-queue",
@@ -109,7 +100,7 @@ public:
void testQueueQuery()
{
- session = c->newSession();
+ session =connection.newSession();
session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true);
TypedResult<QueueQueryResult> result = session.queueQuery(std::string("my-queue"));
CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable());
@@ -120,7 +111,7 @@ public:
void testTransfer()
{
- session = c->newSession();
+ session =connection.newSession();
declareSubscribe();
session.messageTransfer(content=TransferContent("my-message", "my-queue"));
//get & test the message:
@@ -128,12 +119,12 @@ public:
CPPUNIT_ASSERT(msg->isA<MessageTransferBody>());
CPPUNIT_ASSERT_EQUAL(std::string("my-message"), msg->getContent());
//confirm receipt:
- session.execution().completed(msg->getId(), true, true);
+ session.getExecution().completed(msg->getId(), true, true);
}
void testDispatcher()
{
- session = c->newSession();
+ session =connection.newSession();
declareSubscribe();
TransferContent msg1("One");
@@ -161,16 +152,16 @@ public:
}
void testResumeExpiredError() {
- session = c->newSession(0);
+ session =connection.newSession(0);
session.suspend(); // session has 0 timeout.
try {
- c->resume(session);
+ connection.resume(session);
CPPUNIT_FAIL("Expected InvalidArgumentException.");
} catch(const InternalErrorException&) {}
}
void testUseSuspendedError() {
- session = c->newSession(60);
+ session =connection.newSession(60);
session.suspend();
try {
session.exchangeQuery(name="amq.fanout");
@@ -179,26 +170,27 @@ public:
}
void testSuspendResume() {
- session = c->newSession(60);
+ session =connection.newSession(60);
declareSubscribe();
session.suspend();
// Make sure we are still subscribed after resume.
- c->resume(session);
+ connection.resume(session);
session.messageTransfer(content=TransferContent("my-message", "my-queue"));
FrameSet::shared_ptr msg = session.get();
CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent());
}
void testDisconnectResume() {
- session = c->newSession(60);
+ session =connection.newSession(60);
session.queueDeclare(queue="before");
CPPUNIT_ASSERT(queueExists("before"));
- // Simulate lost frames.
- c->discard();
session.queueDeclare(queue=string("after"));
- c->disconnect(); // Simulate disconnect, resume on a new connection.
- c2->resume(session);
+ disconnect(connection);
+ Connection c2;
+ open(c2);
+ c2.resume(session);
CPPUNIT_ASSERT(queueExists("after"));
+ c2.close();
}
};
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index d7094c9bfd..e774c48070 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -20,7 +20,7 @@
*/
#include "unit_test.h"
-#include "InProcessBroker.h"
+#include "BrokerFixture.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
@@ -31,74 +31,74 @@ QPID_AUTO_TEST_SUITE(exception_test)
using namespace std;
using namespace qpid;
+using namespace sys;
using namespace client;
using namespace framing;
-struct Fixture {
- InProcessConnection connection;
- InProcessConnection connection2;
- Session_0_10 session;
- SubscriptionManager sub;
- LocalQueue q;
+using boost::bind;
+using boost::function;
- Fixture() : connection(),
- connection2(connection.getBroker()),
- session(connection.newSession()),
- sub(session)
- {
- session.queueDeclare(arg::queue="q");
+template <class Ex>
+struct Catcher : public Runnable {
+ function<void ()> f;
+ bool caught;
+ Thread thread;
+
+ Catcher(function<void ()> f_) : f(f_), caught(false), thread(this) {}
+ ~Catcher() { join(); }
+
+ void run() {
+ try { f(); }
+ catch(const Ex& e) {
+ caught=true;
+ BOOST_MESSAGE(e.what());
+ }
+ catch(const std::exception& e) {
+ BOOST_ERROR(string("Bad exception: ")+e.what());
+ }
+ catch(...) {
+ BOOST_ERROR(string("Bad exception: unknown"));
+ }
+ }
+
+ bool join() {
+ if (thread.id()) {
+ thread.join();
+ thread=Thread();
+ }
+ return caught;
}
};
+BOOST_FIXTURE_TEST_CASE(DisconnectedGet, BrokerFixture) {
+ Catcher<ClosedException> get(bind(&Session_0_10::get, session));
+ disconnect(connection);
+ BOOST_CHECK(get.join());
+}
-// TODO aconway 2007-11-30: need InProcessBroker to be a more accurate
-// simulation of shutdown behaviour. It should override only
-// Connector.run() to substitute NetworkQueues for the Dispatcher.
-//
-// template <class Ex>
-// struct Catcher : public sys::Runnable {
-// Session_0_10 s;
-// boost::function<void ()> f;
-// bool caught;
-// Catcher(Session_0_10 s_, boost::function<void ()> f_)
-// : s(s_), f(f_), caught(false) {}
-// void run() {
-// try { f(); } catch(const Ex& e) {
-// caught=true;
-// BOOST_MESSAGE(e.what());
-// }
-// }
-// };
+BOOST_FIXTURE_TEST_CASE(DisconnectedPop, BrokerFixture) {
+ session.queueDeclare(arg::queue="q");
+ subs.subscribe(lq, "q");
+ Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq)));
+ disconnect(connection);
+ BOOST_CHECK(pop.join());
+}
-// BOOST_FIXTURE_TEST_CASE(DisconnectedGet, Fixture) {
-// Catcher<Exception> get(session, boost::bind(&Session_0_10::get, session));
-// sub.subscribe(q, "q");
-// sys::Thread t(get);
-// connection.disconnect();
-// t.join();
-// BOOST_CHECK(get.caught);
-// }
-
-// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, Fixture) {
+// FIXME aconway 2007-12-07: This test hangs sporadically at t.join
+// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, BrokerFixture) {
// struct NullListener : public MessageListener {
// void received(Message&) { BOOST_FAIL("Unexpected message"); }
// } l;
-// sub.subscribe(l, "q");
-// connection.disconnect();
-// try {
-// sub.run();
-// BOOST_FAIL("Expected exception");
-// } catch (const Exception&e) { BOOST_FAIL(e.what()); }
-// try {
-// session.queueDeclare(arg::queue="foo");
-// BOOST_FAIL("Expected exception");
-// } catch (const Exception&e) { BOOST_FAIL(e.what()); }
+// session.queueDeclare(arg::queue="q");
+// subs.subscribe(l, "q");
+// Thread t(subs);
+// disconnect(connection);
+// t.join();
+// BOOST_CHECK_THROW(session.close(), InternalErrorException);
// }
-// TODO aconway 2007-11-30: setSynchronous appears not to work.
-// BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, Fixture) {
-// session.setSynchronous(true);
-// BOOST_CHECK_THROW(sub.subscribe(q, "no such queue"), NotFoundException);
-// }
+BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, BrokerFixture) {
+ BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException);
+}
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index bd2c0d9933..d6185a0100 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -205,7 +205,7 @@ struct Setup : public Client {
}
}
// Make sure this is all completed before we return.
- session.execution().sendSyncRequest();
+ session.getExecution().sendSyncRequest();
}
};
@@ -231,13 +231,9 @@ class Stats {
// Functor to collect rates.
void operator()(const string& data) {
- try {
- double d=lexical_cast<double>(data);
- values.push_back(d);
- sum += d;
- } catch (...) {
- throw Exception(QPID_MSG("Bad data, expecting double: " << data));
- }
+ double d=lexical_cast<double>(data);
+ values.push_back(d);
+ sum += d;
}
double mean() const {