/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "Backup.h" #include "Settings.h" #include "BrokerReplicator.h" #include "ReplicatingSubscription.h" #include "ConnectionExcluder.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionHandler.h" #include "qpid/broker/Link.h" #include "qpid/framing/AMQP_ServerProxy.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/types/Variant.h" namespace qpid { namespace ha { using namespace framing; using namespace broker; using types::Variant; using std::string; Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s), excluder(new ConnectionExcluder()) { // Empty brokerUrl means delay initialization until setUrl() is called. if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); } void Backup::initialize(const Url& url) { assert(!url.empty()); QPID_LOG(notice, "Ha: Backup started: " << url); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; // Declare the link std::pair result = broker.getLinks().declare( url[0].host, url[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password); assert(result.second); // FIXME aconway 2011-11-23: error handling link = result.first; link->setUrl(url); replicator.reset(new BrokerReplicator(link)); broker.getExchanges().registerExchange(replicator); broker.getConnectionObservers().add(excluder); } void Backup::setBrokerUrl(const Url& url) { // Ignore empty URLs seen during start-up for some tests. if (url.empty()) return; sys::Mutex::ScopedLock l(lock); if (link) { // URL changed after we initialized. QPID_LOG(info, "HA: Backup failover URL set to " << url); link->setUrl(url); } else { initialize(url); // Deferred initialization } } Backup::~Backup() { if (link) link->close(); if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); broker.getConnectionObservers().remove(excluder); // This allows client connections. } }} // namespace qpid::ha