diff options
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/SessionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 56 |
1 files changed, 24 insertions, 32 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index be5eab1f2b..75a71997fd 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -60,14 +60,12 @@ SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactio void SessionImpl::checkError() { - ScopedLock l(lock); qpid::client::SessionBase_0_10Access s(session); s.get()->assertOpen(); } bool SessionImpl::hasError() { - ScopedLock l(lock); qpid::client::SessionBase_0_10Access s(session); return s.get()->hasError(); } @@ -114,14 +112,13 @@ void SessionImpl::release(qpid::messaging::Message& m) execute1<Release>(m); } -void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative) +void SessionImpl::acknowledge(qpid::messaging::Message& m) { //Should probably throw an exception on failure here, or indicate //it through a return type at least. Failure means that the //message may be redelivered; i.e. the application cannot delete //any state necessary for preventing reprocessing of the message - Acknowledge2 ack(*this, m, cumulative); - execute(ack); + execute1<Acknowledge1>(m); } void SessionImpl::close() @@ -131,29 +128,27 @@ void SessionImpl::close() senders.clear(); receivers.clear(); } else { - Senders sCopy; - Receivers rCopy; - { - ScopedLock l(lock); - senders.swap(sCopy); - receivers.swap(rCopy); - } - for (Senders::iterator i = sCopy.begin(); i != sCopy.end(); ++i) - { - // outside the lock, will call senderCancelled - i->second.close(); + while (true) { + Sender s; + { + ScopedLock l(lock); + if (senders.empty()) break; + s = senders.begin()->second; + } + s.close(); // outside the lock, will call senderCancelled } - for (Receivers::iterator i = rCopy.begin(); i != rCopy.end(); ++i) - { - // outside the lock, will call receiverCancelled - i->second.close(); + while (true) { + Receiver r; + { + ScopedLock l(lock); + if (receivers.empty()) break; + r = receivers.begin()->second; + } + r.close(); // outside the lock, will call receiverCancelled } } connection->closed(*this); - if (!hasError()) { - ScopedLock l(lock); - session.close(); - } + if (!hasError()) session.close(); } template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t) @@ -436,11 +431,8 @@ uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination) void SessionImpl::syncImpl(bool block) { - { - ScopedLock l(lock); - if (block) session.sync(); - else session.flush(); - } + if (block) session.sync(); + else session.flush(); //cleanup unconfirmed accept records: incoming.pendingAccept(); } @@ -475,10 +467,10 @@ void SessionImpl::acknowledgeImpl() if (!transactional) incoming.accept(); } -void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative) +void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m) { ScopedLock l(lock); - if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative); + if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId()); } void SessionImpl::rejectImpl(qpid::messaging::Message& m) @@ -517,7 +509,7 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection->reopen(); + connection->open(); } bool SessionImpl::backoff() |