summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/SessionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp56
1 files changed, 24 insertions, 32 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index be5eab1f2b..75a71997fd 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -60,14 +60,12 @@ SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactio
void SessionImpl::checkError()
{
- ScopedLock l(lock);
qpid::client::SessionBase_0_10Access s(session);
s.get()->assertOpen();
}
bool SessionImpl::hasError()
{
- ScopedLock l(lock);
qpid::client::SessionBase_0_10Access s(session);
return s.get()->hasError();
}
@@ -114,14 +112,13 @@ void SessionImpl::release(qpid::messaging::Message& m)
execute1<Release>(m);
}
-void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative)
+void SessionImpl::acknowledge(qpid::messaging::Message& m)
{
//Should probably throw an exception on failure here, or indicate
//it through a return type at least. Failure means that the
//message may be redelivered; i.e. the application cannot delete
//any state necessary for preventing reprocessing of the message
- Acknowledge2 ack(*this, m, cumulative);
- execute(ack);
+ execute1<Acknowledge1>(m);
}
void SessionImpl::close()
@@ -131,29 +128,27 @@ void SessionImpl::close()
senders.clear();
receivers.clear();
} else {
- Senders sCopy;
- Receivers rCopy;
- {
- ScopedLock l(lock);
- senders.swap(sCopy);
- receivers.swap(rCopy);
- }
- for (Senders::iterator i = sCopy.begin(); i != sCopy.end(); ++i)
- {
- // outside the lock, will call senderCancelled
- i->second.close();
+ while (true) {
+ Sender s;
+ {
+ ScopedLock l(lock);
+ if (senders.empty()) break;
+ s = senders.begin()->second;
+ }
+ s.close(); // outside the lock, will call senderCancelled
}
- for (Receivers::iterator i = rCopy.begin(); i != rCopy.end(); ++i)
- {
- // outside the lock, will call receiverCancelled
- i->second.close();
+ while (true) {
+ Receiver r;
+ {
+ ScopedLock l(lock);
+ if (receivers.empty()) break;
+ r = receivers.begin()->second;
+ }
+ r.close(); // outside the lock, will call receiverCancelled
}
}
connection->closed(*this);
- if (!hasError()) {
- ScopedLock l(lock);
- session.close();
- }
+ if (!hasError()) session.close();
}
template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t)
@@ -436,11 +431,8 @@ uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination)
void SessionImpl::syncImpl(bool block)
{
- {
- ScopedLock l(lock);
- if (block) session.sync();
- else session.flush();
- }
+ if (block) session.sync();
+ else session.flush();
//cleanup unconfirmed accept records:
incoming.pendingAccept();
}
@@ -475,10 +467,10 @@ void SessionImpl::acknowledgeImpl()
if (!transactional) incoming.accept();
}
-void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative)
+void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m)
{
ScopedLock l(lock);
- if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative);
+ if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId());
}
void SessionImpl::rejectImpl(qpid::messaging::Message& m)
@@ -517,7 +509,7 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection->reopen();
+ connection->open();
}
bool SessionImpl::backoff()