diff options
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 135 |
1 files changed, 95 insertions, 40 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 83c9a2a62e..cd032495e2 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -37,14 +37,19 @@ using qpid::management::Manageable; using qpid::management::Args; using qpid::sys::Mutex; -Link::Link(LinkRegistry* _links, - string& _host, - uint16_t _port, - bool _useSsl, - bool _durable, - Broker* _broker, +Link::Link(LinkRegistry* _links, + MessageStore* _store, + string& _host, + uint16_t _port, + bool _useSsl, + bool _durable, + string& _authMechanism, + string& _username, + string& _password, + Broker* _broker, management::Manageable* parent) - : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable), + : links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable), + authMechanism(_authMechanism), username(_username), password(_password), persistenceId(0), broker(_broker), state(0), access(boost::bind(&Link::established, this), boost::bind(&Link::closed, this, _1, _2), @@ -65,7 +70,7 @@ Link::Link(LinkRegistry* _links, agent->addObject(mgmtObject); } } - setState(STATE_WAITING); + setStateLH(STATE_WAITING); } Link::~Link () @@ -76,7 +81,7 @@ Link::~Link () mgmtObject->resourceDestroy (); } -void Link::setState (int newState) +void Link::setStateLH (int newState) { if (newState == state) return; @@ -93,13 +98,13 @@ void Link::setState (int newState) } } -void Link::startConnection () +void Link::startConnectionLH () { try { broker->connect (host, port, useSsl, 0, &access); - setState(STATE_CONNECTING); + setStateLH(STATE_CONNECTING); } catch(std::exception& e) { - setState(STATE_WAITING); + setStateLH(STATE_WAITING); mgmtObject->set_lastError (e.what()); } } @@ -109,7 +114,7 @@ void Link::established () Mutex::ScopedLock mutex(lock); QPID_LOG (info, "Inter-broker link established to " << host << ":" << port); - setState(STATE_OPERATIONAL); + setStateLH(STATE_OPERATIONAL); currentInterval = 1; visitCount = 0; if (closing) @@ -124,8 +129,11 @@ void Link::closed (int, std::string text) QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port); connection.reset(); - created.transfer(created.end(), active.begin(), active.end(), active); - setState(STATE_WAITING); + for (Bridges::iterator i = active.begin(); i != active.end(); i++) + created.push_back(*i); + active.clear(); + + setStateLH(STATE_WAITING); mgmtObject->set_lastError (text); if (closing) destroy(); @@ -133,25 +141,56 @@ void Link::closed (int, std::string text) void Link::destroy () { + Mutex::ScopedLock mutex(lock); + Bridges toDelete; + QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); connection.reset(); + + // Move the bridges to be deleted into a local vector so there is no + // corruption of the iterator caused by bridge deletion. + for (Bridges::iterator i = active.begin(); i != active.end(); i++) + toDelete.push_back(*i); + active.clear(); + + for (Bridges::iterator i = created.begin(); i != created.end(); i++) + toDelete.push_back(*i); + created.clear(); + + // Now delete all bridges on this link. + for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) + (*i)->destroy(); + toDelete.clear(); + links->destroy (host, port); } -void Link::cancel(Bridge* bridge) +void Link::add(Bridge::shared_ptr bridge) { Mutex::ScopedLock mutex(lock); - //need to take this out of the active map and add it to the cancelled map + created.push_back (bridge); + if (state == STATE_OPERATIONAL && connection.get() != 0) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); +} + +void Link::cancel(Bridge::shared_ptr bridge) +{ + Mutex::ScopedLock mutex(lock); + + for (Bridges::iterator i = created.begin(); i != created.end(); i++) { + if ((*i).get() == bridge.get()) { + created.erase(i); + break; + } + } for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - if (&(*i) == bridge) { - cancelled.transfer(cancelled.end(), i, active); + if ((*i).get() == bridge.get()) { + bridge->cancel(); + active.erase(i); break; } } - - if (connection.get() != 0) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } void Link::ioThreadProcessing() @@ -161,21 +200,17 @@ void Link::ioThreadProcessing() //process any pending creates if (!created.empty()) { for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { - i->create(*connection); - } - active.transfer(active.end(), created.begin(), created.end(), created); - } - if (!cancelled.empty()) { - //process any pending cancellations - for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) { - i->cancel(); + active.push_back(*i); + (*i)->create(*connection); } - cancelled.clear(); + created.clear(); } } void Link::setConnection(Connection::shared_ptr c) { + Mutex::ScopedLock mutex(lock); + connection = c; connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } @@ -193,11 +228,18 @@ void Link::maintenanceVisit () currentInterval *= 2; if (currentInterval > MAX_INTERVAL) currentInterval = MAX_INTERVAL; - startConnection(); + startConnectionLH(); } } } +uint Link::nextChannel() +{ + Mutex::ScopedLock mutex(lock); + + return channelCounter++; +} + void Link::setPersistenceId(uint64_t id) const { if (mgmtObject != 0 && persistenceId == 0) @@ -217,13 +259,19 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) { string host; uint16_t port; + string authMechanism; + string username; + string password; buffer.getShortString(host); port = buffer.getShort(); bool useSsl(buffer.getOctet()); bool durable(buffer.getOctet()); + buffer.getShortString(authMechanism); + buffer.getShortString(username); + buffer.getShortString(password); - return links.declare(host, port, useSsl, durable).first; + return links.declare(host, port, useSsl, durable, authMechanism, username, password).first; } void Link::encode(Buffer& buffer) const @@ -233,6 +281,9 @@ void Link::encode(Buffer& buffer) const buffer.putShort(port); buffer.putOctet(useSsl ? 1 : 0); buffer.putOctet(durable ? 1 : 0); + buffer.putShortString(authMechanism); + buffer.putShortString(username); + buffer.putShortString(password); } uint32_t Link::encodedSize() const @@ -241,7 +292,10 @@ uint32_t Link::encodedSize() const + 5 // short-string ("link") + 2 // port + 1 // useSsl - + 1; // durable + + 1 // durable + + authMechanism.size() + 1 + + username.size() + 1 + + password.size() + 1; } ManagementObject::shared_ptr Link::GetManagementObject (void) const @@ -251,8 +305,6 @@ ManagementObject::shared_ptr Link::GetManagementObject (void) const Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args) { - Mutex::ScopedLock mutex(lock); - switch (op) { case management::Link::METHOD_CLOSE : @@ -269,11 +321,14 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args if (iargs.i_durable && !durable) return Manageable::STATUS_INVALID_PARAMETER; - created.push_back(new Bridge(this, channelCounter++, - boost::bind(&Link::cancel, this, _1), iargs)); + std::pair<Bridge::shared_ptr, bool> result = + links->declare (host, port, iargs.i_durable, iargs.i_src, + iargs.i_dest, iargs.i_key, iargs.i_src_is_queue, + iargs.i_src_is_local, iargs.i_tag, iargs.i_excludes); + + if (result.second && iargs.i_durable) + store->create(*result.first); - if (state == STATE_OPERATIONAL && connection.get() != 0) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); return Manageable::STATUS_OK; } |