summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2015-04-30 20:39:39 +0000
committerGordon Sim <gsim@apache.org>2015-04-30 20:39:39 +0000
commite616850745d92d523733703689d1db4902df9a8b (patch)
treed6f34060dd60041bff4473662003556474e8a73a
parent9d5c1d00fbd34ee18416d2857ebe9e8234db78a9 (diff)
downloadqpid-python-e616850745d92d523733703689d1db4902df9a8b.tar.gz
QPID-6526: make sure that the creation of proton links is done with the lock held
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1677064 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp39
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h9
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp24
3 files changed, 50 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 9d9f186f5c..ffc4247ee3 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -417,7 +417,6 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
{
- sys::Monitor::ScopedLock l(lock);
lnk->configure();
attach(ssn, lnk->sender);
checkClosed(ssn, lnk);
@@ -427,7 +426,6 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
- sys::Monitor::ScopedLock l(lock);
lnk->configure();
attach(ssn, lnk->receiver, lnk->capacity);
checkClosed(ssn, lnk);
@@ -447,6 +445,43 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, pn_link_t*
}
}
+boost::shared_ptr<SenderContext> ConnectionContext::createSender(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address)
+{
+ sys::Monitor::ScopedLock l(lock);
+ boost::shared_ptr<SenderContext> sender = session->createSender(address, setToOnSend);
+ try {
+ attach(session, sender);
+ return sender;
+ } catch (...) {
+ session->removeSender(sender->getName());
+ throw;
+ }
+
+}
+boost::shared_ptr<ReceiverContext> ConnectionContext::createReceiver(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address)
+{
+ sys::Monitor::ScopedLock l(lock);
+ boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address);
+ try {
+ attach(session, receiver);
+ return receiver;
+ } catch (...) {
+ session->removeReceiver(receiver->getName());
+ throw;
+ }
+}
+boost::shared_ptr<SenderContext> ConnectionContext::getSender(boost::shared_ptr<SessionContext> session, const std::string& name) const
+{
+ sys::Monitor::ScopedLock l(lock);
+ return session->getSender(name);
+}
+
+boost::shared_ptr<ReceiverContext> ConnectionContext::getReceiver(boost::shared_ptr<SessionContext> session, const std::string& name) const
+{
+ sys::Monitor::ScopedLock l(lock);
+ return session->getReceiver(name);
+}
+
void ConnectionContext::send(
boost::shared_ptr<SessionContext> ssn,
boost::shared_ptr<SenderContext> snd,
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index b687219624..ba3220c0ab 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -76,8 +76,11 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
boost::shared_ptr<SessionContext> newSession(bool transactional, const std::string& name);
boost::shared_ptr<SessionContext> getSession(const std::string& name) const;
void endSession(boost::shared_ptr<SessionContext>);
- void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
- void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+ boost::shared_ptr<SenderContext> createSender(boost::shared_ptr<SessionContext>, const qpid::messaging::Address& address);
+ boost::shared_ptr<ReceiverContext> createReceiver(boost::shared_ptr<SessionContext>, const qpid::messaging::Address& address);
+ boost::shared_ptr<SenderContext> getSender(boost::shared_ptr<SessionContext>, const std::string& name) const;
+ boost::shared_ptr<ReceiverContext> getReceiver(boost::shared_ptr<SessionContext>, const std::string& name) const;
+
void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void drain_and_release_messages(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
@@ -191,6 +194,8 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void wakeupDriver();
void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0);
+ void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+ void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void autoconnect();
bool tryConnectUrl(const qpid::Url& url);
bool tryOpenAddr(const qpid::Address& address);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
index 44294e5f04..6b90d69c7f 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
@@ -84,26 +84,14 @@ void SessionHandle::sync(bool block)
qpid::messaging::Sender SessionHandle::createSender(const qpid::messaging::Address& address)
{
- boost::shared_ptr<SenderContext> sender = session->createSender(address, connection->setToOnSend);
- try {
- connection->attach(session, sender);
- return qpid::messaging::Sender(new SenderHandle(connection, session, sender));
- } catch (...) {
- session->removeSender(sender->getName());
- throw;
- }
+ boost::shared_ptr<SenderContext> sender = connection->createSender(session, address);
+ return qpid::messaging::Sender(new SenderHandle(connection, session, sender));
}
qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::Address& address)
{
- boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address);
- try {
- connection->attach(session, receiver);
- return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver));
- } catch (...) {
- session->removeReceiver(receiver->getName());
- throw;
- }
+ boost::shared_ptr<ReceiverContext> receiver = connection->createReceiver(session, address);
+ return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver));
}
bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout)
@@ -137,12 +125,12 @@ uint32_t SessionHandle::getUnsettledAcks()
Sender SessionHandle::getSender(const std::string& name) const
{
- return qpid::messaging::Sender(new SenderHandle(connection, session, session->getSender(name)));
+ return qpid::messaging::Sender(new SenderHandle(connection, session, connection->getSender(session, name)));
}
Receiver SessionHandle::getReceiver(const std::string& name) const
{
- return qpid::messaging::Receiver(new ReceiverHandle(connection, session, session->getReceiver(name)));
+ return qpid::messaging::Receiver(new ReceiverHandle(connection, session, connection->getReceiver(session, name)));
}
Connection SessionHandle::getConnection() const