summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r--cpp/src/qpid/broker/Link.cpp474
1 files changed, 0 insertions, 474 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
deleted file mode 100644
index 91861ade3f..0000000000
--- a/cpp/src/qpid/broker/Link.cpp
+++ /dev/null
@@ -1,474 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/broker/Link.h"
-#include "qpid/broker/LinkRegistry.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
-#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
-#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
-#include "boost/bind.hpp"
-#include "qpid/log/Statement.h"
-#include "qpid/framing/enum.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/broker/AclModule.h"
-
-using namespace qpid::broker;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-using qpid::framing::UnauthorizedAccessException;
-using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
-using qpid::sys::Mutex;
-using std::stringstream;
-using std::string;
-namespace _qmf = qmf::org::apache::qpid::broker;
-
-Link::Link(LinkRegistry* _links,
- MessageStore* _store,
- string& _host,
- uint16_t _port,
- string& _transport,
- bool _durable,
- string& _authMechanism,
- string& _username,
- string& _password,
- Broker* _broker,
- Manageable* parent)
- : links(_links), store(_store), host(_host), port(_port),
- transport(_transport),
- durable(_durable),
- authMechanism(_authMechanism), username(_username), password(_password),
- persistenceId(0), mgmtObject(0), broker(_broker), state(0),
- visitCount(0),
- currentInterval(1),
- closing(false),
- updateUrls(false),
- channelCounter(1),
- connection(0),
- agent(0)
-{
- if (parent != 0 && broker != 0)
- {
- agent = broker->getManagementAgent();
- if (agent != 0)
- {
- mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable);
- agent->addObject(mgmtObject, 0, durable);
- }
- }
- setStateLH(STATE_WAITING);
-}
-
-Link::~Link ()
-{
- if (state == STATE_OPERATIONAL && connection != 0)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
-
- if (mgmtObject != 0)
- mgmtObject->resourceDestroy ();
-}
-
-void Link::setStateLH (int newState)
-{
- if (newState == state)
- return;
-
- state = newState;
-
- if (hideManagement())
- return;
-
- switch (state)
- {
- case STATE_WAITING : mgmtObject->set_state("Waiting"); break;
- case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break;
- case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
- case STATE_FAILED : mgmtObject->set_state("Failed"); break;
- case STATE_CLOSED : mgmtObject->set_state("Closed"); break;
- case STATE_PASSIVE : mgmtObject->set_state("Passive"); break;
- }
-}
-
-void Link::startConnectionLH ()
-{
- try {
- // Set the state before calling connect. It is possible that connect
- // will fail synchronously and call Link::closed before returning.
- setStateLH(STATE_CONNECTING);
- broker->connect (host, port, transport,
- boost::bind (&Link::closed, this, _1, _2));
- QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
- } catch(std::exception& e) {
- setStateLH(STATE_WAITING);
- if (!hideManagement())
- mgmtObject->set_lastError (e.what());
- }
-}
-
-void Link::established ()
-{
- stringstream addr;
- addr << host << ":" << port;
- QPID_LOG (info, "Inter-broker link established to " << addr.str());
-
- if (!hideManagement() && agent)
- agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
-
- {
- Mutex::ScopedLock mutex(lock);
- setStateLH(STATE_OPERATIONAL);
- currentInterval = 1;
- visitCount = 0;
- if (closing)
- destroy();
- }
-}
-
-void Link::closed (int, std::string text)
-{
- Mutex::ScopedLock mutex(lock);
- QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
-
- connection = 0;
-
- if (state == STATE_OPERATIONAL) {
- stringstream addr;
- addr << host << ":" << port;
- QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
- if (!hideManagement() && agent)
- agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
- }
-
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- (*i)->closed();
- created.push_back(*i);
- }
- active.clear();
-
- if (state != STATE_FAILED)
- {
- setStateLH(STATE_WAITING);
- if (!hideManagement())
- mgmtObject->set_lastError (text);
- }
-
- if (closing)
- destroy();
-}
-
-void Link::destroy ()
-{
- Bridges toDelete;
- {
- Mutex::ScopedLock mutex(lock);
-
- QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
- if (connection)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
-
- setStateLH(STATE_CLOSED);
-
- // Move the bridges to be deleted into a local vector so there is no
- // corruption of the iterator caused by bridge deletion.
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- (*i)->closed();
- toDelete.push_back(*i);
- }
- active.clear();
-
- for (Bridges::iterator i = created.begin(); i != created.end(); i++)
- toDelete.push_back(*i);
- created.clear();
- }
- // Now delete all bridges on this link (don't hold the lock for this).
- for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
- (*i)->destroy();
- toDelete.clear();
- links->destroy (host, port);
-}
-
-void Link::add(Bridge::shared_ptr bridge)
-{
- Mutex::ScopedLock mutex(lock);
- created.push_back (bridge);
-}
-
-void Link::cancel(Bridge::shared_ptr bridge)
-{
- {
- Mutex::ScopedLock mutex(lock);
-
- for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
- if ((*i).get() == bridge.get()) {
- created.erase(i);
- break;
- }
- }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if ((*i).get() == bridge.get()) {
- cancellations.push_back(bridge);
- bridge->closed();
- active.erase(i);
- break;
- }
- }
- }
- if (!cancellations.empty()) {
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
- }
-}
-
-void Link::ioThreadProcessing()
-{
- Mutex::ScopedLock mutex(lock);
-
- if (state != STATE_OPERATIONAL)
- return;
- QPID_LOG(debug, "Link::ioThreadProcessing()");
-
- //process any pending creates and/or cancellations
- if (!created.empty()) {
- for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
- active.push_back(*i);
- (*i)->create(*connection);
- }
- created.clear();
- }
- if (!cancellations.empty()) {
- for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) {
- (*i)->cancel(*connection);
- }
- cancellations.clear();
- }
-}
-
-void Link::setConnection(Connection* c)
-{
- Mutex::ScopedLock mutex(lock);
- connection = c;
- updateUrls = true;
-}
-
-void Link::maintenanceVisit ()
-{
- Mutex::ScopedLock mutex(lock);
-
- if (connection && updateUrls) {
- urls.reset(connection->getKnownHosts());
- QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
- updateUrls = false;
- }
-
- if (state == STATE_WAITING)
- {
- visitCount++;
- if (visitCount >= currentInterval)
- {
- visitCount = 0;
- //switch host and port to next in url list if possible
- if (!tryFailover()) {
- currentInterval *= 2;
- if (currentInterval > MAX_INTERVAL)
- currentInterval = MAX_INTERVAL;
- startConnectionLH();
- }
- }
- }
- else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
-}
-
-void Link::reconnect(const qpid::Address& a)
-{
- Mutex::ScopedLock mutex(lock);
- host = a.host;
- port = a.port;
- transport = a.protocol;
- startConnectionLH();
- if (!hideManagement()) {
- stringstream errorString;
- errorString << "Failed over to " << a;
- mgmtObject->set_lastError(errorString.str());
- }
-}
-
-bool Link::tryFailover()
-{
- Address next;
- if (urls.next(next) &&
- (next.host != host || next.port != port || next.protocol != transport)) {
- links->changeAddress(Address(transport, host, port), next);
- QPID_LOG(debug, "Link failing over to " << host << ":" << port);
- return true;
- } else {
- return false;
- }
-}
-
-// Management updates for a linke are inconsistent in a cluster, so they are
-// suppressed.
-bool Link::hideManagement() const {
- return !mgmtObject || ( broker && broker->isInCluster());
-}
-
-uint Link::nextChannel()
-{
- Mutex::ScopedLock mutex(lock);
-
- return channelCounter++;
-}
-
-void Link::notifyConnectionForced(const string text)
-{
- Mutex::ScopedLock mutex(lock);
-
- setStateLH(STATE_FAILED);
- if (!hideManagement())
- mgmtObject->set_lastError(text);
-}
-
-void Link::setPersistenceId(uint64_t id) const
-{
- persistenceId = id;
-}
-
-const string& Link::getName() const
-{
- return host;
-}
-
-Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
-{
- string host;
- uint16_t port;
- string transport;
- string authMechanism;
- string username;
- string password;
-
- buffer.getShortString(host);
- port = buffer.getShort();
- buffer.getShortString(transport);
- bool durable(buffer.getOctet());
- buffer.getShortString(authMechanism);
- buffer.getShortString(username);
- buffer.getShortString(password);
-
- return links.declare(host, port, transport, durable, authMechanism, username, password).first;
-}
-
-void Link::encode(Buffer& buffer) const
-{
- buffer.putShortString(string("link"));
- buffer.putShortString(host);
- buffer.putShort(port);
- buffer.putShortString(transport);
- buffer.putOctet(durable ? 1 : 0);
- buffer.putShortString(authMechanism);
- buffer.putShortString(username);
- buffer.putShortString(password);
-}
-
-uint32_t Link::encodedSize() const
-{
- return host.size() + 1 // short-string (host)
- + 5 // short-string ("link")
- + 2 // port
- + transport.size() + 1 // short-string(transport)
- + 1 // durable
- + authMechanism.size() + 1
- + username.size() + 1
- + password.size() + 1;
-}
-
-ManagementObject* Link::GetManagementObject (void) const
-{
- return (ManagementObject*) mgmtObject;
-}
-
-Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& text)
-{
- switch (op)
- {
- case _qmf::Link::METHOD_CLOSE :
- if (!closing) {
- closing = true;
- if (state != STATE_CONNECTING && connection) {
- //connection can only be closed on the connections own IO processing thread
- connection->requestIOProcessing(boost::bind(&Link::destroy, this));
- }
- }
- return Manageable::STATUS_OK;
-
- case _qmf::Link::METHOD_BRIDGE :
- _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
- QPID_LOG(debug, "Link::bridge() request received");
-
- // Durable bridges are only valid on durable links
- if (iargs.i_durable && !durable) {
- text = "Can't create a durable route on a non-durable link";
- return Manageable::STATUS_USER;
- }
-
- if (iargs.i_dynamic) {
- Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
- if (exchange.get() == 0) {
- text = "Exchange not found";
- return Manageable::STATUS_USER;
- }
- if (!exchange->supportsDynamicBinding()) {
- text = "Exchange type does not support dynamic routing";
- return Manageable::STATUS_USER;
- }
- }
-
- std::pair<Bridge::shared_ptr, bool> result =
- links->declare (host, port, iargs.i_durable, iargs.i_src,
- iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
- iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
- iargs.i_dynamic, iargs.i_sync);
-
- if (result.second && iargs.i_durable)
- store->create(*result.first);
-
- return Manageable::STATUS_OK;
- }
-
- return Manageable::STATUS_UNKNOWN_METHOD;
-}
-
-void Link::setPassive(bool passive)
-{
- Mutex::ScopedLock mutex(lock);
- if (passive) {
- setStateLH(STATE_PASSIVE);
- } else {
- if (state == STATE_PASSIVE) {
- setStateLH(STATE_WAITING);
- } else {
- QPID_LOG(warning, "Ignoring attempt to activate non-passive link");
- }
- }
-}