summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/cpp/src/qpid/broker/Link.cpp')
-rw-r--r--M4-RCs/qpid/cpp/src/qpid/broker/Link.cpp398
1 files changed, 0 insertions, 398 deletions
diff --git a/M4-RCs/qpid/cpp/src/qpid/broker/Link.cpp b/M4-RCs/qpid/cpp/src/qpid/broker/Link.cpp
deleted file mode 100644
index bda9c80f0b..0000000000
--- a/M4-RCs/qpid/cpp/src/qpid/broker/Link.cpp
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Link.h"
-#include "LinkRegistry.h"
-#include "Broker.h"
-#include "Connection.h"
-#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
-#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
-#include "boost/bind.hpp"
-#include "qpid/log/Statement.h"
-#include "qpid/framing/enum.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "AclModule.h"
-
-using namespace qpid::broker;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-using qpid::framing::NotAllowedException;
-using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
-using qpid::sys::Mutex;
-using std::stringstream;
-namespace _qmf = qmf::org::apache::qpid::broker;
-
-Link::Link(LinkRegistry* _links,
- MessageStore* _store,
- string& _host,
- uint16_t _port,
- string& _transport,
- bool _durable,
- string& _authMechanism,
- string& _username,
- string& _password,
- Broker* _broker,
- Manageable* parent)
- : links(_links), store(_store), host(_host), port(_port),
- transport(_transport),
- durable(_durable),
- authMechanism(_authMechanism), username(_username), password(_password),
- persistenceId(0), mgmtObject(0), broker(_broker), state(0),
- visitCount(0),
- currentInterval(1),
- closing(false),
- channelCounter(1),
- connection(0),
- agent(0)
-{
- if (parent != 0)
- {
- agent = ManagementAgent::Singleton::getInstance();
- if (agent != 0)
- {
- mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable);
- if (!durable)
- agent->addObject(mgmtObject);
- }
- }
- setStateLH(STATE_WAITING);
-}
-
-Link::~Link ()
-{
- if (state == STATE_OPERATIONAL && connection != 0)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
-
- if (mgmtObject != 0)
- mgmtObject->resourceDestroy ();
-}
-
-void Link::setStateLH (int newState)
-{
- if (newState == state)
- return;
-
- state = newState;
- if (mgmtObject == 0)
- return;
-
- switch (state)
- {
- case STATE_WAITING : mgmtObject->set_state("Waiting"); break;
- case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break;
- case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
- case STATE_FAILED : mgmtObject->set_state("Failed"); break;
- case STATE_CLOSED : mgmtObject->set_state("Closed"); break;
- }
-}
-
-void Link::startConnectionLH ()
-{
- try {
- // Set the state before calling connect. It is possible that connect
- // will fail synchronously and call Link::closed before returning.
- setStateLH(STATE_CONNECTING);
- broker->connect (host, port, transport,
- boost::bind (&Link::closed, this, _1, _2));
- } catch(std::exception& e) {
- setStateLH(STATE_WAITING);
- if (mgmtObject != 0)
- mgmtObject->set_lastError (e.what());
- }
-}
-
-void Link::established ()
-{
- Mutex::ScopedLock mutex(lock);
- stringstream addr;
- addr << host << ":" << port;
-
- QPID_LOG (info, "Inter-broker link established to " << addr.str());
- agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
- setStateLH(STATE_OPERATIONAL);
- currentInterval = 1;
- visitCount = 0;
- if (closing)
- destroy();
-}
-
-void Link::closed (int, std::string text)
-{
- Mutex::ScopedLock mutex(lock);
-
- connection = 0;
-
- if (state == STATE_OPERATIONAL) {
- stringstream addr;
- addr << host << ":" << port;
- QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
- agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
- }
-
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- (*i)->cancel();
- created.push_back(*i);
- }
- active.clear();
-
- if (state != STATE_FAILED)
- {
- setStateLH(STATE_WAITING);
- if (mgmtObject != 0)
- mgmtObject->set_lastError (text);
- }
-
- if (closing)
- destroy();
-}
-
-void Link::destroy ()
-{
- {
- Mutex::ScopedLock mutex(lock);
- Bridges toDelete;
-
- AclModule* acl = getBroker()->getAcl();
- std::string userID = getUsername() + "@" + getBroker()->getOptions().realm;
- if (acl && !acl->authorise(userID,acl::ACT_DELETE,acl::OBJ_LINK,"")){
- throw NotAllowedException("ACL denied delete link request");
- }
-
- QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
- if (connection)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
-
- setStateLH(STATE_CLOSED);
-
- // Move the bridges to be deleted into a local vector so there is no
- // corruption of the iterator caused by bridge deletion.
- for (Bridges::iterator i = active.begin(); i != active.end(); i++)
- toDelete.push_back(*i);
- active.clear();
-
- for (Bridges::iterator i = created.begin(); i != created.end(); i++)
- toDelete.push_back(*i);
- created.clear();
-
- // Now delete all bridges on this link.
- for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
- (*i)->destroy();
- toDelete.clear();
- }
- links->destroy (host, port);
-}
-
-void Link::add(Bridge::shared_ptr bridge)
-{
- Mutex::ScopedLock mutex(lock);
- created.push_back (bridge);
-}
-
-void Link::cancel(Bridge::shared_ptr bridge)
-{
- Mutex::ScopedLock mutex(lock);
-
- for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
- if ((*i).get() == bridge.get()) {
- created.erase(i);
- break;
- }
- }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if ((*i).get() == bridge.get()) {
- bridge->cancel();
- active.erase(i);
- break;
- }
- }
-}
-
-void Link::ioThreadProcessing()
-{
- Mutex::ScopedLock mutex(lock);
-
- if (state != STATE_OPERATIONAL)
- return;
-
- //process any pending creates
- if (!created.empty()) {
- for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
- active.push_back(*i);
- (*i)->create(*connection);
- }
- created.clear();
- }
-}
-
-void Link::setConnection(Connection* c)
-{
- Mutex::ScopedLock mutex(lock);
- connection = c;
-}
-
-void Link::maintenanceVisit ()
-{
- Mutex::ScopedLock mutex(lock);
-
- if (state == STATE_WAITING)
- {
- visitCount++;
- if (visitCount >= currentInterval)
- {
- visitCount = 0;
- currentInterval *= 2;
- if (currentInterval > MAX_INTERVAL)
- currentInterval = MAX_INTERVAL;
- startConnectionLH();
- }
- }
- else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0)
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
-}
-
-uint Link::nextChannel()
-{
- Mutex::ScopedLock mutex(lock);
-
- return channelCounter++;
-}
-
-void Link::notifyConnectionForced(const string text)
-{
- Mutex::ScopedLock mutex(lock);
-
- setStateLH(STATE_FAILED);
- if (mgmtObject != 0)
- mgmtObject->set_lastError(text);
-}
-
-void Link::setPersistenceId(uint64_t id) const
-{
- if (mgmtObject != 0 && persistenceId == 0) {
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
- agent->addObject(mgmtObject, id);
- }
- persistenceId = id;
-}
-
-const string& Link::getName() const
-{
- return host;
-}
-
-Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
-{
- string host;
- uint16_t port;
- string transport;
- string authMechanism;
- string username;
- string password;
-
- buffer.getShortString(host);
- port = buffer.getShort();
- buffer.getShortString(transport);
- bool durable(buffer.getOctet());
- buffer.getShortString(authMechanism);
- buffer.getShortString(username);
- buffer.getShortString(password);
-
- return links.declare(host, port, transport, durable, authMechanism, username, password).first;
-}
-
-void Link::encode(Buffer& buffer) const
-{
- buffer.putShortString(string("link"));
- buffer.putShortString(host);
- buffer.putShort(port);
- buffer.putShortString(transport);
- buffer.putOctet(durable ? 1 : 0);
- buffer.putShortString(authMechanism);
- buffer.putShortString(username);
- buffer.putShortString(password);
-}
-
-uint32_t Link::encodedSize() const
-{
- return host.size() + 1 // short-string (host)
- + 5 // short-string ("link")
- + 2 // port
- + transport.size() + 1 // short-string(transport)
- + 1 // durable
- + authMechanism.size() + 1
- + username.size() + 1
- + password.size() + 1;
-}
-
-ManagementObject* Link::GetManagementObject (void) const
-{
- return (ManagementObject*) mgmtObject;
-}
-
-Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& text)
-{
- switch (op)
- {
- case _qmf::Link::METHOD_CLOSE :
- closing = true;
- if (state != STATE_CONNECTING)
- destroy();
- return Manageable::STATUS_OK;
-
- case _qmf::Link::METHOD_BRIDGE :
- _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
-
- // Durable bridges are only valid on durable links
- if (iargs.i_durable && !durable) {
- text = "Can't create a durable route on a non-durable link";
- return Manageable::STATUS_USER;
- }
-
- if (iargs.i_dynamic) {
- Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
- if (exchange.get() == 0) {
- text = "Exchange not found";
- return Manageable::STATUS_USER;
- }
- if (!exchange->supportsDynamicBinding()) {
- text = "Exchange type does not support dynamic routing";
- return Manageable::STATUS_USER;
- }
- }
-
- std::pair<Bridge::shared_ptr, bool> result =
- links->declare (host, port, iargs.i_durable, iargs.i_src,
- iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
- iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
- iargs.i_dynamic);
-
- if (result.second && iargs.i_durable)
- store->create(*result.first);
-
- return Manageable::STATUS_OK;
- }
-
- return Manageable::STATUS_UNKNOWN_METHOD;
-}