diff options
author | Alan Conway <aconway@apache.org> | 2007-12-07 19:13:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-12-07 19:13:09 +0000 |
commit | 7bc8f20e59e8f18926119a4bc5fdb5be262c500c (patch) | |
tree | 5112c5428872273dd26092cb5c8bdc3af3beb00e /cpp | |
parent | 237c3437a5a4b68c483af77c5d1346104ca404a0 (diff) | |
download | qpid-python-7bc8f20e59e8f18926119a4bc5fdb5be262c500c.tar.gz |
Summary:
- Replaced InProcessBroker with BrokerFixture, uses a full loopback
broker for more realistic tests.
- Extracted non-generated parts of Session_0_10 into SessionBase.
- Sundry small fixes.
src/tests/BrokerFixture.h
- in process broker with loopback connections.
- tests can force a disorderly disconnect.
src/qpid/client/Connector.h
- back door to private members for BrokerFixture.
- close() in destructor to avoid leaks.
src/qpid/client/ConnectionImpl.h,cpp:
- close() in destructor, to fix hang when destroyed without being closed.
src/qpid/client/CompletionTracker.h,.cpp:
- Fixed race in close/add.
src/qpid/client/SessionBase.h,cpp:
- Extracted all non-generated code from Session_0_10 into SessionBase
- Added sync()
src/tests/exception_test.cpp: Converted to boost & BrokerFixture
src/tests/ClientChannelTest.cpp, ClientSessionTest.cpp: Use BrokerFixture
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@602182 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
23 files changed, 386 insertions, 183 deletions
diff --git a/cpp/rubygen/templates/Session.rb b/cpp/rubygen/templates/Session.rb index 167422d3a9..ff578b251c 100644 --- a/cpp/rubygen/templates/Session.rb +++ b/cpp/rubygen/templates/Session.rb @@ -68,19 +68,8 @@ class SessionNoKeywordGen < CppGen def generate() h_file(@file) { - include "qpid/framing/amqp_framing.h" - include "qpid/framing/Uuid.h" - include "qpid/framing/amqp_structs.h" - include "qpid/framing/ProtocolVersion.h" - include "qpid/framing/MethodContent.h" - include "qpid/framing/TransferContent.h" - include "qpid/client/Completion.h" - include "qpid/client/ConnectionImpl.h" - include "qpid/client/Response.h" - include "qpid/client/SessionCore.h" - include "qpid/client/TypedResult.h" - include "qpid/shared_ptr.h" - include "<string>" + include "qpid/client/SessionBase.h" + namespace("qpid::client") { genl "using std::string;" genl "using framing::Content;" @@ -94,59 +83,23 @@ class SessionNoKeywordGen < CppGen genl "AMQP #{@amqp.version} session API." genl @amqp.class_("session").doc } - cpp_class(@classname) { + cpp_class(@classname, "public SessionBase") { public - gen <<EOS -#{@classname}(); - -/** Get the next message frame-set from the session. */ -framing::FrameSet::shared_ptr get() { return impl->get(); } - -/** Get the session ID */ -Uuid getId() const { return impl->getId(); } - -/** @param sync if true all session methods block till a response arrives. */ -void setSynchronous(bool sync) { impl->setSync(sync); } - -/** Suspend the session, can be resumed on a different connection. - * @see Connection::resume() - */ -void suspend(); - -/** Close the session */ -void close(); - -Execution& execution() { return impl->getExecution(); } - -typedef framing::TransferContent DefaultContent; -EOS + genl "Session_0_10() {}" + genl "Session_0_10(shared_ptr<SessionCore> core) : SessionBase(core) {}" session_methods.each { |m| genl doxygen(m) args=m.sig_c_default.join(", ") genl "#{m.return_type} #{m.session_function}(#{args});" } - genl - protected - gen <<EOS -shared_ptr<SessionCore> impl; -framing::ProtocolVersion version; -friend class Connection; -#{@classname}(shared_ptr<SessionCore>); -EOS }}}} cpp_file(@file) { include @classname include "qpid/framing/all_method_bodies.h" namespace(@namespace) { - gen <<EOS -using namespace framing; -#{@classname}::#{@classname}() {} -#{@classname}::#{@classname}(shared_ptr<SessionCore> core) : impl(core) {} -void #{@classname}::suspend() { impl->suspend(); } -void #{@classname}::close() { impl->close(); } -EOS + genl "using namespace framing;" session_methods.each { |m| genl sig=m.signature_c.join(", ") diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index e87c2de35b..9a025cdb44 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -208,7 +208,7 @@ libqpidbroker_la_SOURCES = \ libqpidclient_la_LIBADD = libqpidcommon.la libqpidclient_la_SOURCES = \ $(rgen_client_cpp) \ - qpid/client/Session.h \ + qpid/client/SessionBase.cpp \ qpid/client/Connection.cpp \ qpid/client/Channel.cpp \ qpid/client/Exchange.cpp \ @@ -335,6 +335,7 @@ nobase_include_HEADERS = \ qpid/client/MessageListener.h \ qpid/client/MessageQueue.h \ qpid/client/Response.h \ + qpid/client/SessionBase.h \ qpid/client/Session.h \ qpid/client/SessionCore.h \ qpid/client/StateManager.h \ diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index 3551946d1e..a6875fcb63 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -174,7 +174,7 @@ void Channel::cancel(const std::string& tag, bool synch) { bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { string tag = "get-handler"; - ScopedDivert handler(tag, session.execution().getDemux()); + ScopedDivert handler(tag, session.getExecution().getDemux()); Demux::QueuePtr incoming = handler.getQueue(); session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); @@ -243,7 +243,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination) bool send = i->second.ackMode == AUTO_ACK || (prefetch && ++(i->second.count) > (prefetch / 2)); if (send) i->second.count = 0; - session.execution().completed(content.getId(), true, send); + session.getExecution().completed(content.getId(), true, send); } } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp index 46a7384ac2..4c6e59f1b8 100644 --- a/cpp/src/qpid/client/CompletionTracker.cpp +++ b/cpp/src/qpid/client/CompletionTracker.cpp @@ -31,12 +31,13 @@ namespace const std::string empty; } -CompletionTracker::CompletionTracker() {} +CompletionTracker::CompletionTracker() : closed(false) {} CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {} void CompletionTracker::close() { sys::Mutex::ScopedLock l(lock); + closed=true; while (!listeners.empty()) { Record r(listeners.front()); { @@ -47,17 +48,18 @@ void CompletionTracker::close() } } + void CompletionTracker::completed(const SequenceNumber& _mark) { sys::Mutex::ScopedLock l(lock); mark = _mark; while (!listeners.empty() && !(listeners.front().id > mark)) { Record r(listeners.front()); + listeners.pop_front(); { sys::Mutex::ScopedUnlock u(lock); r.completed(); } - listeners.pop_front(); } } @@ -88,14 +90,13 @@ void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListe bool CompletionTracker::add(const Record& record) { sys::Mutex::ScopedLock l(lock); - if (record.id < mark) { + if (record.id < mark || closed) { return false; } else { //insert at the correct position Listeners::iterator i = seek(record.id); if (i == listeners.end()) i = listeners.begin(); listeners.insert(i, record); - return true; } } diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h index b2697f399f..55f7ff7531 100644 --- a/cpp/src/qpid/client/CompletionTracker.h +++ b/cpp/src/qpid/client/CompletionTracker.h @@ -60,7 +60,8 @@ private: }; typedef std::list<Record> Listeners; - + bool closed; + sys::Mutex lock; framing::SequenceNumber mark; Listeners listeners; diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 1ae8a25d6b..cf00b2b296 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -44,6 +44,8 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) connector->setShutdownHandler(this); } +ConnectionImpl::~ConnectionImpl() { close(); } + void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session) { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index 46bd5b685d..6cc20a15f6 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -62,6 +62,8 @@ class ConnectionImpl : public framing::FrameHandler, typedef boost::shared_ptr<ConnectionImpl> shared_ptr; ConnectionImpl(boost::shared_ptr<Connector> c); + ~ConnectionImpl(); + void addSession(const boost::shared_ptr<SessionCore>&); void open(const std::string& host, int port = 5672, diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index f32d0470ec..23d2c3ff8d 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -51,7 +51,8 @@ Connector::Connector( { } -Connector::~Connector(){ +Connector::~Connector() { + close(); if (receiver.id() && receiver.id() != Thread::current().id()) receiver.join(); } @@ -76,7 +77,6 @@ void Connector::init(){ receiver = Thread(this); } -// Call with closedLock held bool Connector::closeInternal() { Mutex::ScopedLock l(closedLock); if (!closed) { diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index af6badd6e0..946289efdc 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -88,6 +88,8 @@ class Connector : public framing::OutputHandler, void eof(qpid::sys::AsynchIO&); friend class Channel; + friend class TestConnector; + public: Connector(framing::ProtocolVersion pVersion, bool debug = false, uint32_t buffer_size = 1024); diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 7ffcd676a3..6b6a76b222 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -52,8 +52,8 @@ Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) : session(s), running(false) { queue = q.empty() ? - session.execution().getDemux().getDefault() : - session.execution().getDemux().get(q); + session.getExecution().getDemux().getDefault() : + session.getExecution().getDemux().get(q); } void Dispatcher::start() diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 7b8fb8d01f..f30a35aad0 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -167,6 +167,8 @@ void ExecutionHandler::sendCompletion() out(frame); } +SequenceNumber ExecutionHandler::lastSent() const { return outgoingCounter; } + SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener) { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index 0999540909..6286ccd591 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -80,6 +80,7 @@ public: framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener()); framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content, ResultListener=ResultListener()); + framing::SequenceNumber lastSent() const; void sendSyncRequest(); void sendFlushRequest(); void completed(const framing::SequenceNumber& id, bool cumulative, bool send); diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h index 081384974a..86404ac792 100644 --- a/cpp/src/qpid/client/Message.h +++ b/cpp/src/qpid/client/Message.h @@ -65,12 +65,12 @@ public: void acknowledge(Session_0_10& session, bool cumulative = true, bool send = true) const { - session.execution().completed(id, cumulative, send); + session.getExecution().completed(id, cumulative, send); } void acknowledge(bool cumulative = true, bool send = true) const { - const_cast<Session_0_10&>(session).execution().completed(id, cumulative, send); + const_cast<Session_0_10&>(session).getExecution().completed(id, cumulative, send); } /**@internal for incoming messages */ diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp new file mode 100644 index 0000000000..06266ded91 --- /dev/null +++ b/cpp/src/qpid/client/SessionBase.cpp @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionBase.h" + +namespace qpid { +namespace client { +using namespace framing; + +SessionBase::SessionBase() {} +SessionBase::~SessionBase() {} +SessionBase::SessionBase(shared_ptr<SessionCore> core) : impl(core) {} +void SessionBase::suspend() { impl->suspend(); } +void SessionBase::close() { impl->close(); } +void SessionBase::setSynchronous(bool isSync) { impl->setSync(isSync); } +bool SessionBase::isSynchronous() const { return impl->isSync(); } +Execution& SessionBase::getExecution() { return impl->getExecution(); } +Uuid SessionBase::getId() const { return impl->getId(); } +framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); } +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h new file mode 100644 index 0000000000..890dbd269b --- /dev/null +++ b/cpp/src/qpid/client/SessionBase.h @@ -0,0 +1,101 @@ +#ifndef QPID_CLIENT_SESSIONBASE_H +#define QPID_CLIENT_SESSIONBASE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Uuid.h" +#include "qpid/framing/amqp_structs.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/MethodContent.h" +#include "qpid/framing/TransferContent.h" +#include "qpid/client/Completion.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/Response.h" +#include "qpid/client/SessionCore.h" +#include "qpid/client/TypedResult.h" +#include "qpid/shared_ptr.h" +#include <string> + +namespace qpid { +namespace client { + +using std::string; +using framing::Content; +using framing::FieldTable; +using framing::MethodContent; +using framing::SequenceNumberSet; +using framing::Uuid; + +/** + * Basic session operations that are not derived from AMQP XML methods. + */ +class SessionBase +{ + public: + SessionBase(); + ~SessionBase(); + + /** Get the next message frame-set from the session. */ + framing::FrameSet::shared_ptr get(); + + /** Get the session ID */ + Uuid getId() const; + + /** + * In synchronous mode, the session sets the sync bit on every + * command and waits for the broker's response before returning. + * Note this gives lower throughput than non-synchronous mode. + * + * In non-synchronous mode commands are sent without waiting + * for a respose (you can use the returned Completion object + * to wait for completion.) + * + *@param if true set the session to synchronous mode, else + * set it to non-synchronous mode. + */ + void setSynchronous(bool isSync); + + bool isSynchronous() const; + + /** + * Suspend the session, can be resumed on a different connection. + * @see Connection::resume() + */ + void suspend(); + + /** Close the session */ + void close(); + + Execution& getExecution(); + + typedef framing::TransferContent DefaultContent; + + protected: + shared_ptr<SessionCore> impl; + framing::ProtocolVersion version; + friend class Connection; + SessionBase(shared_ptr<SessionCore>); +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SESSIONBASE_H*/ diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index ee9e9570ed..07a791bef3 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -203,8 +203,10 @@ void SessionCore::resume(shared_ptr<ConnectionImpl> c) { // user thread { Lock l(state); - if (state==OPEN) - doSuspend(REPLY_SUCCESS, OK); + if (state==SUSPENDED) { // Clear error that caused suspend + code=REPLY_SUCCESS; + text=OK; + } check(state==SUSPENDED, COMMAND_INVALID, CANNOT_RESUME_SESSION); SequenceNumber sendAck=session->resuming(); attaching(c); diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index a758dc1341..ec2f7000ef 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -38,29 +38,30 @@ SubscriptionManager::SubscriptionManager(Session_0_10& s) confirmMode(true), acquireMode(false) {} -void SubscriptionManager::subscribeInternal( +Completion SubscriptionManager::subscribeInternal( const std::string& q, const std::string& dest) { - session.messageSubscribe(arg::queue=q, arg::destination=dest, + Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest, arg::confirmMode=confirmMode, arg::acquireMode=acquireMode); setFlowControl(dest, messages, bytes, window); + return c; } -void SubscriptionManager::subscribe( +Completion SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& d) { std::string dest=d.empty() ? q:d; dispatcher.listen(dest, &listener, autoAck); - subscribeInternal(q, dest); + return subscribeInternal(q, dest); } -void SubscriptionManager::subscribe( +Completion SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const std::string& d) { std::string dest=d.empty() ? q:d; lq.session=session; - lq.queue=session.execution().getDemux().add(dest, ByTransferDest(dest)); - subscribeInternal(q, dest); + lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest)); + return subscribeInternal(q, dest); } void SubscriptionManager::setFlowControl( @@ -91,7 +92,9 @@ void SubscriptionManager::cancel(const std::string dest) session.messageCancel(dest); } -void SubscriptionManager::run(bool autoStop) +void SubscriptionManager::setAutoStop(bool set) { autoStop=set; } + +void SubscriptionManager::run() { dispatcher.setAutoStop(autoStop); dispatcher.run(); diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index c9066fe1de..1205478bf8 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -23,21 +23,24 @@ */ #include "qpid/sys/Mutex.h" #include <qpid/client/Dispatcher.h> +#include <qpid/client/Completion.h> #include <qpid/client/Session_0_10.h> #include <qpid/client/MessageListener.h> #include <qpid/client/LocalQueue.h> +#include <qpid/sys/Runnable.h> + #include <set> #include <sstream> namespace qpid { namespace client { -class SubscriptionManager +class SubscriptionManager : public sys::Runnable { typedef sys::Mutex::ScopedLock Lock; typedef sys::Mutex::ScopedUnlock Unlock; - void subscribeInternal(const std::string& q, const std::string& dest); + Completion subscribeInternal(const std::string& q, const std::string& dest); qpid::client::Dispatcher dispatcher; qpid::client::Session_0_10& session; @@ -47,8 +50,9 @@ class SubscriptionManager AckPolicy autoAck; bool confirmMode; bool acquireMode; - -public: + bool autoStop; + + public: SubscriptionManager(Session_0_10& session); /** @@ -59,9 +63,9 @@ public: *@param tag Unique destination tag for the listener. * If not specified, the queue name is used. */ - void subscribe(MessageListener& listener, - const std::string& queue, - const std::string& tag=std::string()); + Completion subscribe(MessageListener& listener, + const std::string& queue, + const std::string& tag=std::string()); /** * Subscribe a LocalQueue to receive messages from queue. @@ -70,17 +74,21 @@ public: *@param tag Unique destination tag for the listener. * If not specified, the queue name is used. */ - void subscribe(LocalQueue& localQueue, + Completion subscribe(LocalQueue& localQueue, const std::string& queue, const std::string& tag=std::string()); /** Cancel a subscription. */ void cancel(const std::string tag); - /** Deliver messages until stop() is called. - *@param autoStop If true, return when all listeners are cancelled. + /** Deliver messages until stop() is called. */ + void run(); + + /** If set true, run() will stop when all subscriptions + * are cancelled. If false, run will only stop when stop() + * is called. True by default. */ - void run(bool autoStop=true); + void setAutoStop(bool set=true); /** Cause run() to return */ void stop(); diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h new file mode 100644 index 0000000000..ceb8ecb4e6 --- /dev/null +++ b/cpp/src/tests/BrokerFixture.h @@ -0,0 +1,101 @@ +#ifndef TESTS_BROKERFIXTURE_H +#define TESTS_BROKERFIXTURE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Thread.h" +#include "qpid/broker/Broker.h" +#include "qpid/client/Connection.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/Session_0_10.h" +#include "qpid/client/SubscriptionManager.h" + +namespace qpid { namespace client { +/** Back door into private Connector stuff */ +struct TestConnector { + static void disconnect(qpid::client::Connector& c) { + c.socket.close(); + c.handleClosed(); + } +}; +}} + +/** + * A fixture to create an in-process broker and connect to it for tests. + */ +struct BrokerFixture { + typedef qpid::broker::Broker Broker; + typedef boost::shared_ptr<Broker> BrokerPtr; + + struct OpenConnection : public qpid::client::Connection { + OpenConnection(int port) { open("localhost", port); } + }; + + BrokerPtr broker; + qpid::sys::Thread brokerThread; + OpenConnection connection; + qpid::client::Session_0_10 session; + qpid::client::SubscriptionManager subs; + qpid::client::LocalQueue lq; + + BrokerPtr newBroker() { + Broker::Options opts; + opts.port=0; + opts.workerThreads=1; + BrokerPtr b=Broker::create(opts); + // TODO aconway 2007-12-05: Without the following line + // the test can hang in the connection ctor. This is + // a race condition that should be fixed. + b->getPort(); + return b; + }; + + BrokerFixture() : broker(newBroker()), + brokerThread(*broker), + connection(broker->getPort()), + session(connection.newSession()), + subs(session) + {} + + ~BrokerFixture() { + connection.close(); + broker->shutdown(); + brokerThread.join(); + } + + /** Open a connection to the local broker */ + void open(qpid::client::Connection& c) { + c.open("localhost", broker->getPort()); + } + + /** Close a connection's socket */ + static void disconnect(qpid::client::Connection& c) { + struct Expose : public qpid::client::Connection { + void disconnect() { + qpid::client::TestConnector::disconnect(*impl->getConnector()); + } + }; + static_cast<Expose&>(c).disconnect(); + } +}; + +#endif /*!TESTS_BROKERFIXTURE_H*/ diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp index 454b9ca56d..bd8f5af6be 100644 --- a/cpp/src/tests/ClientChannelTest.cpp +++ b/cpp/src/tests/ClientChannelTest.cpp @@ -20,7 +20,7 @@ */ #include <vector> #include "qpid_test_plugin.h" -#include "InProcessBroker.h" +#include "BrokerFixture.h" #include "qpid/client/Channel.h" #include "qpid/client/Message.h" #include "qpid/client/Queue.h" @@ -44,7 +44,7 @@ const size_t FRAME_MAX = 256; * The test base defines the tests methods, derived classes * instantiate the channel in Basic or Message mode. */ -class ChannelTestBase : public CppUnit::TestCase +class ChannelTestBase : public CppUnit::TestCase, public BrokerFixture { struct Listener: public qpid::client::MessageListener { vector<Message> messages; @@ -56,7 +56,6 @@ class ChannelTestBase : public CppUnit::TestCase } }; - qpid::InProcessBrokerClient connection; const std::string qname; const std::string data; Queue queue; @@ -69,8 +68,7 @@ class ChannelTestBase : public CppUnit::TestCase public: ChannelTestBase() - : connection(FRAME_MAX), - qname("testq"), data("hello"), + : qname("testq"), data("hello"), queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE) {} diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index ac2cf155f4..70e8a41074 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -19,7 +19,7 @@ * */ #include "qpid_test_plugin.h" -#include "InProcessBroker.h" +#include "BrokerFixture.h" #include "qpid/client/Dispatcher.h" #include "qpid/client/Session_0_10.h" #include "qpid/framing/TransferContent.h" @@ -61,7 +61,7 @@ struct DummyListener : public MessageListener } }; -class ClientSessionTest : public CppUnit::TestCase +class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture { CPPUNIT_TEST_SUITE(ClientSessionTest); CPPUNIT_TEST(testQueueQuery); @@ -74,23 +74,14 @@ class ClientSessionTest : public CppUnit::TestCase CPPUNIT_TEST_SUITE_END(); shared_ptr<broker::Broker> broker; - Session_0_10 session; - // Defer construction & thread creation to setUp - boost::optional<InProcessConnection> c; - boost::optional<InProcessConnection> c2; -public: + public: void setUp() { broker = broker::Broker::create(); - c=boost::in_place<InProcessConnection>(broker); - c2=boost::in_place<InProcessConnection>(broker); } void tearDown() { - c2.reset(); - c.reset(); - broker.reset(); } void declareSubscribe(const std::string& q="my-queue", @@ -109,7 +100,7 @@ public: void testQueueQuery() { - session = c->newSession(); + session =connection.newSession(); session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true); TypedResult<QueueQueryResult> result = session.queueQuery(std::string("my-queue")); CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable()); @@ -120,7 +111,7 @@ public: void testTransfer() { - session = c->newSession(); + session =connection.newSession(); declareSubscribe(); session.messageTransfer(content=TransferContent("my-message", "my-queue")); //get & test the message: @@ -128,12 +119,12 @@ public: CPPUNIT_ASSERT(msg->isA<MessageTransferBody>()); CPPUNIT_ASSERT_EQUAL(std::string("my-message"), msg->getContent()); //confirm receipt: - session.execution().completed(msg->getId(), true, true); + session.getExecution().completed(msg->getId(), true, true); } void testDispatcher() { - session = c->newSession(); + session =connection.newSession(); declareSubscribe(); TransferContent msg1("One"); @@ -161,16 +152,16 @@ public: } void testResumeExpiredError() { - session = c->newSession(0); + session =connection.newSession(0); session.suspend(); // session has 0 timeout. try { - c->resume(session); + connection.resume(session); CPPUNIT_FAIL("Expected InvalidArgumentException."); } catch(const InternalErrorException&) {} } void testUseSuspendedError() { - session = c->newSession(60); + session =connection.newSession(60); session.suspend(); try { session.exchangeQuery(name="amq.fanout"); @@ -179,26 +170,27 @@ public: } void testSuspendResume() { - session = c->newSession(60); + session =connection.newSession(60); declareSubscribe(); session.suspend(); // Make sure we are still subscribed after resume. - c->resume(session); + connection.resume(session); session.messageTransfer(content=TransferContent("my-message", "my-queue")); FrameSet::shared_ptr msg = session.get(); CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent()); } void testDisconnectResume() { - session = c->newSession(60); + session =connection.newSession(60); session.queueDeclare(queue="before"); CPPUNIT_ASSERT(queueExists("before")); - // Simulate lost frames. - c->discard(); session.queueDeclare(queue=string("after")); - c->disconnect(); // Simulate disconnect, resume on a new connection. - c2->resume(session); + disconnect(connection); + Connection c2; + open(c2); + c2.resume(session); CPPUNIT_ASSERT(queueExists("after")); + c2.close(); } }; diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index d7094c9bfd..e774c48070 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -20,7 +20,7 @@ */ #include "unit_test.h" -#include "InProcessBroker.h" +#include "BrokerFixture.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" @@ -31,74 +31,74 @@ QPID_AUTO_TEST_SUITE(exception_test) using namespace std; using namespace qpid; +using namespace sys; using namespace client; using namespace framing; -struct Fixture { - InProcessConnection connection; - InProcessConnection connection2; - Session_0_10 session; - SubscriptionManager sub; - LocalQueue q; +using boost::bind; +using boost::function; - Fixture() : connection(), - connection2(connection.getBroker()), - session(connection.newSession()), - sub(session) - { - session.queueDeclare(arg::queue="q"); +template <class Ex> +struct Catcher : public Runnable { + function<void ()> f; + bool caught; + Thread thread; + + Catcher(function<void ()> f_) : f(f_), caught(false), thread(this) {} + ~Catcher() { join(); } + + void run() { + try { f(); } + catch(const Ex& e) { + caught=true; + BOOST_MESSAGE(e.what()); + } + catch(const std::exception& e) { + BOOST_ERROR(string("Bad exception: ")+e.what()); + } + catch(...) { + BOOST_ERROR(string("Bad exception: unknown")); + } + } + + bool join() { + if (thread.id()) { + thread.join(); + thread=Thread(); + } + return caught; } }; +BOOST_FIXTURE_TEST_CASE(DisconnectedGet, BrokerFixture) { + Catcher<ClosedException> get(bind(&Session_0_10::get, session)); + disconnect(connection); + BOOST_CHECK(get.join()); +} -// TODO aconway 2007-11-30: need InProcessBroker to be a more accurate -// simulation of shutdown behaviour. It should override only -// Connector.run() to substitute NetworkQueues for the Dispatcher. -// -// template <class Ex> -// struct Catcher : public sys::Runnable { -// Session_0_10 s; -// boost::function<void ()> f; -// bool caught; -// Catcher(Session_0_10 s_, boost::function<void ()> f_) -// : s(s_), f(f_), caught(false) {} -// void run() { -// try { f(); } catch(const Ex& e) { -// caught=true; -// BOOST_MESSAGE(e.what()); -// } -// } -// }; +BOOST_FIXTURE_TEST_CASE(DisconnectedPop, BrokerFixture) { + session.queueDeclare(arg::queue="q"); + subs.subscribe(lq, "q"); + Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq))); + disconnect(connection); + BOOST_CHECK(pop.join()); +} -// BOOST_FIXTURE_TEST_CASE(DisconnectedGet, Fixture) { -// Catcher<Exception> get(session, boost::bind(&Session_0_10::get, session)); -// sub.subscribe(q, "q"); -// sys::Thread t(get); -// connection.disconnect(); -// t.join(); -// BOOST_CHECK(get.caught); -// } - -// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, Fixture) { +// FIXME aconway 2007-12-07: This test hangs sporadically at t.join +// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, BrokerFixture) { // struct NullListener : public MessageListener { // void received(Message&) { BOOST_FAIL("Unexpected message"); } // } l; -// sub.subscribe(l, "q"); -// connection.disconnect(); -// try { -// sub.run(); -// BOOST_FAIL("Expected exception"); -// } catch (const Exception&e) { BOOST_FAIL(e.what()); } -// try { -// session.queueDeclare(arg::queue="foo"); -// BOOST_FAIL("Expected exception"); -// } catch (const Exception&e) { BOOST_FAIL(e.what()); } +// session.queueDeclare(arg::queue="q"); +// subs.subscribe(l, "q"); +// Thread t(subs); +// disconnect(connection); +// t.join(); +// BOOST_CHECK_THROW(session.close(), InternalErrorException); // } -// TODO aconway 2007-11-30: setSynchronous appears not to work. -// BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, Fixture) { -// session.setSynchronous(true); -// BOOST_CHECK_THROW(sub.subscribe(q, "no such queue"), NotFoundException); -// } +BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, BrokerFixture) { + BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException); +} QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index bd2c0d9933..d6185a0100 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -205,7 +205,7 @@ struct Setup : public Client { } } // Make sure this is all completed before we return. - session.execution().sendSyncRequest(); + session.getExecution().sendSyncRequest(); } }; @@ -231,13 +231,9 @@ class Stats { // Functor to collect rates. void operator()(const string& data) { - try { - double d=lexical_cast<double>(data); - values.push_back(d); - sum += d; - } catch (...) { - throw Exception(QPID_MSG("Bad data, expecting double: " << data)); - } + double d=lexical_cast<double>(data); + values.push_back(d); + sum += d; } double mean() const { |