diff options
author | Alan Conway <aconway@apache.org> | 2012-02-13 23:50:18 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-13 23:50:18 +0000 |
commit | 057a0635e081848ba59964c4a8e0923e521b7fe6 (patch) | |
tree | cdad9578140d5dbdb2e90525e2ba9cc870ca50b6 /qpid/cpp/src/qpid/broker/Bridge.cpp | |
parent | cf61dbf9b313f9bd69b392eae8fd8d27d4e609a2 (diff) | |
download | qpid-python-057a0635e081848ba59964c4a8e0923e521b7fe6.tar.gz |
Merge branch 'qpid-3603-4-rebase' into qpid-3603-5qpid-3603-5
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-5@1243748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 60 |
1 files changed, 35 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 7ac1edd1db..9a1f4be468 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,16 +24,27 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/Link.h" #include "qpid/broker/LinkRegistry.h" +#include "qpid/ha/BrokerReplicator.h" #include "qpid/broker/SessionState.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/types/Variant.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/framing/Uuid.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include <iostream> using qpid::framing::FieldTable; using qpid::framing::Uuid; using qpid::framing::Buffer; +using qpid::framing::AMQFrame; +using qpid::framing::AMQContentBody; +using qpid::framing::AMQHeaderBody; +using qpid::framing::MessageProperties; +using qpid::framing::MessageTransferBody; +using qpid::types::Variant; using qpid::management::ManagementAgent; using std::string; namespace _qmf = qmf::org::apache::qpid::broker; @@ -47,9 +58,11 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) } Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, - const _qmf::ArgsLinkBridge& _args) : + const _qmf::ArgsLinkBridge& _args, + InitializeCallback init) : link(_link), id(_id), args(_args), mgmtObject(0), - listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0) + listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0), + initialize(init) { std::stringstream title; title << id << "_" << name; @@ -62,12 +75,12 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); agent->addObject(mgmtObject); } - QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest); } -Bridge::~Bridge() +Bridge::~Bridge() { - mgmtObject->resourceDestroy(); + mgmtObject->resourceDestroy(); } void Bridge::create(Connection& c) @@ -86,7 +99,7 @@ void Bridge::create(Connection& c) session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); - + session->attach(name, false); session->commandPoint(0,0); } else { @@ -96,11 +109,12 @@ void Bridge::create(Connection& c) } if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking(); - if (args.i_srcIsQueue) { + if (initialize) initialize(*this, sessionHandler); + else if (args.i_srcIsQueue) { peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest); } else { FieldTable queueSettings; @@ -134,9 +148,9 @@ void Bridge::create(Connection& c) if (exchange.get() == 0) throw Exception("Exchange not found for dynamic route"); exchange->registerDynamicBridge(this); - QPID_LOG(debug, "Activated dynamic route for exchange " << args.i_src); + QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src); } else { - QPID_LOG(debug, "Activated static route from exchange " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest); } } if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking(); @@ -148,15 +162,16 @@ void Bridge::cancel(Connection&) peer->getMessage().cancel(args.i_dest); peer->getSession().detach(name); } + QPID_LOG(debug, "Cancelled bridge " << name); } void Bridge::closed() { if (args.i_dynamic) { - Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); - if (exchange.get() != 0) - exchange->removeDynamicBridge(this); + Exchange::shared_ptr exchange = link->getBroker()->getExchanges().find(args.i_src); + if (exchange.get()) exchange->removeDynamicBridge(this); } + QPID_LOG(debug, "Closed bridge " << name); } void Bridge::destroy() @@ -175,11 +190,6 @@ void Bridge::setPersistenceId(uint64_t pId) const persistenceId = pId; } -const string& Bridge::getName() const -{ - return name; -} - Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) { string host; @@ -207,7 +217,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) is_queue, is_local, id, excludes, dynamic, sync).first; } -void Bridge::encode(Buffer& buffer) const +void Bridge::encode(Buffer& buffer) const { buffer.putShortString(string("bridge")); buffer.putShortString(link->getHost()); @@ -224,8 +234,8 @@ void Bridge::encode(Buffer& buffer) const buffer.putShort(args.i_sync); } -uint32_t Bridge::encodedSize() const -{ +uint32_t Bridge::encodedSize() const +{ return link->getHost().size() + 1 // short-string (host) + 7 // short-string ("bridge") + 2 // port @@ -250,7 +260,7 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, management::Args& /*args*/, string&) { - if (methodId == _qmf::Bridge::METHOD_CLOSE) { + if (methodId == _qmf::Bridge::METHOD_CLOSE) { //notify that we are closed destroy(); return management::Manageable::STATUS_OK; @@ -297,7 +307,7 @@ void Bridge::sendReorigin() conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, queueName, args.i_src, args.i_key, bindArgs)); } -bool Bridge::resetProxy() +bool Bridge::resetProxy() { SessionHandler& sessionHandler = conn->getChannel(id); if (!sessionHandler.getSession()) peer.reset(); |