diff options
author | Alan Conway <aconway@apache.org> | 2008-10-09 19:36:51 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-09 19:36:51 +0000 |
commit | d6901e52ab3ee9c40eddc4ad3b4787127c36d874 (patch) | |
tree | 85b9ba2e0d0922be150480392ec1b706a6df5cd0 /cpp/src/qpid/client/SessionImpl.cpp | |
parent | 016ae5acebab0eaf6dd70f5d4d653fdfee93925d (diff) | |
download | qpid-python-d6901e52ab3ee9c40eddc4ad3b4787127c36d874.tar.gz |
Client-side support for amq.faiover exchange. Connection::getKnownBrokers provides latest list.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SessionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 59 |
1 files changed, 32 insertions, 27 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 2d64492bf7..49dd97e324 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -51,21 +51,19 @@ typedef sys::Monitor::ScopedUnlock UnLock; typedef sys::ScopedLock<sys::Semaphore> Acquire; -SessionImpl::SessionImpl(const std::string& name, - shared_ptr<ConnectionImpl> conn, - uint16_t ch, uint64_t _maxFrameSize) +SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> conn) : state(INACTIVE), detachedLifetime(0), - maxFrameSize(_maxFrameSize), + maxFrameSize(conn->getNegotiatedSettings().maxFrameSize), id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name), - connection(conn), - ioHandler(*this), - channel(ch), - proxy(ioHandler), + connectionShared(conn), + connectionWeak(conn), + weakPtr(false), + proxy(out), nextIn(0), nextOut(0) { - channel.next = connection.get(); + channel.next = connectionShared.get(); } SessionImpl::~SessionImpl() { @@ -78,7 +76,8 @@ SessionImpl::~SessionImpl() { state.waitWaiters(); } } - connection->erase(channel); + boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock(); + if (c) c->erase(channel); } @@ -119,6 +118,8 @@ void SessionImpl::close() //user thread void SessionImpl::resume(shared_ptr<ConnectionImpl>) // user thread { + // weakPtr sessions should not be resumed. + if (weakPtr) return; throw NotImplementedException("Resume not yet implemented by client!"); } @@ -251,7 +252,6 @@ void SessionImpl::setExceptionLH(const sys::ExceptionHolder& ex) { // Call with */ void SessionImpl::connectionClosed(uint16_t code, const std::string& text) { setException(createConnectionException(code, text)); - // FIXME aconway 2008-10-07: Should closing a connection detach or close its sessions? handleClosed(); } @@ -259,9 +259,7 @@ void SessionImpl::connectionClosed(uint16_t code, const std::string& text) { * Called by ConnectionImpl to notify active sessions when connection * is disconnected */ -void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) -{ - // FIXME aconway 2008-10-07: distinguish disconnect from clean close. +void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) { connectionClosed(_code, _text); } @@ -426,14 +424,11 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread void SessionImpl::handleOut(AMQFrame& frame) // user thread { - connection->expand(frame.encodedSize(), true); - channel.handle(frame); -} - -void SessionImpl::proxyOut(AMQFrame& frame) // network thread -{ - connection->expand(frame.encodedSize(), false); - channel.handle(frame); + boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock(); + if (c) { + c->expand(frame.encodedSize(), true); + channel.handle(frame); + } } void SessionImpl::deliver(AMQFrame& frame) // network thread @@ -602,11 +597,11 @@ void SessionImpl::exception(uint16_t errorCode, const std::string& description, const framing::FieldTable& /*errorInfo*/) { - QPID_LOG(warning, "Exception received from peer: " << errorCode << ":" << description - << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]"); - Lock l(state); setExceptionLH(createSessionException(errorCode, description)); + QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what() + << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]"); + if (detachedLifetime) setTimeout(0); } @@ -648,8 +643,6 @@ void SessionImpl::assertOpen() const void SessionImpl::handleClosed() { - // FIXME aconway 2008-06-12: needs to be set to the correct exception type. - // demux.close(exceptionHolder.empty() ? new ClosedException() : exceptionHolder); results.close(); } @@ -662,4 +655,16 @@ uint32_t SessionImpl::setTimeout(uint32_t seconds) { return detachedLifetime; } +uint32_t SessionImpl::getTimeout() const { + return detachedLifetime; +} + +void SessionImpl::setWeakPtr(bool weak) { + weakPtr = weak; + if (weakPtr) + connectionShared.reset(); // Only keep weak pointer + else + connectionShared = connectionWeak.lock(); +} + }} |