summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SessionImpl.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-09 19:36:51 +0000
committerAlan Conway <aconway@apache.org>2008-10-09 19:36:51 +0000
commitd6901e52ab3ee9c40eddc4ad3b4787127c36d874 (patch)
tree85b9ba2e0d0922be150480392ec1b706a6df5cd0 /cpp/src/qpid/client/SessionImpl.cpp
parent016ae5acebab0eaf6dd70f5d4d653fdfee93925d (diff)
downloadqpid-python-d6901e52ab3ee9c40eddc4ad3b4787127c36d874.tar.gz
Client-side support for amq.faiover exchange. Connection::getKnownBrokers provides latest list.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SessionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp59
1 files changed, 32 insertions, 27 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 2d64492bf7..49dd97e324 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -51,21 +51,19 @@ typedef sys::Monitor::ScopedUnlock UnLock;
typedef sys::ScopedLock<sys::Semaphore> Acquire;
-SessionImpl::SessionImpl(const std::string& name,
- shared_ptr<ConnectionImpl> conn,
- uint16_t ch, uint64_t _maxFrameSize)
+SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> conn)
: state(INACTIVE),
detachedLifetime(0),
- maxFrameSize(_maxFrameSize),
+ maxFrameSize(conn->getNegotiatedSettings().maxFrameSize),
id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name),
- connection(conn),
- ioHandler(*this),
- channel(ch),
- proxy(ioHandler),
+ connectionShared(conn),
+ connectionWeak(conn),
+ weakPtr(false),
+ proxy(out),
nextIn(0),
nextOut(0)
{
- channel.next = connection.get();
+ channel.next = connectionShared.get();
}
SessionImpl::~SessionImpl() {
@@ -78,7 +76,8 @@ SessionImpl::~SessionImpl() {
state.waitWaiters();
}
}
- connection->erase(channel);
+ boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock();
+ if (c) c->erase(channel);
}
@@ -119,6 +118,8 @@ void SessionImpl::close() //user thread
void SessionImpl::resume(shared_ptr<ConnectionImpl>) // user thread
{
+ // weakPtr sessions should not be resumed.
+ if (weakPtr) return;
throw NotImplementedException("Resume not yet implemented by client!");
}
@@ -251,7 +252,6 @@ void SessionImpl::setExceptionLH(const sys::ExceptionHolder& ex) { // Call with
*/
void SessionImpl::connectionClosed(uint16_t code, const std::string& text) {
setException(createConnectionException(code, text));
- // FIXME aconway 2008-10-07: Should closing a connection detach or close its sessions?
handleClosed();
}
@@ -259,9 +259,7 @@ void SessionImpl::connectionClosed(uint16_t code, const std::string& text) {
* Called by ConnectionImpl to notify active sessions when connection
* is disconnected
*/
-void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text)
-{
- // FIXME aconway 2008-10-07: distinguish disconnect from clean close.
+void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) {
connectionClosed(_code, _text);
}
@@ -426,14 +424,11 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread
void SessionImpl::handleOut(AMQFrame& frame) // user thread
{
- connection->expand(frame.encodedSize(), true);
- channel.handle(frame);
-}
-
-void SessionImpl::proxyOut(AMQFrame& frame) // network thread
-{
- connection->expand(frame.encodedSize(), false);
- channel.handle(frame);
+ boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock();
+ if (c) {
+ c->expand(frame.encodedSize(), true);
+ channel.handle(frame);
+ }
}
void SessionImpl::deliver(AMQFrame& frame) // network thread
@@ -602,11 +597,11 @@ void SessionImpl::exception(uint16_t errorCode,
const std::string& description,
const framing::FieldTable& /*errorInfo*/)
{
- QPID_LOG(warning, "Exception received from peer: " << errorCode << ":" << description
- << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]");
-
Lock l(state);
setExceptionLH(createSessionException(errorCode, description));
+ QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what()
+ << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]");
+
if (detachedLifetime)
setTimeout(0);
}
@@ -648,8 +643,6 @@ void SessionImpl::assertOpen() const
void SessionImpl::handleClosed()
{
- // FIXME aconway 2008-06-12: needs to be set to the correct exception type.
- //
demux.close(exceptionHolder.empty() ? new ClosedException() : exceptionHolder);
results.close();
}
@@ -662,4 +655,16 @@ uint32_t SessionImpl::setTimeout(uint32_t seconds) {
return detachedLifetime;
}
+uint32_t SessionImpl::getTimeout() const {
+ return detachedLifetime;
+}
+
+void SessionImpl::setWeakPtr(bool weak) {
+ weakPtr = weak;
+ if (weakPtr)
+ connectionShared.reset(); // Only keep weak pointer
+ else
+ connectionShared = connectionWeak.lock();
+}
+
}}