summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp27
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.h4
-rw-r--r--cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp5
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp37
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.h3
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp42
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.h7
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp47
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h1
-rw-r--r--cpp/src/qpid/messaging/PrivateImplRef.h6
10 files changed, 115 insertions, 64 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 2c581e9d41..3ebc5f17ad 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -109,6 +109,7 @@ ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& optio
void ConnectionImpl::setOptions(const Variant::Map& options)
{
+ sys::Mutex::ScopedLock l(lock);
convert(options, settings);
setIfFound(options, "reconnect", reconnect);
setIfFound(options, "reconnect-timeout", timeout);
@@ -139,13 +140,14 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value)
void ConnectionImpl::close()
{
- std::vector<std::string> names;
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- for (Sessions::const_iterator i = sessions.begin(); i != sessions.end(); ++i) names.push_back(i->first);
- }
- for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) {
- getSession(*i).close();
+ while(true) {
+ messaging::Session session;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (sessions.empty()) break;
+ session = sessions.begin()->second;
+ }
+ session.close();
}
detach();
}
@@ -246,12 +248,7 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
bool ConnectionImpl::tryConnect()
{
- if (tryConnect(urls)) return resetSessions();
- else return false;
-}
-
-bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls)
-{
+ sys::Mutex::ScopedLock l(lock);
for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
try {
QPID_LOG(info, "Trying to connect to " << *i << "...");
@@ -264,7 +261,7 @@ bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls)
connection.open(settings);
}
QPID_LOG(info, "Connected to " << *i);
- return true;
+ return resetSessions(l);
} catch (const qpid::ConnectionException& e) {
//TODO: need to fix timeout on
//qpid::client::Connection::open() so that it throws
@@ -275,7 +272,7 @@ bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls)
return false;
}
-bool ConnectionImpl::resetSessions()
+bool ConnectionImpl::resetSessions(const sys::Mutex::ScopedLock& )
{
try {
qpid::sys::Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index b6fd33cc49..93929a6034 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -68,8 +68,8 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
void setOptions(const qpid::types::Variant::Map& options);
void connect(const qpid::sys::AbsTime& started);
bool tryConnect();
- bool tryConnect(const std::vector<std::string>& urls);
- bool resetSessions();
+ bool resetSessions(const sys::Mutex::ScopedLock&); // dummy parameter indicates call with lock held.
+
};
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
index fc3900f917..09a9aa06d3 100644
--- a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
+++ b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
@@ -51,8 +51,9 @@ struct FailoverUpdatesImpl : qpid::sys::Runnable
}
~FailoverUpdatesImpl() {
- receiver.close();
- session.close();
+ try {
+ session.close();
+ } catch(...) {} // Squash exceptions in a destructor.
thread.join();
}
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index c26b2eb09f..b5d7bf78f4 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -104,6 +104,7 @@ struct Match
void IncomingMessages::setSession(qpid::client::AsyncSession s)
{
+ sys::Mutex::ScopedLock l(lock);
session = s;
incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
acceptTracker.reset();
@@ -111,13 +112,16 @@ void IncomingMessages::setSession(qpid::client::AsyncSession s)
bool IncomingMessages::get(Handler& handler, Duration timeout)
{
- //search through received list for any transfer of interest:
- for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
{
- MessageTransfer transfer(*i, *this);
- if (handler.accept(transfer)) {
- received.erase(i);
- return true;
+ sys::Mutex::ScopedLock l(lock);
+ //search through received list for any transfer of interest:
+ for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
+ {
+ MessageTransfer transfer(*i, *this);
+ if (handler.accept(transfer)) {
+ received.erase(i);
+ return true;
+ }
}
}
//none found, check incoming:
@@ -126,6 +130,7 @@ bool IncomingMessages::get(Handler& handler, Duration timeout)
bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout)
{
+ sys::Mutex::ScopedLock l(lock);
//if there is not already a received message, we must wait for one
if (received.empty() && !wait(timeout)) return false;
//else we have a message in received; return the corresponding destination
@@ -135,20 +140,25 @@ bool IncomingMessages::getNextDestination(std::string& destination, Duration tim
void IncomingMessages::accept()
{
+ sys::Mutex::ScopedLock l(lock);
acceptTracker.accept(session);
}
void IncomingMessages::releaseAll()
{
- //first process any received messages...
- while (!received.empty()) {
- retrieve(received.front(), 0);
- received.pop_front();
+ {
+ //first process any received messages...
+ sys::Mutex::ScopedLock l(lock);
+ while (!received.empty()) {
+ retrieve(received.front(), 0);
+ received.pop_front();
+ }
}
//then pump out any available messages from incoming queue...
GetAny handler;
while (process(&handler, 0)) ;
//now release all messages
+ sys::Mutex::ScopedLock l(lock);
acceptTracker.release(session);
}
@@ -158,6 +168,7 @@ void IncomingMessages::releasePending(const std::string& destination)
while (process(0, 0)) ;
//now remove all messages for this destination from received list, recording their ids...
+ sys::Mutex::ScopedLock l(lock);
MatchAndTrack match(destination);
for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i) ;
//now release those messages
@@ -184,6 +195,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
} else {
//received message for another destination, keep for later
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ sys::Mutex::ScopedLock l(lock);
received.push_back(content);
}
} else {
@@ -200,6 +212,7 @@ bool IncomingMessages::wait(qpid::sys::Duration duration)
for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
if (content->isA<MessageTransferBody>()) {
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ sys::Mutex::ScopedLock l(lock);
received.push_back(content);
return true;
} else {
@@ -211,10 +224,12 @@ bool IncomingMessages::wait(qpid::sys::Duration duration)
uint32_t IncomingMessages::pendingAccept()
{
+ sys::Mutex::ScopedLock l(lock);
return acceptTracker.acceptsPending();
}
uint32_t IncomingMessages::pendingAccept(const std::string& destination)
{
+ sys::Mutex::ScopedLock l(lock);
return acceptTracker.acceptsPending(destination);
}
@@ -223,6 +238,7 @@ uint32_t IncomingMessages::available()
//first pump all available messages from incoming to received...
while (process(0, 0)) {}
//return the count of received messages
+ sys::Mutex::ScopedLock l(lock);
return received.size();
}
@@ -232,6 +248,7 @@ uint32_t IncomingMessages::available(const std::string& destination)
while (process(0, 0)) {}
//count all messages for this destination from received list
+ sys::Mutex::ScopedLock l(lock);
return std::for_each(received.begin(), received.end(), Match(destination)).matched;
}
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index 2bc6dd49c4..9640890d76 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -43,7 +43,7 @@ namespace client {
namespace amqp0_10 {
/**
- *
+ * Queue of incoming messages.
*/
class IncomingMessages
{
@@ -83,6 +83,7 @@ class IncomingMessages
private:
typedef std::deque<FrameSetPtr> FrameSetQueue;
+ sys::Mutex lock;
qpid::client::AsyncSession session;
boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
FrameSetQueue received;
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 435459d97f..49cfec7497 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -37,6 +37,7 @@ using qpid::messaging::Duration;
void ReceiverImpl::received(qpid::messaging::Message&)
{
//TODO: should this be configurable
+ sys::Mutex::ScopedLock l(lock);
if (capacity && --window <= capacity/2) {
session.sendCompletion();
window = capacity;
@@ -78,14 +79,16 @@ void ReceiverImpl::close()
void ReceiverImpl::start()
{
+ sys::Mutex::ScopedLock l(lock);
if (state == STOPPED) {
state = STARTED;
- startFlow();
+ startFlow(l);
}
}
void ReceiverImpl::stop()
{
+ sys::Mutex::ScopedLock l(lock);
state = STOPPED;
session.messageStop(destination);
}
@@ -95,7 +98,7 @@ void ReceiverImpl::setCapacity(uint32_t c)
execute1<SetCapacity>(c);
}
-void ReceiverImpl::startFlow()
+void ReceiverImpl::startFlow(const sys::Mutex::ScopedLock&)
{
if (capacity > 0) {
session.messageSetFlowMode(destination, FLOW_MODE_WINDOW);
@@ -107,10 +110,11 @@ void ReceiverImpl::startFlow()
void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
-
+ sys::Mutex::ScopedLock l(lock);
session = s;
if (state == UNRESOLVED) {
source = resolver.resolveSource(session, address);
+ assert(source.get());
state = STARTED;
}
if (state == CANCELLED) {
@@ -118,15 +122,19 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve
parent->receiverCancelled(destination);
} else {
source->subscribe(session, destination);
- startFlow();
+ startFlow(l);
}
}
-const std::string& ReceiverImpl::getName() const { return destination; }
+const std::string& ReceiverImpl::getName() const {
+ sys::Mutex::ScopedLock l(lock);
+ return destination;
+}
uint32_t ReceiverImpl::getCapacity()
{
+ sys::Mutex::ScopedLock l(lock);
return capacity;
}
@@ -153,25 +161,31 @@ bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::D
bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
- if (state == CANCELLED) return false;//TODO: or should this be an error?
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (state == CANCELLED) return false;//TODO: or should this be an error?
- if (capacity == 0 || state != STARTED) {
- session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
- session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
- session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+ if (capacity == 0 || state != STARTED) {
+ session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
+ session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+ session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+ }
}
-
if (getImpl(message, timeout)) {
return true;
} else {
sync(session).messageFlush(destination);
- startFlow();//reallocate credit
+ {
+ sys::Mutex::ScopedLock l(lock);
+ startFlow(l); //reallocate credit
+ }
return getImpl(message, Duration::IMMEDIATE);
}
}
void ReceiverImpl::closeImpl()
{
+ sys::Mutex::ScopedLock l(lock);
if (state != CANCELLED) {
state = CANCELLED;
source->cancel(session, destination);
@@ -181,14 +195,16 @@ void ReceiverImpl::closeImpl()
void ReceiverImpl::setCapacityImpl(uint32_t c)
{
+ sys::Mutex::ScopedLock l(lock);
if (c != capacity) {
capacity = c;
if (state == STARTED) {
session.messageStop(destination);
- startFlow();
+ startFlow(l);
}
}
}
+
qpid::messaging::Session ReceiverImpl::getSession() const
{
return qpid::messaging::Session(parent.get());
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index e6d11e4bb5..c7e24b774a 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -27,6 +27,7 @@
#include "qpid/client/AsyncSession.h"
#include "qpid/client/amqp0_10/SessionImpl.h"
#include "qpid/messaging/Duration.h"
+#include "qpid/sys/Mutex.h"
#include <boost/intrusive_ptr.hpp>
#include <memory>
@@ -65,6 +66,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
void received(qpid::messaging::Message& message);
qpid::messaging::Session getSession() const;
private:
+ mutable sys::Mutex lock;
boost::intrusive_ptr<SessionImpl> parent;
const std::string destination;
const qpid::messaging::Address address;
@@ -77,15 +79,14 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
qpid::messaging::MessageListener* listener;
uint32_t window;
- void startFlow();
+ void startFlow(const sys::Mutex::ScopedLock&); // Dummy param, call with lock held
//implementation of public facing methods
bool fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
bool getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
void closeImpl();
void setCapacityImpl(uint32_t);
- //functors for public facing methods (allows locking and retry
- //logic to be centralised)
+ //functors for public facing methods.
struct Command
{
ReceiverImpl& impl;
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index a55a2737cb..a6067097bb 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -53,6 +53,9 @@ namespace qpid {
namespace client {
namespace amqp0_10 {
+typedef qpid::sys::Mutex::ScopedLock ScopedLock;
+typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock;
+
SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {}
void SessionImpl::checkError()
@@ -112,23 +115,29 @@ void SessionImpl::release(qpid::messaging::Message& m)
void SessionImpl::close()
{
if (hasError()) {
+ ScopedLock l(lock);
senders.clear();
receivers.clear();
} else {
- //close all the senders and receivers (get copy of names and then
- //make the calls to avoid modifying maps while iterating over
- //them):
- std::vector<std::string> s;
- std::vector<std::string> r;
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- for (Senders::const_iterator i = senders.begin(); i != senders.end(); ++i) s.push_back(i->first);
- for (Receivers::const_iterator i = receivers.begin(); i != receivers.end(); ++i) r.push_back(i->first);
+ while (true) {
+ Sender s;
+ {
+ ScopedLock l(lock);
+ if (senders.empty()) break;
+ s = senders.begin()->second;
+ }
+ s.close(); // outside the lock, will call senderCancelled
+ }
+ while (true) {
+ Receiver r;
+ {
+ ScopedLock l(lock);
+ if (receivers.empty()) break;
+ r = receivers.begin()->second;
+ }
+ r.close(); // outside the lock, will call receiverCancelled
}
- for (std::vector<std::string>::const_iterator i = s.begin(); i != s.end(); ++i) getSender(*i).close();
- for (std::vector<std::string>::const_iterator i = r.begin(); i != r.end(); ++i) getReceiver(*i).close();
}
-
connection->closed(*this);
if (!hasError()) session.close();
}
@@ -151,7 +160,7 @@ template <class T> void getFreeKey(std::string& key, T& map)
void SessionImpl::setSession(qpid::client::Session s)
{
- qpid::sys::Mutex::ScopedLock l(lock);
+ ScopedLock l(lock);
session = s;
incoming.setSession(session);
if (transactional) session.txSelect();
@@ -181,6 +190,7 @@ Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address)
Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address)
{
+ ScopedLock l(lock);
std::string name = address.getName();
getFreeKey(name, receivers);
Receiver receiver(new ReceiverImpl(*this, name, address));
@@ -205,7 +215,8 @@ Sender SessionImpl::createSender(const qpid::messaging::Address& address)
}
Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address)
-{
+{
+ ScopedLock l(lock);
std::string name = address.getName();
getFreeKey(name, senders);
Sender sender(new SenderImpl(*this, name, address));
@@ -265,6 +276,7 @@ struct IncomingMessageHandler : IncomingMessages::Handler
bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer)
{
+ ScopedLock l(lock);
Receivers::const_iterator i = receivers.find(transfer.getDestination());
if (i == receivers.end()) {
QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination());
@@ -371,6 +383,7 @@ struct SessionImpl::Receivable : Command
uint32_t SessionImpl::getReceivableImpl(const std::string* destination)
{
+ ScopedLock l(lock);
if (destination) {
return incoming.available(*destination);
} else {
@@ -399,6 +412,7 @@ struct SessionImpl::UnsettledAcks : Command
uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination)
{
+ ScopedLock l(lock);
if (destination) {
return incoming.pendingAccept(*destination);
} else {
@@ -414,12 +428,14 @@ void SessionImpl::syncImpl(bool block)
void SessionImpl::commitImpl()
{
+ ScopedLock l(lock);
incoming.accept();
session.txCommit();
}
void SessionImpl::rollbackImpl()
{
+ ScopedLock l(lock);
for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
getImplPtr<Receiver, ReceiverImpl>(i->second)->stop();
}
@@ -436,6 +452,7 @@ void SessionImpl::rollbackImpl()
void SessionImpl::acknowledgeImpl()
{
+ ScopedLock l(lock);
if (!transactional) incoming.accept();
}
@@ -455,6 +472,7 @@ void SessionImpl::releaseImpl(qpid::messaging::Message& m)
void SessionImpl::receiverCancelled(const std::string& name)
{
+ ScopedLock l(lock);
receivers.erase(name);
session.sync();
incoming.releasePending(name);
@@ -462,6 +480,7 @@ void SessionImpl::receiverCancelled(const std::string& name)
void SessionImpl::senderCancelled(const std::string& name)
{
+ ScopedLock l(lock);
senders.erase(name);
}
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index bc02f0ff8b..a4a00e1481 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -94,7 +94,6 @@ class SessionImpl : public qpid::messaging::SessionImpl
template <class T> bool execute(T& f)
{
try {
- qpid::sys::Mutex::ScopedLock l(lock);
f();
return true;
} catch (const qpid::TransportFailure&) {
diff --git a/cpp/src/qpid/messaging/PrivateImplRef.h b/cpp/src/qpid/messaging/PrivateImplRef.h
index cc2798c647..e77c58d071 100644
--- a/cpp/src/qpid/messaging/PrivateImplRef.h
+++ b/cpp/src/qpid/messaging/PrivateImplRef.h
@@ -29,9 +29,7 @@
namespace qpid {
namespace messaging {
-// FIXME aconway 2009-04-24: details!
-/** @file
- *
+/**
* Helper class to implement a class with a private, reference counted
* implementation and reference semantics.
*
@@ -73,8 +71,10 @@ template <class T> class PrivateImplRef {
typedef typename T::Impl Impl;
typedef boost::intrusive_ptr<Impl> intrusive_ptr;
+ /** Get the implementation pointer from a handle */
static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); }
+ /** Set the implementation pointer in a handle */
static void set(T& t, const intrusive_ptr& p) {
if (t.impl == p) return;
if (t.impl) boost::intrusive_ptr_release(t.impl);