diff options
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 281 |
1 files changed, 281 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp new file mode 100644 index 0000000000..83c9a2a62e --- /dev/null +++ b/cpp/src/qpid/broker/Link.cpp @@ -0,0 +1,281 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Link.h" +#include "LinkRegistry.h" +#include "Broker.h" +#include "Connection.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/management/Link.h" +#include "boost/bind.hpp" +#include "qpid/log/Statement.h" + +using namespace qpid::broker; +using qpid::framing::Buffer; +using qpid::framing::FieldTable; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::sys::Mutex; + +Link::Link(LinkRegistry* _links, + string& _host, + uint16_t _port, + bool _useSsl, + bool _durable, + Broker* _broker, + management::Manageable* parent) + : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable), + persistenceId(0), broker(_broker), state(0), + access(boost::bind(&Link::established, this), + boost::bind(&Link::closed, this, _1, _2), + boost::bind(&Link::setConnection, this, _1)), + visitCount(0), + currentInterval(1), + closing(false), + channelCounter(1) +{ + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); + if (agent.get() != 0) + { + mgmtObject = management::Link::shared_ptr + (new management::Link(this, parent, _host, _port, _useSsl, _durable)); + if (!durable) + agent->addObject(mgmtObject); + } + } + setState(STATE_WAITING); +} + +Link::~Link () +{ + if (state == STATE_OPERATIONAL) + access.close(); + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} + +void Link::setState (int newState) +{ + if (newState == state) + return; + + state = newState; + if (mgmtObject.get() == 0) + return; + + switch (state) + { + case STATE_WAITING : mgmtObject->set_state("Waiting"); break; + case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break; + case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; + } +} + +void Link::startConnection () +{ + try { + broker->connect (host, port, useSsl, 0, &access); + setState(STATE_CONNECTING); + } catch(std::exception& e) { + setState(STATE_WAITING); + mgmtObject->set_lastError (e.what()); + } +} + +void Link::established () +{ + Mutex::ScopedLock mutex(lock); + + QPID_LOG (info, "Inter-broker link established to " << host << ":" << port); + setState(STATE_OPERATIONAL); + currentInterval = 1; + visitCount = 0; + if (closing) + destroy(); +} + +void Link::closed (int, std::string text) +{ + Mutex::ScopedLock mutex(lock); + + if (state == STATE_OPERATIONAL) + QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port); + + connection.reset(); + created.transfer(created.end(), active.begin(), active.end(), active); + setState(STATE_WAITING); + mgmtObject->set_lastError (text); + if (closing) + destroy(); +} + +void Link::destroy () +{ + QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); + connection.reset(); + links->destroy (host, port); +} + +void Link::cancel(Bridge* bridge) +{ + Mutex::ScopedLock mutex(lock); + + //need to take this out of the active map and add it to the cancelled map + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if (&(*i) == bridge) { + cancelled.transfer(cancelled.end(), i, active); + break; + } + } + + if (connection.get() != 0) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); +} + +void Link::ioThreadProcessing() +{ + Mutex::ScopedLock mutex(lock); + + //process any pending creates + if (!created.empty()) { + for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { + i->create(*connection); + } + active.transfer(active.end(), created.begin(), created.end(), created); + } + if (!cancelled.empty()) { + //process any pending cancellations + for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) { + i->cancel(); + } + cancelled.clear(); + } +} + +void Link::setConnection(Connection::shared_ptr c) +{ + connection = c; + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); +} + +void Link::maintenanceVisit () +{ + Mutex::ScopedLock mutex(lock); + + if (state == STATE_WAITING) + { + visitCount++; + if (visitCount >= currentInterval) + { + visitCount = 0; + currentInterval *= 2; + if (currentInterval > MAX_INTERVAL) + currentInterval = MAX_INTERVAL; + startConnection(); + } + } +} + +void Link::setPersistenceId(uint64_t id) const +{ + if (mgmtObject != 0 && persistenceId == 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + agent->addObject (mgmtObject, id); + } + persistenceId = id; +} + +const string& Link::getName() const +{ + return host; +} + +Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) +{ + string host; + uint16_t port; + + buffer.getShortString(host); + port = buffer.getShort(); + bool useSsl(buffer.getOctet()); + bool durable(buffer.getOctet()); + + return links.declare(host, port, useSsl, durable).first; +} + +void Link::encode(Buffer& buffer) const +{ + buffer.putShortString(string("link")); + buffer.putShortString(host); + buffer.putShort(port); + buffer.putOctet(useSsl ? 1 : 0); + buffer.putOctet(durable ? 1 : 0); +} + +uint32_t Link::encodedSize() const +{ + return host.size() + 1 // short-string (host) + + 5 // short-string ("link") + + 2 // port + + 1 // useSsl + + 1; // durable +} + +ManagementObject::shared_ptr Link::GetManagementObject (void) const +{ + return boost::dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args) +{ + Mutex::ScopedLock mutex(lock); + + switch (op) + { + case management::Link::METHOD_CLOSE : + closing = true; + if (state != STATE_CONNECTING) + destroy(); + return Manageable::STATUS_OK; + + case management::Link::METHOD_BRIDGE : + management::ArgsLinkBridge iargs = + dynamic_cast<const management::ArgsLinkBridge&>(args); + + // Durable bridges are only valid on durable links + if (iargs.i_durable && !durable) + return Manageable::STATUS_INVALID_PARAMETER; + + created.push_back(new Bridge(this, channelCounter++, + boost::bind(&Link::cancel, this, _1), iargs)); + + if (state == STATE_OPERATIONAL && connection.get() != 0) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + return Manageable::STATUS_OK; + } + + return Manageable::STATUS_UNKNOWN_METHOD; +} |