/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "Link.h" #include "LinkRegistry.h" #include "Broker.h" #include "Connection.h" #include "qpid/management/ManagementAgent.h" #include "qpid/management/Link.h" #include "boost/bind.hpp" #include "qpid/log/Statement.h" using namespace qpid::broker; using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; using qpid::sys::Mutex; Link::Link(LinkRegistry* _links, string& _host, uint16_t _port, bool _useSsl, bool _durable, Broker* _broker, management::Manageable* parent) : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable), persistenceId(0), broker(_broker), state(0), access(boost::bind(&Link::established, this), boost::bind(&Link::closed, this, _1, _2), boost::bind(&Link::setConnection, this, _1)), visitCount(0), currentInterval(1), closing(false), channelCounter(1) { if (parent != 0) { ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); if (agent.get() != 0) { mgmtObject = management::Link::shared_ptr (new management::Link(this, parent, _host, _port, _useSsl, _durable)); if (!durable) agent->addObject(mgmtObject); } } setState(STATE_WAITING); } Link::~Link () { if (state == STATE_OPERATIONAL) access.close(); if (mgmtObject.get () != 0) mgmtObject->resourceDestroy (); } void Link::setState (int newState) { if (newState == state) return; state = newState; if (mgmtObject.get() == 0) return; switch (state) { case STATE_WAITING : mgmtObject->set_state("Waiting"); break; case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break; case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; } } void Link::startConnection () { try { broker->connect (host, port, useSsl, 0, &access); setState(STATE_CONNECTING); } catch(std::exception& e) { setState(STATE_WAITING); mgmtObject->set_lastError (e.what()); } } void Link::established () { Mutex::ScopedLock mutex(lock); QPID_LOG (info, "Inter-broker link established to " << host << ":" << port); setState(STATE_OPERATIONAL); currentInterval = 1; visitCount = 0; if (closing) destroy(); } void Link::closed (int, std::string text) { Mutex::ScopedLock mutex(lock); if (state == STATE_OPERATIONAL) QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port); connection.reset(); created.transfer(created.end(), active.begin(), active.end(), active); setState(STATE_WAITING); mgmtObject->set_lastError (text); if (closing) destroy(); } void Link::destroy () { QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); connection.reset(); links->destroy (host, port); } void Link::cancel(Bridge* bridge) { Mutex::ScopedLock mutex(lock); //need to take this out of the active map and add it to the cancelled map for (Bridges::iterator i = active.begin(); i != active.end(); i++) { if (&(*i) == bridge) { cancelled.transfer(cancelled.end(), i, active); break; } } if (connection.get() != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } void Link::ioThreadProcessing() { Mutex::ScopedLock mutex(lock); //process any pending creates if (!created.empty()) { for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { i->create(*connection); } active.transfer(active.end(), created.begin(), created.end(), created); } if (!cancelled.empty()) { //process any pending cancellations for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) { i->cancel(); } cancelled.clear(); } } void Link::setConnection(Connection::shared_ptr c) { connection = c; connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); if (state == STATE_WAITING) { visitCount++; if (visitCount >= currentInterval) { visitCount = 0; currentInterval *= 2; if (currentInterval > MAX_INTERVAL) currentInterval = MAX_INTERVAL; startConnection(); } } } void Link::setPersistenceId(uint64_t id) const { if (mgmtObject != 0 && persistenceId == 0) { ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); agent->addObject (mgmtObject, id); } persistenceId = id; } const string& Link::getName() const { return host; } Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) { string host; uint16_t port; buffer.getShortString(host); port = buffer.getShort(); bool useSsl(buffer.getOctet()); bool durable(buffer.getOctet()); return links.declare(host, port, useSsl, durable).first; } void Link::encode(Buffer& buffer) const { buffer.putShortString(string("link")); buffer.putShortString(host); buffer.putShort(port); buffer.putOctet(useSsl ? 1 : 0); buffer.putOctet(durable ? 1 : 0); } uint32_t Link::encodedSize() const { return host.size() + 1 // short-string (host) + 5 // short-string ("link") + 2 // port + 1 // useSsl + 1; // durable } ManagementObject::shared_ptr Link::GetManagementObject (void) const { return boost::dynamic_pointer_cast (mgmtObject); } Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args) { Mutex::ScopedLock mutex(lock); switch (op) { case management::Link::METHOD_CLOSE : closing = true; if (state != STATE_CONNECTING) destroy(); return Manageable::STATUS_OK; case management::Link::METHOD_BRIDGE : management::ArgsLinkBridge iargs = dynamic_cast(args); // Durable bridges are only valid on durable links if (iargs.i_durable && !durable) return Manageable::STATUS_INVALID_PARAMETER; created.push_back(new Bridge(this, channelCounter++, boost::bind(&Link::cancel, this, _1), iargs)); if (state == STATE_OPERATIONAL && connection.get() != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); return Manageable::STATUS_OK; } return Manageable::STATUS_UNKNOWN_METHOD; }