diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/client/Channel.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/CompletionTracker.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/client/CompletionTracker.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/Message.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase.cpp | 37 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase.h | 101 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 30 |
16 files changed, 196 insertions, 34 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index 3551946d1e..a6875fcb63 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -174,7 +174,7 @@ void Channel::cancel(const std::string& tag, bool synch) { bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { string tag = "get-handler"; - ScopedDivert handler(tag, session.execution().getDemux()); + ScopedDivert handler(tag, session.getExecution().getDemux()); Demux::QueuePtr incoming = handler.getQueue(); session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); @@ -243,7 +243,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination) bool send = i->second.ackMode == AUTO_ACK || (prefetch && ++(i->second.count) > (prefetch / 2)); if (send) i->second.count = 0; - session.execution().completed(content.getId(), true, send); + session.getExecution().completed(content.getId(), true, send); } } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp index 46a7384ac2..4c6e59f1b8 100644 --- a/cpp/src/qpid/client/CompletionTracker.cpp +++ b/cpp/src/qpid/client/CompletionTracker.cpp @@ -31,12 +31,13 @@ namespace const std::string empty; } -CompletionTracker::CompletionTracker() {} +CompletionTracker::CompletionTracker() : closed(false) {} CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {} void CompletionTracker::close() { sys::Mutex::ScopedLock l(lock); + closed=true; while (!listeners.empty()) { Record r(listeners.front()); { @@ -47,17 +48,18 @@ void CompletionTracker::close() } } + void CompletionTracker::completed(const SequenceNumber& _mark) { sys::Mutex::ScopedLock l(lock); mark = _mark; while (!listeners.empty() && !(listeners.front().id > mark)) { Record r(listeners.front()); + listeners.pop_front(); { sys::Mutex::ScopedUnlock u(lock); r.completed(); } - listeners.pop_front(); } } @@ -88,14 +90,13 @@ void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListe bool CompletionTracker::add(const Record& record) { sys::Mutex::ScopedLock l(lock); - if (record.id < mark) { + if (record.id < mark || closed) { return false; } else { //insert at the correct position Listeners::iterator i = seek(record.id); if (i == listeners.end()) i = listeners.begin(); listeners.insert(i, record); - return true; } } diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h index b2697f399f..55f7ff7531 100644 --- a/cpp/src/qpid/client/CompletionTracker.h +++ b/cpp/src/qpid/client/CompletionTracker.h @@ -60,7 +60,8 @@ private: }; typedef std::list<Record> Listeners; - + bool closed; + sys::Mutex lock; framing::SequenceNumber mark; Listeners listeners; diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 1ae8a25d6b..cf00b2b296 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -44,6 +44,8 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) connector->setShutdownHandler(this); } +ConnectionImpl::~ConnectionImpl() { close(); } + void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session) { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index 46bd5b685d..6cc20a15f6 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -62,6 +62,8 @@ class ConnectionImpl : public framing::FrameHandler, typedef boost::shared_ptr<ConnectionImpl> shared_ptr; ConnectionImpl(boost::shared_ptr<Connector> c); + ~ConnectionImpl(); + void addSession(const boost::shared_ptr<SessionCore>&); void open(const std::string& host, int port = 5672, diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index f32d0470ec..23d2c3ff8d 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -51,7 +51,8 @@ Connector::Connector( { } -Connector::~Connector(){ +Connector::~Connector() { + close(); if (receiver.id() && receiver.id() != Thread::current().id()) receiver.join(); } @@ -76,7 +77,6 @@ void Connector::init(){ receiver = Thread(this); } -// Call with closedLock held bool Connector::closeInternal() { Mutex::ScopedLock l(closedLock); if (!closed) { diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index af6badd6e0..946289efdc 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -88,6 +88,8 @@ class Connector : public framing::OutputHandler, void eof(qpid::sys::AsynchIO&); friend class Channel; + friend class TestConnector; + public: Connector(framing::ProtocolVersion pVersion, bool debug = false, uint32_t buffer_size = 1024); diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 7ffcd676a3..6b6a76b222 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -52,8 +52,8 @@ Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) : session(s), running(false) { queue = q.empty() ? - session.execution().getDemux().getDefault() : - session.execution().getDemux().get(q); + session.getExecution().getDemux().getDefault() : + session.getExecution().getDemux().get(q); } void Dispatcher::start() diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 7b8fb8d01f..f30a35aad0 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -167,6 +167,8 @@ void ExecutionHandler::sendCompletion() out(frame); } +SequenceNumber ExecutionHandler::lastSent() const { return outgoingCounter; } + SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener) { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index 0999540909..6286ccd591 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -80,6 +80,7 @@ public: framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener()); framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content, ResultListener=ResultListener()); + framing::SequenceNumber lastSent() const; void sendSyncRequest(); void sendFlushRequest(); void completed(const framing::SequenceNumber& id, bool cumulative, bool send); diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h index 081384974a..86404ac792 100644 --- a/cpp/src/qpid/client/Message.h +++ b/cpp/src/qpid/client/Message.h @@ -65,12 +65,12 @@ public: void acknowledge(Session_0_10& session, bool cumulative = true, bool send = true) const { - session.execution().completed(id, cumulative, send); + session.getExecution().completed(id, cumulative, send); } void acknowledge(bool cumulative = true, bool send = true) const { - const_cast<Session_0_10&>(session).execution().completed(id, cumulative, send); + const_cast<Session_0_10&>(session).getExecution().completed(id, cumulative, send); } /**@internal for incoming messages */ diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp new file mode 100644 index 0000000000..06266ded91 --- /dev/null +++ b/cpp/src/qpid/client/SessionBase.cpp @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionBase.h" + +namespace qpid { +namespace client { +using namespace framing; + +SessionBase::SessionBase() {} +SessionBase::~SessionBase() {} +SessionBase::SessionBase(shared_ptr<SessionCore> core) : impl(core) {} +void SessionBase::suspend() { impl->suspend(); } +void SessionBase::close() { impl->close(); } +void SessionBase::setSynchronous(bool isSync) { impl->setSync(isSync); } +bool SessionBase::isSynchronous() const { return impl->isSync(); } +Execution& SessionBase::getExecution() { return impl->getExecution(); } +Uuid SessionBase::getId() const { return impl->getId(); } +framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); } +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h new file mode 100644 index 0000000000..890dbd269b --- /dev/null +++ b/cpp/src/qpid/client/SessionBase.h @@ -0,0 +1,101 @@ +#ifndef QPID_CLIENT_SESSIONBASE_H +#define QPID_CLIENT_SESSIONBASE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Uuid.h" +#include "qpid/framing/amqp_structs.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/MethodContent.h" +#include "qpid/framing/TransferContent.h" +#include "qpid/client/Completion.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/Response.h" +#include "qpid/client/SessionCore.h" +#include "qpid/client/TypedResult.h" +#include "qpid/shared_ptr.h" +#include <string> + +namespace qpid { +namespace client { + +using std::string; +using framing::Content; +using framing::FieldTable; +using framing::MethodContent; +using framing::SequenceNumberSet; +using framing::Uuid; + +/** + * Basic session operations that are not derived from AMQP XML methods. + */ +class SessionBase +{ + public: + SessionBase(); + ~SessionBase(); + + /** Get the next message frame-set from the session. */ + framing::FrameSet::shared_ptr get(); + + /** Get the session ID */ + Uuid getId() const; + + /** + * In synchronous mode, the session sets the sync bit on every + * command and waits for the broker's response before returning. + * Note this gives lower throughput than non-synchronous mode. + * + * In non-synchronous mode commands are sent without waiting + * for a respose (you can use the returned Completion object + * to wait for completion.) + * + *@param if true set the session to synchronous mode, else + * set it to non-synchronous mode. + */ + void setSynchronous(bool isSync); + + bool isSynchronous() const; + + /** + * Suspend the session, can be resumed on a different connection. + * @see Connection::resume() + */ + void suspend(); + + /** Close the session */ + void close(); + + Execution& getExecution(); + + typedef framing::TransferContent DefaultContent; + + protected: + shared_ptr<SessionCore> impl; + framing::ProtocolVersion version; + friend class Connection; + SessionBase(shared_ptr<SessionCore>); +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SESSIONBASE_H*/ diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index ee9e9570ed..07a791bef3 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -203,8 +203,10 @@ void SessionCore::resume(shared_ptr<ConnectionImpl> c) { // user thread { Lock l(state); - if (state==OPEN) - doSuspend(REPLY_SUCCESS, OK); + if (state==SUSPENDED) { // Clear error that caused suspend + code=REPLY_SUCCESS; + text=OK; + } check(state==SUSPENDED, COMMAND_INVALID, CANNOT_RESUME_SESSION); SequenceNumber sendAck=session->resuming(); attaching(c); diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index a758dc1341..ec2f7000ef 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -38,29 +38,30 @@ SubscriptionManager::SubscriptionManager(Session_0_10& s) confirmMode(true), acquireMode(false) {} -void SubscriptionManager::subscribeInternal( +Completion SubscriptionManager::subscribeInternal( const std::string& q, const std::string& dest) { - session.messageSubscribe(arg::queue=q, arg::destination=dest, + Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest, arg::confirmMode=confirmMode, arg::acquireMode=acquireMode); setFlowControl(dest, messages, bytes, window); + return c; } -void SubscriptionManager::subscribe( +Completion SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& d) { std::string dest=d.empty() ? q:d; dispatcher.listen(dest, &listener, autoAck); - subscribeInternal(q, dest); + return subscribeInternal(q, dest); } -void SubscriptionManager::subscribe( +Completion SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const std::string& d) { std::string dest=d.empty() ? q:d; lq.session=session; - lq.queue=session.execution().getDemux().add(dest, ByTransferDest(dest)); - subscribeInternal(q, dest); + lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest)); + return subscribeInternal(q, dest); } void SubscriptionManager::setFlowControl( @@ -91,7 +92,9 @@ void SubscriptionManager::cancel(const std::string dest) session.messageCancel(dest); } -void SubscriptionManager::run(bool autoStop) +void SubscriptionManager::setAutoStop(bool set) { autoStop=set; } + +void SubscriptionManager::run() { dispatcher.setAutoStop(autoStop); dispatcher.run(); diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index c9066fe1de..1205478bf8 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -23,21 +23,24 @@ */ #include "qpid/sys/Mutex.h" #include <qpid/client/Dispatcher.h> +#include <qpid/client/Completion.h> #include <qpid/client/Session_0_10.h> #include <qpid/client/MessageListener.h> #include <qpid/client/LocalQueue.h> +#include <qpid/sys/Runnable.h> + #include <set> #include <sstream> namespace qpid { namespace client { -class SubscriptionManager +class SubscriptionManager : public sys::Runnable { typedef sys::Mutex::ScopedLock Lock; typedef sys::Mutex::ScopedUnlock Unlock; - void subscribeInternal(const std::string& q, const std::string& dest); + Completion subscribeInternal(const std::string& q, const std::string& dest); qpid::client::Dispatcher dispatcher; qpid::client::Session_0_10& session; @@ -47,8 +50,9 @@ class SubscriptionManager AckPolicy autoAck; bool confirmMode; bool acquireMode; - -public: + bool autoStop; + + public: SubscriptionManager(Session_0_10& session); /** @@ -59,9 +63,9 @@ public: *@param tag Unique destination tag for the listener. * If not specified, the queue name is used. */ - void subscribe(MessageListener& listener, - const std::string& queue, - const std::string& tag=std::string()); + Completion subscribe(MessageListener& listener, + const std::string& queue, + const std::string& tag=std::string()); /** * Subscribe a LocalQueue to receive messages from queue. @@ -70,17 +74,21 @@ public: *@param tag Unique destination tag for the listener. * If not specified, the queue name is used. */ - void subscribe(LocalQueue& localQueue, + Completion subscribe(LocalQueue& localQueue, const std::string& queue, const std::string& tag=std::string()); /** Cancel a subscription. */ void cancel(const std::string tag); - /** Deliver messages until stop() is called. - *@param autoStop If true, return when all listeners are cancelled. + /** Deliver messages until stop() is called. */ + void run(); + + /** If set true, run() will stop when all subscriptions + * are cancelled. If false, run will only stop when stop() + * is called. True by default. */ - void run(bool autoStop=true); + void setAutoStop(bool set=true); /** Cause run() to return */ void stop(); |