summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r--cpp/src/qpid/broker/Link.cpp281
1 files changed, 281 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
new file mode 100644
index 0000000000..83c9a2a62e
--- /dev/null
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Link.h"
+#include "LinkRegistry.h"
+#include "Broker.h"
+#include "Connection.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/Link.h"
+#include "boost/bind.hpp"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::broker;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using qpid::sys::Mutex;
+
+Link::Link(LinkRegistry* _links,
+ string& _host,
+ uint16_t _port,
+ bool _useSsl,
+ bool _durable,
+ Broker* _broker,
+ management::Manageable* parent)
+ : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
+ persistenceId(0), broker(_broker), state(0),
+ access(boost::bind(&Link::established, this),
+ boost::bind(&Link::closed, this, _1, _2),
+ boost::bind(&Link::setConnection, this, _1)),
+ visitCount(0),
+ currentInterval(1),
+ closing(false),
+ channelCounter(1)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent();
+ if (agent.get() != 0)
+ {
+ mgmtObject = management::Link::shared_ptr
+ (new management::Link(this, parent, _host, _port, _useSsl, _durable));
+ if (!durable)
+ agent->addObject(mgmtObject);
+ }
+ }
+ setState(STATE_WAITING);
+}
+
+Link::~Link ()
+{
+ if (state == STATE_OPERATIONAL)
+ access.close();
+ if (mgmtObject.get () != 0)
+ mgmtObject->resourceDestroy ();
+}
+
+void Link::setState (int newState)
+{
+ if (newState == state)
+ return;
+
+ state = newState;
+ if (mgmtObject.get() == 0)
+ return;
+
+ switch (state)
+ {
+ case STATE_WAITING : mgmtObject->set_state("Waiting"); break;
+ case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break;
+ case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
+ }
+}
+
+void Link::startConnection ()
+{
+ try {
+ broker->connect (host, port, useSsl, 0, &access);
+ setState(STATE_CONNECTING);
+ } catch(std::exception& e) {
+ setState(STATE_WAITING);
+ mgmtObject->set_lastError (e.what());
+ }
+}
+
+void Link::established ()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ QPID_LOG (info, "Inter-broker link established to " << host << ":" << port);
+ setState(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ if (closing)
+ destroy();
+}
+
+void Link::closed (int, std::string text)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ if (state == STATE_OPERATIONAL)
+ QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
+
+ connection.reset();
+ created.transfer(created.end(), active.begin(), active.end(), active);
+ setState(STATE_WAITING);
+ mgmtObject->set_lastError (text);
+ if (closing)
+ destroy();
+}
+
+void Link::destroy ()
+{
+ QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
+ connection.reset();
+ links->destroy (host, port);
+}
+
+void Link::cancel(Bridge* bridge)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ //need to take this out of the active map and add it to the cancelled map
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if (&(*i) == bridge) {
+ cancelled.transfer(cancelled.end(), i, active);
+ break;
+ }
+ }
+
+ if (connection.get() != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+void Link::ioThreadProcessing()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ //process any pending creates
+ if (!created.empty()) {
+ for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+ i->create(*connection);
+ }
+ active.transfer(active.end(), created.begin(), created.end(), created);
+ }
+ if (!cancelled.empty()) {
+ //process any pending cancellations
+ for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
+ i->cancel();
+ }
+ cancelled.clear();
+ }
+}
+
+void Link::setConnection(Connection::shared_ptr c)
+{
+ connection = c;
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+void Link::maintenanceVisit ()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ if (state == STATE_WAITING)
+ {
+ visitCount++;
+ if (visitCount >= currentInterval)
+ {
+ visitCount = 0;
+ currentInterval *= 2;
+ if (currentInterval > MAX_INTERVAL)
+ currentInterval = MAX_INTERVAL;
+ startConnection();
+ }
+ }
+}
+
+void Link::setPersistenceId(uint64_t id) const
+{
+ if (mgmtObject != 0 && persistenceId == 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ agent->addObject (mgmtObject, id);
+ }
+ persistenceId = id;
+}
+
+const string& Link::getName() const
+{
+ return host;
+}
+
+Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
+{
+ string host;
+ uint16_t port;
+
+ buffer.getShortString(host);
+ port = buffer.getShort();
+ bool useSsl(buffer.getOctet());
+ bool durable(buffer.getOctet());
+
+ return links.declare(host, port, useSsl, durable).first;
+}
+
+void Link::encode(Buffer& buffer) const
+{
+ buffer.putShortString(string("link"));
+ buffer.putShortString(host);
+ buffer.putShort(port);
+ buffer.putOctet(useSsl ? 1 : 0);
+ buffer.putOctet(durable ? 1 : 0);
+}
+
+uint32_t Link::encodedSize() const
+{
+ return host.size() + 1 // short-string (host)
+ + 5 // short-string ("link")
+ + 2 // port
+ + 1 // useSsl
+ + 1; // durable
+}
+
+ManagementObject::shared_ptr Link::GetManagementObject (void) const
+{
+ return boost::dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ switch (op)
+ {
+ case management::Link::METHOD_CLOSE :
+ closing = true;
+ if (state != STATE_CONNECTING)
+ destroy();
+ return Manageable::STATUS_OK;
+
+ case management::Link::METHOD_BRIDGE :
+ management::ArgsLinkBridge iargs =
+ dynamic_cast<const management::ArgsLinkBridge&>(args);
+
+ // Durable bridges are only valid on durable links
+ if (iargs.i_durable && !durable)
+ return Manageable::STATUS_INVALID_PARAMETER;
+
+ created.push_back(new Bridge(this, channelCounter++,
+ boost::bind(&Link::cancel, this, _1), iargs));
+
+ if (state == STATE_OPERATIONAL && connection.get() != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ return Manageable::STATUS_OK;
+ }
+
+ return Manageable::STATUS_UNKNOWN_METHOD;
+}