diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/SessionId.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/client/AckPolicy.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/AsyncSession.h | 38 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connection.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/Session.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase.cpp | 84 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase.h | 148 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase_0_10.cpp | 64 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase_0_10.h | 104 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase_0_10Access.h | 42 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 10 |
17 files changed, 317 insertions, 316 deletions
diff --git a/cpp/src/qpid/SessionId.h b/cpp/src/qpid/SessionId.h index 08553e8b1d..291c42a2bb 100644 --- a/cpp/src/qpid/SessionId.h +++ b/cpp/src/qpid/SessionId.h @@ -27,7 +27,17 @@ namespace qpid { -/** Identifier for a session */ +/** Identifier for a session. + * There are two parts to a session identifier: + * + * getUserId() returns the authentication principal associated with + * the session's connection. + * + * getName() returns the session name. + * + * The name must be unique among sessions with the same authentication + * principal. + */ class SessionId : boost::totally_ordered1<SessionId> { std::string userId; std::string name; diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/AckPolicy.h index 8d62b6f4f2..d00df1ef26 100644 --- a/cpp/src/qpid/client/AckPolicy.h +++ b/cpp/src/qpid/client/AckPolicy.h @@ -22,6 +22,7 @@ */ #include "qpid/framing/SequenceSet.h" +#include "qpid/client/AsyncSession.h" namespace qpid { namespace client { @@ -44,7 +45,7 @@ class AckPolicy */ AckPolicy(size_t n=1) : interval(n), count(n) {} - void ack(const Message& msg, Session& session) { + void ack(const Message& msg, AsyncSession session) { accepted.add(msg.getId()); if (!interval) return; if (--count==0) { @@ -57,7 +58,7 @@ class AckPolicy } } - void ackOutstanding(Session& session) { + void ackOutstanding(AsyncSession session) { if (!accepted.empty()) { session.messageAccept(accepted); accepted.clear(); diff --git a/cpp/src/qpid/client/AsyncSession.h b/cpp/src/qpid/client/AsyncSession.h new file mode 100644 index 0000000000..150aabe191 --- /dev/null +++ b/cpp/src/qpid/client/AsyncSession.h @@ -0,0 +1,38 @@ +#ifndef QPID_CLIENT_ASYNCSESSION_H +#define QPID_CLIENT_ASYNCSESSION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/AsyncSession_0_10.h" + +namespace qpid { +namespace client { + +/** + * AsyncSession is an alias for Session_0_10 + * + * \ingroup clientapi + */ +typedef AsyncSession_0_10 AsyncSession; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_ASYNCSESSION_H*/ diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 82d1eac8b4..bec2b0345d 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -22,6 +22,7 @@ #include "ConnectionSettings.h" #include "Message.h" #include "SessionImpl.h" +#include "SessionBase_0_10Access.h" #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/log/Statement.h" @@ -72,18 +73,16 @@ void Connection::open(const ConnectionSettings& settings) max_frame_size = impl->getNegotiatedSettings().maxFrameSize; } -Session Connection::newSession(SynchronousMode sync, - uint32_t detachedLifetime) -{ +Session Connection::newSession(const std::string& name) { if (!impl) throw Exception(QPID_MSG("Connection has not yet been opened")); - - shared_ptr<SessionImpl> core( - new SessionImpl(impl, ++channelIdCounter, max_frame_size)); - core->setSync(sync); - impl->addSession(core); - core->open(detachedLifetime); - return Session(core); + shared_ptr<SessionImpl> simpl( + new SessionImpl(name, impl, ++channelIdCounter, max_frame_size)); + impl->addSession(simpl); + simpl->open(0); + Session s; + SessionBase_0_10Access(s).set(simpl); + return s; } void Connection::resume(Session& session) { diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 0c01c77509..5337a20bfa 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -26,7 +26,6 @@ #include "ConnectionImpl.h" #include "qpid/client/Session.h" #include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/Uuid.h" namespace qpid { @@ -111,14 +110,11 @@ class Connection * multiple streams of work to be multiplexed over the same * connection. * - *@param detachedLifetime: A session may be detached from its - * channel, either by calling Session::suspend() or because of a - * network failure. The session state is preserved for - * detachedLifetime seconds to allow a call to resume(). After - * that the broker may discard the session state. Default is 0, - * meaning the session cannot be resumed. + *@param name: A name to identify the session. @see qpid::SessionId + * If the name is empty (the default) then a unique name will be + * chosen using a Universally-unique identifier (UUID) algorithm. */ - Session newSession(SynchronousMode sync, uint32_t detachedLifetime=0); + Session newSession(const std::string& name=std::string()); /** * Resume a suspended session. A session may be resumed diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 2bbe5a122f..0bd0cb9d08 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -37,7 +37,8 @@ using qpid::sys::Thread; namespace qpid { namespace client { -Subscriber::Subscriber(Session& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {} +Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a) + : session(s), listener(l), autoAck(a) {} void Subscriber::received(Message& msg) { @@ -47,7 +48,7 @@ void Subscriber::received(Message& msg) } } -Dispatcher::Dispatcher(Session& s, const std::string& q) +Dispatcher::Dispatcher(const Session& s, const std::string& q) : session(s), running(false), autoStop(true) { queue = q.empty() ? @@ -88,7 +89,7 @@ void Dispatcher::run() } } } - session.sync(); // Make sure all our acks are received before returning. + sync(session).sync(); // Make sure all our acks are received before returning. } catch (const ClosedException&) {} //ignore it and return catch (const std::exception& e) { diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h index e23d0c198c..1b31ddf4cf 100644 --- a/cpp/src/qpid/client/Dispatcher.h +++ b/cpp/src/qpid/client/Dispatcher.h @@ -37,13 +37,13 @@ namespace client { class Subscriber : public MessageListener { - Session& session; + AsyncSession session; MessageListener* const listener; AckPolicy autoAck; public: typedef boost::shared_ptr<Subscriber> shared_ptr; - Subscriber(Session& session, MessageListener* listener, AckPolicy); + Subscriber(const Session& session, MessageListener* listener, AckPolicy); void received(Message& msg); }; @@ -55,7 +55,7 @@ class Dispatcher : public sys::Runnable typedef std::map<std::string, Subscriber::shared_ptr> Listeners; sys::Mutex lock; sys::Thread worker; - Session& session; + Session session; Demux::QueuePtr queue; bool running; bool autoStop; @@ -67,7 +67,7 @@ class Dispatcher : public sys::Runnable bool isStopped(); public: - Dispatcher(Session& session, const std::string& queue = ""); + Dispatcher(const Session& session, const std::string& queue = ""); void start(); void run(); diff --git a/cpp/src/qpid/client/Session.h b/cpp/src/qpid/client/Session.h index 84831ec442..bdabd26c82 100644 --- a/cpp/src/qpid/client/Session.h +++ b/cpp/src/qpid/client/Session.h @@ -33,6 +33,7 @@ namespace client { */ typedef Session_0_10 Session; + }} // namespace qpid::client #endif /*!QPID_CLIENT_SESSION_H*/ diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp deleted file mode 100644 index dfd0f62e7e..0000000000 --- a/cpp/src/qpid/client/SessionBase.cpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "SessionBase.h" -#include "qpid/framing/all_method_bodies.h" - -namespace qpid { -namespace client { -using namespace framing; - -SessionBase::SessionBase() {} -SessionBase::~SessionBase() {} -SessionBase::SessionBase(shared_ptr<SessionImpl> core) : impl(core) {} -void SessionBase::suspend() { impl->suspend(); } -void SessionBase::close() { impl->close(); } - -void SessionBase::setSynchronous(bool isSync) { impl->setSync(isSync); } -void SessionBase::setSynchronous(SynchronousMode m) { impl->setSync(m); } -bool SessionBase::isSynchronous() const { return impl->isSync(); } -SynchronousMode SessionBase::getSynchronous() const { - return SynchronousMode(impl->isSync()); -} - -Execution& SessionBase::getExecution() -{ - return *impl; -} - -void SessionBase::flush() -{ - impl->sendFlush(); -} - -// FIXME aconway 2008-04-24: do we need to provide a non-synchronous version -// of sync() or bool paramter to allow setting a sync point for a later wait? -void SessionBase::sync() -{ - ExecutionSyncBody b; - b.setSync(true); - impl->send(b).wait(*impl); -} - -void SessionBase::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) -{ - impl->markCompleted(id, cumulative, notifyPeer); -} - -void SessionBase::sendCompletion() -{ - impl->sendCompletion(); -} - -Uuid SessionBase::getId() const { return impl->getId(); } -framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); } - -SessionBase::ScopedSync::ScopedSync(SessionBase& s) : session(s), change(!s.isSynchronous()) -{ - if (change) session.setSynchronous(true); -} - -SessionBase::ScopedSync::~ScopedSync() -{ - if (change) session.setSynchronous(false); -} - - -}} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h deleted file mode 100644 index 7f4a27dc09..0000000000 --- a/cpp/src/qpid/client/SessionBase.h +++ /dev/null @@ -1,148 +0,0 @@ -#ifndef QPID_CLIENT_SESSIONBASE_H -#define QPID_CLIENT_SESSIONBASE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/Uuid.h" -#include "qpid/framing/amqp_structs.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/framing/MethodContent.h" -#include "qpid/framing/TransferContent.h" -#include "qpid/client/Completion.h" -#include "qpid/client/ConnectionImpl.h" -#include "qpid/client/Execution.h" -#include "qpid/client/SessionImpl.h" -#include "qpid/client/TypedResult.h" -#include "qpid/shared_ptr.h" -#include <string> - -namespace qpid { -namespace client { - -using std::string; -using framing::Content; -using framing::FieldTable; -using framing::MethodContent; -using framing::SequenceNumberSet; -using framing::Uuid; - -enum CreditUnit { MESSAGE=0, BYTE=1 }; - -/** \defgroup clientapi Synchronous mode of a session. - * - * SYNC means that Session functions do not return until the remote - * broker has confirmed that the command was executed. - * - * ASYNC means that the client sends commands asynchronously, Session - * functions return immediately. - * - * ASYNC mode gives better performance for high-volume traffic, but - * requires some additional caution. - * - * Session functions return immediately. If the command causes an - * exception on the broker, the exception will be thrown on a - * <em>later</em> function call. - * - * If you need to notify some extenal agent that some actions have - * been taken (e.g. binding queues to exchanges), you must call - * Session::sync() first to ensure that all the commands are complete. - * - * You can freely switch between modes by calling Session::setSynchronous(). - * - * @see Session::sync(), Session::setSynchronous() - */ -enum SynchronousMode { SYNC=true, ASYNC=false }; - - -/** - * Basic session operations that are not derived from AMQP XML methods. - */ -class SessionBase -{ - public: - /** - * Instances of this class turn synchronous mode on for the - * duration of their scope (and revert back to async if required - * afterwards). - */ - class ScopedSync - { - SessionBase& session; - const bool change; - public: - ScopedSync(SessionBase& s); - ~ScopedSync(); - }; - - - SessionBase(); - ~SessionBase(); - - /** Get the next message frame-set from the session. */ - framing::FrameSet::shared_ptr get(); - - /** Get the session ID */ - Uuid getId() const; - - /** - * In synchronous mode, wait for the broker's response before - * returning. This gives lower throughput than asynchronous - * mode. - * - * In asynchronous mode commands are sent without waiting - * for a response (you can use the returned Completion object - * to wait for completion). - * - * @see SynchronousMode - */ - void setSynchronous(SynchronousMode mode); - void setSynchronous(bool set); - bool isSynchronous() const; - SynchronousMode getSynchronous() const; - - /** - * Suspend the session, which can be resumed on a different connection. - * @see Connection::resume() - */ - void suspend(); - - /** Close the session */ - void close(); - - Execution& getExecution(); - void sync(); - void flush(); - void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); - void sendCompletion(); - - typedef framing::TransferContent DefaultContent; - - protected: - shared_ptr<SessionImpl> impl; - framing::ProtocolVersion version; - friend class Connection; - SessionBase(shared_ptr<SessionImpl>); -}; - -}} // namespace qpid::client - -#endif /*!QPID_CLIENT_SESSIONBASE_H*/ diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp new file mode 100644 index 0000000000..974acbfcf6 --- /dev/null +++ b/cpp/src/qpid/client/SessionBase_0_10.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionBase_0_10.h" +#include "qpid/framing/all_method_bodies.h" + +namespace qpid { +namespace client { +using namespace framing; + +SessionBase_0_10::SessionBase_0_10() {} +SessionBase_0_10::~SessionBase_0_10() {} + +void SessionBase_0_10::close() { impl->close(); } + +Execution& SessionBase_0_10::getExecution() +{ + return *impl; +} + +void SessionBase_0_10::flush() +{ + impl->sendFlush(); +} + +void SessionBase_0_10::sync() +{ + ExecutionSyncBody b; + b.setSync(true); + impl->send(b).wait(*impl); +} + +void SessionBase_0_10::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) +{ + impl->markCompleted(id, cumulative, notifyPeer); +} + +void SessionBase_0_10::sendCompletion() +{ + impl->sendCompletion(); +} + +SessionId SessionBase_0_10::getId() const { return impl->getId(); } +framing::FrameSet::shared_ptr SessionBase_0_10::get() { return impl->get(); } + + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionBase_0_10.h b/cpp/src/qpid/client/SessionBase_0_10.h new file mode 100644 index 0000000000..f9ced049a9 --- /dev/null +++ b/cpp/src/qpid/client/SessionBase_0_10.h @@ -0,0 +1,104 @@ +#ifndef QPID_CLIENT_SESSIONBASE_H +#define QPID_CLIENT_SESSIONBASE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/SessionId.h" +#include "qpid/framing/amqp_structs.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/MethodContent.h" +#include "qpid/framing/TransferContent.h" +#include "qpid/client/Completion.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/Execution.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/TypedResult.h" +#include "qpid/shared_ptr.h" +#include <string> + +namespace qpid { +namespace client { + +using std::string; +using framing::Content; +using framing::FieldTable; +using framing::MethodContent; +using framing::SequenceNumber; +using framing::SequenceSet; +using framing::SequenceNumberSet; +using qpid::SessionId; +using framing::Xid; + +enum CreditUnit { MESSAGE=0, BYTE=1 }; + +/** + * Base class for handles to an AMQP session. + * + * Subclasses provide the AMQP commands for a given + * version of the protocol. + */ +class SessionBase_0_10 { + public: + + typedef framing::TransferContent DefaultContent; + + SessionBase_0_10(); + ~SessionBase_0_10(); + + /** Get the next message frame-set from the session. */ + framing::FrameSet::shared_ptr get(); + + /** Get the session ID */ + SessionId getId() const; + + /** Close the session. + * A session is automatically closed when all handles to it are destroyed. + */ + void close(); + + /** Synchronize the session: sync() waits until all commands + * issued on this session have been completed. It is equivalent to + * calling Session::executionSync() + * + * Note sync() is always synchronous, even on an AsyncSession object + * because that's almost always what you want. You can call + * AsyncSession::executionSync() directly in the unusual event + * that you want to do an asynchronous sync. + */ + void sync(); + + /** Set the timeout for this session. */ + uint32_t timeout(uint32_t seconds); + + Execution& getExecution(); + void flush(); + void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); + void sendCompletion(); + + protected: + boost::shared_ptr<SessionImpl> impl; + friend class SessionBase_0_10Access; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SESSIONBASE_H*/ diff --git a/cpp/src/qpid/client/SessionBase_0_10Access.h b/cpp/src/qpid/client/SessionBase_0_10Access.h new file mode 100644 index 0000000000..e2189a53dd --- /dev/null +++ b/cpp/src/qpid/client/SessionBase_0_10Access.h @@ -0,0 +1,42 @@ +#ifndef QPID_CLIENT_SESSIONBASEACCESS_H +#define QPID_CLIENT_SESSIONBASEACCESS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/SessionBase_0_10.h" + +/**@file @internal Internal use only */ + +namespace qpid { +namespace client { + +class SessionBase_0_10Access { + public: + SessionBase_0_10Access(SessionBase_0_10& sb_) : sb(sb_) {} + void set(const boost::shared_ptr<SessionImpl>& si) { sb.impl = si; } + boost::shared_ptr<SessionImpl> get() { return sb.impl; } + private: + SessionBase_0_10& sb; +}; +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SESSIONBASEACCESS_H*/ diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 18b573b464..58f4bc0aa7 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -48,18 +48,16 @@ typedef sys::Monitor::ScopedUnlock UnLock; typedef sys::ScopedLock<sys::Semaphore> Acquire; -SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, +SessionImpl::SessionImpl(const std::string& name, + shared_ptr<ConnectionImpl> conn, uint16_t ch, uint64_t _maxFrameSize) : error(OK), code(NORMAL), text(EMPTY), state(INACTIVE), - syncMode(false), detachedLifetime(0), maxFrameSize(_maxFrameSize), - id(true),//generate unique uuid for each session - name(id.str()), - //TODO: may want to allow application defined names instead + id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name), connection(conn), ioHandler(*this), channel(ch), @@ -81,15 +79,6 @@ SessionImpl::~SessionImpl() { connection->erase(channel); } -void SessionImpl::setSync(bool s) // user thread -{ - syncMode = s; //syncMode is volatile -} - -bool SessionImpl::isSync() // user thread -{ - return syncMode; //syncMode is volatile -} FrameSet::shared_ptr SessionImpl::get() // user thread { @@ -97,7 +86,7 @@ FrameSet::shared_ptr SessionImpl::get() // user thread return demux.getDefault()->pop(); } -const Uuid SessionImpl::getId() const //user thread +const SessionId SessionImpl::getId() const //user thread { return id; //id is immutable } @@ -107,7 +96,7 @@ void SessionImpl::open(uint32_t timeout) // user thread Lock l(state); if (state == INACTIVE) { setState(ATTACHING); - proxy.attach(name, false); + proxy.attach(id.getName(), false); waitFor(ATTACHED); //TODO: timeout will not be set locally until get response to //confirm, should we wait for that? @@ -144,7 +133,7 @@ void SessionImpl::suspend() //user thread void SessionImpl::detach() //call with lock held { if (state == ATTACHED) { - proxy.detach(name); + proxy.detach(id.getName()); setState(DETACHING); } } @@ -285,15 +274,11 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con { Acquire a(sendLock); SequenceNumber id = nextOut++; - bool sync; { Lock l(state); checkOpen(); incompleteOut.add(id); - sync = syncMode; } - - if (sync) command.getMethod()->setSync(true); Future f(id); if (command.getMethod()->resultExpected()) { Lock l(state); @@ -308,9 +293,6 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con if (content) { sendContent(*content); } - if (sync) { - waitForCompletion(id); - } return f; } void SessionImpl::sendContent(const MethodContent& content) @@ -441,27 +423,27 @@ void SessionImpl::attach(const std::string& /*name*/, bool /*force*/) void SessionImpl::attached(const std::string& _name) { Lock l(state); - if (name != _name) throw InternalErrorException("Incorrect session name"); + if (id.getName() != _name) throw InternalErrorException("Incorrect session name"); setState(ATTACHED); } void SessionImpl::detach(const std::string& _name) { Lock l(state); - if (name != _name) throw InternalErrorException("Incorrect session name"); + if (id.getName() != _name) throw InternalErrorException("Incorrect session name"); setState(DETACHED); - QPID_LOG(info, "Session detached by peer: " << name); + QPID_LOG(info, "Session detached by peer: " << id); } void SessionImpl::detached(const std::string& _name, uint8_t _code) { Lock l(state); - if (name != _name) throw InternalErrorException("Incorrect session name"); + if (id.getName() != _name) throw InternalErrorException("Incorrect session name"); setState(DETACHED); if (_code) { //TODO: make sure this works with execution.exception - don't //want to overwrite the code from that - QPID_LOG(error, "Session detached by peer: " << name << " " << code); + QPID_LOG(error, "Session detached by peer: " << id << " " << code); error = SESSION_DETACH; code = _code; text = "Session detached by peer"; @@ -561,8 +543,6 @@ void SessionImpl::gap(const framing::SequenceSet& /*commands*/) throw NotImplementedException("gap not yet supported"); } - - void SessionImpl::sync() {} void SessionImpl::result(const framing::SequenceNumber& commandId, const std::string& value) diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 0bcec4dd0c..7bb7136912 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -26,6 +26,7 @@ #include "Execution.h" #include "Results.h" +#include "qpid/SessionId.h" #include "qpid/shared_ptr.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/ChannelHandler.h" @@ -59,14 +60,14 @@ class SessionImpl : public framing::FrameHandler::InOutHandler, private framing::AMQP_ClientOperations::ExecutionHandler { public: - SessionImpl(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); + SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); ~SessionImpl(); //NOTE: Public functions called in user thread. framing::FrameSet::shared_ptr get(); - const framing::Uuid getId() const; + const SessionId getId() const; uint16_t getChannel() const; void setChannel(uint16_t channel); @@ -76,8 +77,6 @@ public: void resume(shared_ptr<ConnectionImpl>); void suspend(); - void setSync(bool s); - bool isSync(); void assertOpen() const; Future send(const framing::AMQBody& command); @@ -131,7 +130,8 @@ private: Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); void sendContent(const framing::MethodContent&); void waitForCompletionImpl(const framing::SequenceNumber& id); - + void requestTimeout(uint32_t timeout); + void sendCompletionImpl(); // Note: Following methods are called by network thread in @@ -140,7 +140,6 @@ private: void attached(const std::string& name); void detach(const std::string& name); void detached(const std::string& name, uint8_t detachCode); - void requestTimeout(uint32_t timeout); void timeout(uint32_t timeout); void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset); void expected(const framing::SequenceSet& commands, const framing::Array& fragments); @@ -167,11 +166,9 @@ private: std::string text; // Error text mutable StateMonitor state; mutable sys::Semaphore sendLock; - volatile bool syncMode; uint32_t detachedLifetime; const uint64_t maxFrameSize; - const framing::Uuid id; - const std::string name; + const SessionId id; shared_ptr<ConnectionImpl> connection; framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler; diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 2ba3f5fe62..6036f153f6 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -32,23 +32,23 @@ namespace qpid { namespace client { -SubscriptionManager::SubscriptionManager(Session& s) +SubscriptionManager::SubscriptionManager(const Session& s) : dispatcher(s), session(s), messages(UNLIMITED), bytes(UNLIMITED), window(true), acceptMode(0), acquireMode(0), autoStop(true) {} -Completion SubscriptionManager::subscribeInternal( +void SubscriptionManager::subscribeInternal( const std::string& q, const std::string& dest) { - Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest, - arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); - setFlowControl(dest, messages, bytes, window); - return c; + async(session).messageSubscribe( // setFlowControl will sync. + arg::queue=q, arg::destination=dest, + arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); + setFlowControl(dest, messages, bytes, window); } -Completion SubscriptionManager::subscribe( +void SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& d) { std::string dest=d.empty() ? q:d; @@ -56,7 +56,7 @@ Completion SubscriptionManager::subscribe( return subscribeInternal(q, dest); } -Completion SubscriptionManager::subscribe( +void SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const std::string& d) { std::string dest=d.empty() ? q:d; @@ -68,9 +68,9 @@ Completion SubscriptionManager::subscribe( void SubscriptionManager::setFlowControl( const std::string& dest, uint32_t messages, uint32_t bytes, bool window) { - session.messageSetFlowMode(dest, window); - session.messageFlow(dest, 0, messages); - session.messageFlow(dest, 1, bytes); + async(session).messageSetFlowMode(dest, window); + async(session).messageFlow(dest, 0, messages); + session.messageFlow(dest, 1, bytes); // Only need one sync } void SubscriptionManager::setFlowControl( diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 4ccb95c968..4ff962f67b 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -45,10 +45,10 @@ class SubscriptionManager : public sys::Runnable typedef sys::Mutex::ScopedLock Lock; typedef sys::Mutex::ScopedUnlock Unlock; - Completion subscribeInternal(const std::string& q, const std::string& dest); + void subscribeInternal(const std::string& q, const std::string& dest); qpid::client::Dispatcher dispatcher; - qpid::client::Session& session; + qpid::client::Session session; uint32_t messages; uint32_t bytes; bool window; @@ -58,7 +58,7 @@ class SubscriptionManager : public sys::Runnable bool autoStop; public: - SubscriptionManager(Session& session); + SubscriptionManager(const Session& session); /** * Subscribe a MessagesListener to receive messages from queue. @@ -68,7 +68,7 @@ class SubscriptionManager : public sys::Runnable *@param tag Unique destination tag for the listener. * If not specified, the queue name is used. */ - Completion subscribe(MessageListener& listener, + void subscribe(MessageListener& listener, const std::string& queue, const std::string& tag=std::string()); @@ -79,7 +79,7 @@ class SubscriptionManager : public sys::Runnable *@param tag Unique destination tag for the listener. * If not specified, the queue name is used. */ - Completion subscribe(LocalQueue& localQueue, + void subscribe(LocalQueue& localQueue, const std::string& queue, const std::string& tag=std::string()); |