summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Link.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp474
1 files changed, 474 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
new file mode 100644
index 0000000000..9ab4379a69
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -0,0 +1,474 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Link.h"
+#include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Connection.h"
+#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
+#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
+#include "boost/bind.hpp"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/enum.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/broker/AclModule.h"
+
+using namespace qpid::broker;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+using qpid::framing::UnauthorizedAccessException;
+using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using qpid::sys::Mutex;
+using std::stringstream;
+using std::string;
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+Link::Link(LinkRegistry* _links,
+ MessageStore* _store,
+ string& _host,
+ uint16_t _port,
+ string& _transport,
+ bool _durable,
+ string& _authMechanism,
+ string& _username,
+ string& _password,
+ Broker* _broker,
+ Manageable* parent)
+ : links(_links), store(_store), host(_host), port(_port),
+ transport(_transport),
+ durable(_durable),
+ authMechanism(_authMechanism), username(_username), password(_password),
+ persistenceId(0), mgmtObject(0), broker(_broker), state(0),
+ visitCount(0),
+ currentInterval(1),
+ closing(false),
+ updateUrls(false),
+ channelCounter(1),
+ connection(0),
+ agent(0)
+{
+ if (parent != 0 && broker != 0)
+ {
+ agent = broker->getManagementAgent();
+ if (agent != 0)
+ {
+ mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable);
+ agent->addObject(mgmtObject, 0, durable);
+ }
+ }
+ setStateLH(STATE_WAITING);
+}
+
+Link::~Link ()
+{
+ if (state == STATE_OPERATIONAL && connection != 0)
+ connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
+
+ if (mgmtObject != 0)
+ mgmtObject->resourceDestroy ();
+}
+
+void Link::setStateLH (int newState)
+{
+ if (newState == state)
+ return;
+
+ state = newState;
+
+ if (hideManagement())
+ return;
+
+ switch (state)
+ {
+ case STATE_WAITING : mgmtObject->set_state("Waiting"); break;
+ case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break;
+ case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
+ case STATE_FAILED : mgmtObject->set_state("Failed"); break;
+ case STATE_CLOSED : mgmtObject->set_state("Closed"); break;
+ case STATE_PASSIVE : mgmtObject->set_state("Passive"); break;
+ }
+}
+
+void Link::startConnectionLH ()
+{
+ try {
+ // Set the state before calling connect. It is possible that connect
+ // will fail synchronously and call Link::closed before returning.
+ setStateLH(STATE_CONNECTING);
+ broker->connect (host, boost::lexical_cast<std::string>(port), transport,
+ boost::bind (&Link::closed, this, _1, _2));
+ QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
+ } catch(std::exception& e) {
+ setStateLH(STATE_WAITING);
+ if (!hideManagement())
+ mgmtObject->set_lastError (e.what());
+ }
+}
+
+void Link::established ()
+{
+ stringstream addr;
+ addr << host << ":" << port;
+ QPID_LOG (info, "Inter-broker link established to " << addr.str());
+
+ if (!hideManagement() && agent)
+ agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
+
+ {
+ Mutex::ScopedLock mutex(lock);
+ setStateLH(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ if (closing)
+ destroy();
+ }
+}
+
+void Link::closed (int, std::string text)
+{
+ Mutex::ScopedLock mutex(lock);
+ QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
+
+ connection = 0;
+
+ if (state == STATE_OPERATIONAL) {
+ stringstream addr;
+ addr << host << ":" << port;
+ QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
+ if (!hideManagement() && agent)
+ agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+ }
+
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ (*i)->closed();
+ created.push_back(*i);
+ }
+ active.clear();
+
+ if (state != STATE_FAILED)
+ {
+ setStateLH(STATE_WAITING);
+ if (!hideManagement())
+ mgmtObject->set_lastError (text);
+ }
+
+ if (closing)
+ destroy();
+}
+
+void Link::destroy ()
+{
+ Bridges toDelete;
+ {
+ Mutex::ScopedLock mutex(lock);
+
+ QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
+ if (connection)
+ connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
+
+ setStateLH(STATE_CLOSED);
+
+ // Move the bridges to be deleted into a local vector so there is no
+ // corruption of the iterator caused by bridge deletion.
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ (*i)->closed();
+ toDelete.push_back(*i);
+ }
+ active.clear();
+
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++)
+ toDelete.push_back(*i);
+ created.clear();
+ }
+ // Now delete all bridges on this link (don't hold the lock for this).
+ for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
+ (*i)->destroy();
+ toDelete.clear();
+ links->destroy (host, port);
+}
+
+void Link::add(Bridge::shared_ptr bridge)
+{
+ Mutex::ScopedLock mutex(lock);
+ created.push_back (bridge);
+}
+
+void Link::cancel(Bridge::shared_ptr bridge)
+{
+ {
+ Mutex::ScopedLock mutex(lock);
+
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ created.erase(i);
+ break;
+ }
+ }
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ cancellations.push_back(bridge);
+ bridge->closed();
+ active.erase(i);
+ break;
+ }
+ }
+ }
+ if (!cancellations.empty()) {
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ }
+}
+
+void Link::ioThreadProcessing()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ if (state != STATE_OPERATIONAL)
+ return;
+ QPID_LOG(debug, "Link::ioThreadProcessing()");
+
+ //process any pending creates and/or cancellations
+ if (!created.empty()) {
+ for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+ active.push_back(*i);
+ (*i)->create(*connection);
+ }
+ created.clear();
+ }
+ if (!cancellations.empty()) {
+ for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) {
+ (*i)->cancel(*connection);
+ }
+ cancellations.clear();
+ }
+}
+
+void Link::setConnection(Connection* c)
+{
+ Mutex::ScopedLock mutex(lock);
+ connection = c;
+ updateUrls = true;
+}
+
+void Link::maintenanceVisit ()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ if (connection && updateUrls) {
+ urls.reset(connection->getKnownHosts());
+ QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
+ updateUrls = false;
+ }
+
+ if (state == STATE_WAITING)
+ {
+ visitCount++;
+ if (visitCount >= currentInterval)
+ {
+ visitCount = 0;
+ //switch host and port to next in url list if possible
+ if (!tryFailover()) {
+ currentInterval *= 2;
+ if (currentInterval > MAX_INTERVAL)
+ currentInterval = MAX_INTERVAL;
+ startConnectionLH();
+ }
+ }
+ }
+ else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+void Link::reconnect(const qpid::Address& a)
+{
+ Mutex::ScopedLock mutex(lock);
+ host = a.host;
+ port = a.port;
+ transport = a.protocol;
+ startConnectionLH();
+ if (!hideManagement()) {
+ stringstream errorString;
+ errorString << "Failed over to " << a;
+ mgmtObject->set_lastError(errorString.str());
+ }
+}
+
+bool Link::tryFailover()
+{
+ Address next;
+ if (urls.next(next) &&
+ (next.host != host || next.port != port || next.protocol != transport)) {
+ links->changeAddress(Address(transport, host, port), next);
+ QPID_LOG(debug, "Link failing over to " << host << ":" << port);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+// Management updates for a linke are inconsistent in a cluster, so they are
+// suppressed.
+bool Link::hideManagement() const {
+ return !mgmtObject || ( broker && broker->isInCluster());
+}
+
+uint Link::nextChannel()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ return channelCounter++;
+}
+
+void Link::notifyConnectionForced(const string text)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ setStateLH(STATE_FAILED);
+ if (!hideManagement())
+ mgmtObject->set_lastError(text);
+}
+
+void Link::setPersistenceId(uint64_t id) const
+{
+ persistenceId = id;
+}
+
+const string& Link::getName() const
+{
+ return host;
+}
+
+Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
+{
+ string host;
+ uint16_t port;
+ string transport;
+ string authMechanism;
+ string username;
+ string password;
+
+ buffer.getShortString(host);
+ port = buffer.getShort();
+ buffer.getShortString(transport);
+ bool durable(buffer.getOctet());
+ buffer.getShortString(authMechanism);
+ buffer.getShortString(username);
+ buffer.getShortString(password);
+
+ return links.declare(host, port, transport, durable, authMechanism, username, password).first;
+}
+
+void Link::encode(Buffer& buffer) const
+{
+ buffer.putShortString(string("link"));
+ buffer.putShortString(host);
+ buffer.putShort(port);
+ buffer.putShortString(transport);
+ buffer.putOctet(durable ? 1 : 0);
+ buffer.putShortString(authMechanism);
+ buffer.putShortString(username);
+ buffer.putShortString(password);
+}
+
+uint32_t Link::encodedSize() const
+{
+ return host.size() + 1 // short-string (host)
+ + 5 // short-string ("link")
+ + 2 // port
+ + transport.size() + 1 // short-string(transport)
+ + 1 // durable
+ + authMechanism.size() + 1
+ + username.size() + 1
+ + password.size() + 1;
+}
+
+ManagementObject* Link::GetManagementObject (void) const
+{
+ return (ManagementObject*) mgmtObject;
+}
+
+Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& text)
+{
+ switch (op)
+ {
+ case _qmf::Link::METHOD_CLOSE :
+ if (!closing) {
+ closing = true;
+ if (state != STATE_CONNECTING && connection) {
+ //connection can only be closed on the connections own IO processing thread
+ connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+ }
+ }
+ return Manageable::STATUS_OK;
+
+ case _qmf::Link::METHOD_BRIDGE :
+ _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
+ QPID_LOG(debug, "Link::bridge() request received");
+
+ // Durable bridges are only valid on durable links
+ if (iargs.i_durable && !durable) {
+ text = "Can't create a durable route on a non-durable link";
+ return Manageable::STATUS_USER;
+ }
+
+ if (iargs.i_dynamic) {
+ Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
+ if (exchange.get() == 0) {
+ text = "Exchange not found";
+ return Manageable::STATUS_USER;
+ }
+ if (!exchange->supportsDynamicBinding()) {
+ text = "Exchange type does not support dynamic routing";
+ return Manageable::STATUS_USER;
+ }
+ }
+
+ std::pair<Bridge::shared_ptr, bool> result =
+ links->declare (host, port, iargs.i_durable, iargs.i_src,
+ iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
+ iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
+ iargs.i_dynamic, iargs.i_sync);
+
+ if (result.second && iargs.i_durable)
+ store->create(*result.first);
+
+ return Manageable::STATUS_OK;
+ }
+
+ return Manageable::STATUS_UNKNOWN_METHOD;
+}
+
+void Link::setPassive(bool passive)
+{
+ Mutex::ScopedLock mutex(lock);
+ if (passive) {
+ setStateLH(STATE_PASSIVE);
+ } else {
+ if (state == STATE_PASSIVE) {
+ setStateLH(STATE_WAITING);
+ } else {
+ QPID_LOG(warning, "Ignoring attempt to activate non-passive link");
+ }
+ }
+}