diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 27 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 37 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/PrivateImplRef.h | 6 |
10 files changed, 115 insertions, 64 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 2c581e9d41..3ebc5f17ad 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -109,6 +109,7 @@ ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& optio void ConnectionImpl::setOptions(const Variant::Map& options) { + sys::Mutex::ScopedLock l(lock); convert(options, settings); setIfFound(options, "reconnect", reconnect); setIfFound(options, "reconnect-timeout", timeout); @@ -139,13 +140,14 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value) void ConnectionImpl::close() { - std::vector<std::string> names; - { - qpid::sys::Mutex::ScopedLock l(lock); - for (Sessions::const_iterator i = sessions.begin(); i != sessions.end(); ++i) names.push_back(i->first); - } - for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) { - getSession(*i).close(); + while(true) { + messaging::Session session; + { + qpid::sys::Mutex::ScopedLock l(lock); + if (sessions.empty()) break; + session = sessions.begin()->second; + } + session.close(); } detach(); } @@ -246,12 +248,7 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started) bool ConnectionImpl::tryConnect() { - if (tryConnect(urls)) return resetSessions(); - else return false; -} - -bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls) -{ + sys::Mutex::ScopedLock l(lock); for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) { try { QPID_LOG(info, "Trying to connect to " << *i << "..."); @@ -264,7 +261,7 @@ bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls) connection.open(settings); } QPID_LOG(info, "Connected to " << *i); - return true; + return resetSessions(l); } catch (const qpid::ConnectionException& e) { //TODO: need to fix timeout on //qpid::client::Connection::open() so that it throws @@ -275,7 +272,7 @@ bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls) return false; } -bool ConnectionImpl::resetSessions() +bool ConnectionImpl::resetSessions(const sys::Mutex::ScopedLock& ) { try { qpid::sys::Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index b6fd33cc49..93929a6034 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -68,8 +68,8 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl void setOptions(const qpid::types::Variant::Map& options); void connect(const qpid::sys::AbsTime& started); bool tryConnect(); - bool tryConnect(const std::vector<std::string>& urls); - bool resetSessions(); + bool resetSessions(const sys::Mutex::ScopedLock&); // dummy parameter indicates call with lock held. + }; }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp index fc3900f917..09a9aa06d3 100644 --- a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp +++ b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp @@ -51,8 +51,9 @@ struct FailoverUpdatesImpl : qpid::sys::Runnable } ~FailoverUpdatesImpl() { - receiver.close(); - session.close(); + try { + session.close(); + } catch(...) {} // Squash exceptions in a destructor. thread.join(); } diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index c26b2eb09f..b5d7bf78f4 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -104,6 +104,7 @@ struct Match void IncomingMessages::setSession(qpid::client::AsyncSession s) { + sys::Mutex::ScopedLock l(lock); session = s; incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault(); acceptTracker.reset(); @@ -111,13 +112,16 @@ void IncomingMessages::setSession(qpid::client::AsyncSession s) bool IncomingMessages::get(Handler& handler, Duration timeout) { - //search through received list for any transfer of interest: - for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++) { - MessageTransfer transfer(*i, *this); - if (handler.accept(transfer)) { - received.erase(i); - return true; + sys::Mutex::ScopedLock l(lock); + //search through received list for any transfer of interest: + for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++) + { + MessageTransfer transfer(*i, *this); + if (handler.accept(transfer)) { + received.erase(i); + return true; + } } } //none found, check incoming: @@ -126,6 +130,7 @@ bool IncomingMessages::get(Handler& handler, Duration timeout) bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout) { + sys::Mutex::ScopedLock l(lock); //if there is not already a received message, we must wait for one if (received.empty() && !wait(timeout)) return false; //else we have a message in received; return the corresponding destination @@ -135,20 +140,25 @@ bool IncomingMessages::getNextDestination(std::string& destination, Duration tim void IncomingMessages::accept() { + sys::Mutex::ScopedLock l(lock); acceptTracker.accept(session); } void IncomingMessages::releaseAll() { - //first process any received messages... - while (!received.empty()) { - retrieve(received.front(), 0); - received.pop_front(); + { + //first process any received messages... + sys::Mutex::ScopedLock l(lock); + while (!received.empty()) { + retrieve(received.front(), 0); + received.pop_front(); + } } //then pump out any available messages from incoming queue... GetAny handler; while (process(&handler, 0)) ; //now release all messages + sys::Mutex::ScopedLock l(lock); acceptTracker.release(session); } @@ -158,6 +168,7 @@ void IncomingMessages::releasePending(const std::string& destination) while (process(0, 0)) ; //now remove all messages for this destination from received list, recording their ids... + sys::Mutex::ScopedLock l(lock); MatchAndTrack match(destination); for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i) ; //now release those messages @@ -184,6 +195,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) } else { //received message for another destination, keep for later QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); + sys::Mutex::ScopedLock l(lock); received.push_back(content); } } else { @@ -200,6 +212,7 @@ bool IncomingMessages::wait(qpid::sys::Duration duration) for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { if (content->isA<MessageTransferBody>()) { QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); + sys::Mutex::ScopedLock l(lock); received.push_back(content); return true; } else { @@ -211,10 +224,12 @@ bool IncomingMessages::wait(qpid::sys::Duration duration) uint32_t IncomingMessages::pendingAccept() { + sys::Mutex::ScopedLock l(lock); return acceptTracker.acceptsPending(); } uint32_t IncomingMessages::pendingAccept(const std::string& destination) { + sys::Mutex::ScopedLock l(lock); return acceptTracker.acceptsPending(destination); } @@ -223,6 +238,7 @@ uint32_t IncomingMessages::available() //first pump all available messages from incoming to received... while (process(0, 0)) {} //return the count of received messages + sys::Mutex::ScopedLock l(lock); return received.size(); } @@ -232,6 +248,7 @@ uint32_t IncomingMessages::available(const std::string& destination) while (process(0, 0)) {} //count all messages for this destination from received list + sys::Mutex::ScopedLock l(lock); return std::for_each(received.begin(), received.end(), Match(destination)).matched; } diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index 2bc6dd49c4..9640890d76 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -43,7 +43,7 @@ namespace client { namespace amqp0_10 { /** - * + * Queue of incoming messages. */ class IncomingMessages { @@ -83,6 +83,7 @@ class IncomingMessages private: typedef std::deque<FrameSetPtr> FrameSetQueue; + sys::Mutex lock; qpid::client::AsyncSession session; boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming; FrameSetQueue received; diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 435459d97f..49cfec7497 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -37,6 +37,7 @@ using qpid::messaging::Duration; void ReceiverImpl::received(qpid::messaging::Message&) { //TODO: should this be configurable + sys::Mutex::ScopedLock l(lock); if (capacity && --window <= capacity/2) { session.sendCompletion(); window = capacity; @@ -78,14 +79,16 @@ void ReceiverImpl::close() void ReceiverImpl::start() { + sys::Mutex::ScopedLock l(lock); if (state == STOPPED) { state = STARTED; - startFlow(); + startFlow(l); } } void ReceiverImpl::stop() { + sys::Mutex::ScopedLock l(lock); state = STOPPED; session.messageStop(destination); } @@ -95,7 +98,7 @@ void ReceiverImpl::setCapacity(uint32_t c) execute1<SetCapacity>(c); } -void ReceiverImpl::startFlow() +void ReceiverImpl::startFlow(const sys::Mutex::ScopedLock&) { if (capacity > 0) { session.messageSetFlowMode(destination, FLOW_MODE_WINDOW); @@ -107,10 +110,11 @@ void ReceiverImpl::startFlow() void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) { - + sys::Mutex::ScopedLock l(lock); session = s; if (state == UNRESOLVED) { source = resolver.resolveSource(session, address); + assert(source.get()); state = STARTED; } if (state == CANCELLED) { @@ -118,15 +122,19 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve parent->receiverCancelled(destination); } else { source->subscribe(session, destination); - startFlow(); + startFlow(l); } } -const std::string& ReceiverImpl::getName() const { return destination; } +const std::string& ReceiverImpl::getName() const { + sys::Mutex::ScopedLock l(lock); + return destination; +} uint32_t ReceiverImpl::getCapacity() { + sys::Mutex::ScopedLock l(lock); return capacity; } @@ -153,25 +161,31 @@ bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::D bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) { - if (state == CANCELLED) return false;//TODO: or should this be an error? + { + sys::Mutex::ScopedLock l(lock); + if (state == CANCELLED) return false;//TODO: or should this be an error? - if (capacity == 0 || state != STARTED) { - session.messageSetFlowMode(destination, FLOW_MODE_CREDIT); - session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1); - session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF); + if (capacity == 0 || state != STARTED) { + session.messageSetFlowMode(destination, FLOW_MODE_CREDIT); + session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1); + session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF); + } } - if (getImpl(message, timeout)) { return true; } else { sync(session).messageFlush(destination); - startFlow();//reallocate credit + { + sys::Mutex::ScopedLock l(lock); + startFlow(l); //reallocate credit + } return getImpl(message, Duration::IMMEDIATE); } } void ReceiverImpl::closeImpl() { + sys::Mutex::ScopedLock l(lock); if (state != CANCELLED) { state = CANCELLED; source->cancel(session, destination); @@ -181,14 +195,16 @@ void ReceiverImpl::closeImpl() void ReceiverImpl::setCapacityImpl(uint32_t c) { + sys::Mutex::ScopedLock l(lock); if (c != capacity) { capacity = c; if (state == STARTED) { session.messageStop(destination); - startFlow(); + startFlow(l); } } } + qpid::messaging::Session ReceiverImpl::getSession() const { return qpid::messaging::Session(parent.get()); diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index e6d11e4bb5..c7e24b774a 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -27,6 +27,7 @@ #include "qpid/client/AsyncSession.h" #include "qpid/client/amqp0_10/SessionImpl.h" #include "qpid/messaging/Duration.h" +#include "qpid/sys/Mutex.h" #include <boost/intrusive_ptr.hpp> #include <memory> @@ -65,6 +66,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl void received(qpid::messaging::Message& message); qpid::messaging::Session getSession() const; private: + mutable sys::Mutex lock; boost::intrusive_ptr<SessionImpl> parent; const std::string destination; const qpid::messaging::Address address; @@ -77,15 +79,14 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl qpid::messaging::MessageListener* listener; uint32_t window; - void startFlow(); + void startFlow(const sys::Mutex::ScopedLock&); // Dummy param, call with lock held //implementation of public facing methods bool fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout); bool getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout); void closeImpl(); void setCapacityImpl(uint32_t); - //functors for public facing methods (allows locking and retry - //logic to be centralised) + //functors for public facing methods. struct Command { ReceiverImpl& impl; diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index a55a2737cb..a6067097bb 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -53,6 +53,9 @@ namespace qpid { namespace client { namespace amqp0_10 { +typedef qpid::sys::Mutex::ScopedLock ScopedLock; +typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock; + SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {} void SessionImpl::checkError() @@ -112,23 +115,29 @@ void SessionImpl::release(qpid::messaging::Message& m) void SessionImpl::close() { if (hasError()) { + ScopedLock l(lock); senders.clear(); receivers.clear(); } else { - //close all the senders and receivers (get copy of names and then - //make the calls to avoid modifying maps while iterating over - //them): - std::vector<std::string> s; - std::vector<std::string> r; - { - qpid::sys::Mutex::ScopedLock l(lock); - for (Senders::const_iterator i = senders.begin(); i != senders.end(); ++i) s.push_back(i->first); - for (Receivers::const_iterator i = receivers.begin(); i != receivers.end(); ++i) r.push_back(i->first); + while (true) { + Sender s; + { + ScopedLock l(lock); + if (senders.empty()) break; + s = senders.begin()->second; + } + s.close(); // outside the lock, will call senderCancelled + } + while (true) { + Receiver r; + { + ScopedLock l(lock); + if (receivers.empty()) break; + r = receivers.begin()->second; + } + r.close(); // outside the lock, will call receiverCancelled } - for (std::vector<std::string>::const_iterator i = s.begin(); i != s.end(); ++i) getSender(*i).close(); - for (std::vector<std::string>::const_iterator i = r.begin(); i != r.end(); ++i) getReceiver(*i).close(); } - connection->closed(*this); if (!hasError()) session.close(); } @@ -151,7 +160,7 @@ template <class T> void getFreeKey(std::string& key, T& map) void SessionImpl::setSession(qpid::client::Session s) { - qpid::sys::Mutex::ScopedLock l(lock); + ScopedLock l(lock); session = s; incoming.setSession(session); if (transactional) session.txSelect(); @@ -181,6 +190,7 @@ Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address) Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address) { + ScopedLock l(lock); std::string name = address.getName(); getFreeKey(name, receivers); Receiver receiver(new ReceiverImpl(*this, name, address)); @@ -205,7 +215,8 @@ Sender SessionImpl::createSender(const qpid::messaging::Address& address) } Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address) -{ +{ + ScopedLock l(lock); std::string name = address.getName(); getFreeKey(name, senders); Sender sender(new SenderImpl(*this, name, address)); @@ -265,6 +276,7 @@ struct IncomingMessageHandler : IncomingMessages::Handler bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer) { + ScopedLock l(lock); Receivers::const_iterator i = receivers.find(transfer.getDestination()); if (i == receivers.end()) { QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination()); @@ -371,6 +383,7 @@ struct SessionImpl::Receivable : Command uint32_t SessionImpl::getReceivableImpl(const std::string* destination) { + ScopedLock l(lock); if (destination) { return incoming.available(*destination); } else { @@ -399,6 +412,7 @@ struct SessionImpl::UnsettledAcks : Command uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination) { + ScopedLock l(lock); if (destination) { return incoming.pendingAccept(*destination); } else { @@ -414,12 +428,14 @@ void SessionImpl::syncImpl(bool block) void SessionImpl::commitImpl() { + ScopedLock l(lock); incoming.accept(); session.txCommit(); } void SessionImpl::rollbackImpl() { + ScopedLock l(lock); for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { getImplPtr<Receiver, ReceiverImpl>(i->second)->stop(); } @@ -436,6 +452,7 @@ void SessionImpl::rollbackImpl() void SessionImpl::acknowledgeImpl() { + ScopedLock l(lock); if (!transactional) incoming.accept(); } @@ -455,6 +472,7 @@ void SessionImpl::releaseImpl(qpid::messaging::Message& m) void SessionImpl::receiverCancelled(const std::string& name) { + ScopedLock l(lock); receivers.erase(name); session.sync(); incoming.releasePending(name); @@ -462,6 +480,7 @@ void SessionImpl::receiverCancelled(const std::string& name) void SessionImpl::senderCancelled(const std::string& name) { + ScopedLock l(lock); senders.erase(name); } diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index bc02f0ff8b..a4a00e1481 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -94,7 +94,6 @@ class SessionImpl : public qpid::messaging::SessionImpl template <class T> bool execute(T& f) { try { - qpid::sys::Mutex::ScopedLock l(lock); f(); return true; } catch (const qpid::TransportFailure&) { diff --git a/cpp/src/qpid/messaging/PrivateImplRef.h b/cpp/src/qpid/messaging/PrivateImplRef.h index cc2798c647..e77c58d071 100644 --- a/cpp/src/qpid/messaging/PrivateImplRef.h +++ b/cpp/src/qpid/messaging/PrivateImplRef.h @@ -29,9 +29,7 @@ namespace qpid { namespace messaging { -// FIXME aconway 2009-04-24: details! -/** @file - * +/** * Helper class to implement a class with a private, reference counted * implementation and reference semantics. * @@ -73,8 +71,10 @@ template <class T> class PrivateImplRef { typedef typename T::Impl Impl; typedef boost::intrusive_ptr<Impl> intrusive_ptr; + /** Get the implementation pointer from a handle */ static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); } + /** Set the implementation pointer in a handle */ static void set(T& t, const intrusive_ptr& p) { if (t.impl == p) return; if (t.impl) boost::intrusive_ptr_release(t.impl); |