diff options
author | Gordon Sim <gsim@apache.org> | 2015-04-30 20:39:39 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2015-04-30 20:39:39 +0000 |
commit | e616850745d92d523733703689d1db4902df9a8b (patch) | |
tree | d6f34060dd60041bff4473662003556474e8a73a | |
parent | 9d5c1d00fbd34ee18416d2857ebe9e8234db78a9 (diff) | |
download | qpid-python-e616850745d92d523733703689d1db4902df9a8b.tar.gz |
QPID-6526: make sure that the creation of proton links is done with the lock held
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1677064 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 39 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp | 24 |
3 files changed, 50 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 9d9f186f5c..ffc4247ee3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -417,7 +417,6 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) { - sys::Monitor::ScopedLock l(lock); lnk->configure(); attach(ssn, lnk->sender); checkClosed(ssn, lnk); @@ -427,7 +426,6 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) { - sys::Monitor::ScopedLock l(lock); lnk->configure(); attach(ssn, lnk->receiver, lnk->capacity); checkClosed(ssn, lnk); @@ -447,6 +445,43 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, pn_link_t* } } +boost::shared_ptr<SenderContext> ConnectionContext::createSender(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address) +{ + sys::Monitor::ScopedLock l(lock); + boost::shared_ptr<SenderContext> sender = session->createSender(address, setToOnSend); + try { + attach(session, sender); + return sender; + } catch (...) { + session->removeSender(sender->getName()); + throw; + } + +} +boost::shared_ptr<ReceiverContext> ConnectionContext::createReceiver(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address) +{ + sys::Monitor::ScopedLock l(lock); + boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address); + try { + attach(session, receiver); + return receiver; + } catch (...) { + session->removeReceiver(receiver->getName()); + throw; + } +} +boost::shared_ptr<SenderContext> ConnectionContext::getSender(boost::shared_ptr<SessionContext> session, const std::string& name) const +{ + sys::Monitor::ScopedLock l(lock); + return session->getSender(name); +} + +boost::shared_ptr<ReceiverContext> ConnectionContext::getReceiver(boost::shared_ptr<SessionContext> session, const std::string& name) const +{ + sys::Monitor::ScopedLock l(lock); + return session->getReceiver(name); +} + void ConnectionContext::send( boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index b687219624..ba3220c0ab 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -76,8 +76,11 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag boost::shared_ptr<SessionContext> newSession(bool transactional, const std::string& name); boost::shared_ptr<SessionContext> getSession(const std::string& name) const; void endSession(boost::shared_ptr<SessionContext>); - void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); - void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + boost::shared_ptr<SenderContext> createSender(boost::shared_ptr<SessionContext>, const qpid::messaging::Address& address); + boost::shared_ptr<ReceiverContext> createReceiver(boost::shared_ptr<SessionContext>, const qpid::messaging::Address& address); + boost::shared_ptr<SenderContext> getSender(boost::shared_ptr<SessionContext>, const std::string& name) const; + boost::shared_ptr<ReceiverContext> getReceiver(boost::shared_ptr<SessionContext>, const std::string& name) const; + void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); void drain_and_release_messages(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); @@ -191,6 +194,8 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void wakeupDriver(); void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0); + void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); + void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); void autoconnect(); bool tryConnectUrl(const qpid::Url& url); bool tryOpenAddr(const qpid::Address& address); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp index 44294e5f04..6b90d69c7f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp @@ -84,26 +84,14 @@ void SessionHandle::sync(bool block) qpid::messaging::Sender SessionHandle::createSender(const qpid::messaging::Address& address) { - boost::shared_ptr<SenderContext> sender = session->createSender(address, connection->setToOnSend); - try { - connection->attach(session, sender); - return qpid::messaging::Sender(new SenderHandle(connection, session, sender)); - } catch (...) { - session->removeSender(sender->getName()); - throw; - } + boost::shared_ptr<SenderContext> sender = connection->createSender(session, address); + return qpid::messaging::Sender(new SenderHandle(connection, session, sender)); } qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::Address& address) { - boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address); - try { - connection->attach(session, receiver); - return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver)); - } catch (...) { - session->removeReceiver(receiver->getName()); - throw; - } + boost::shared_ptr<ReceiverContext> receiver = connection->createReceiver(session, address); + return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver)); } bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout) @@ -137,12 +125,12 @@ uint32_t SessionHandle::getUnsettledAcks() Sender SessionHandle::getSender(const std::string& name) const { - return qpid::messaging::Sender(new SenderHandle(connection, session, session->getSender(name))); + return qpid::messaging::Sender(new SenderHandle(connection, session, connection->getSender(session, name))); } Receiver SessionHandle::getReceiver(const std::string& name) const { - return qpid::messaging::Receiver(new ReceiverHandle(connection, session, session->getReceiver(name))); + return qpid::messaging::Receiver(new ReceiverHandle(connection, session, connection->getReceiver(session, name))); } Connection SessionHandle::getConnection() const |