summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Bridge.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-13 23:50:18 +0000
committerAlan Conway <aconway@apache.org>2012-02-13 23:50:18 +0000
commit057a0635e081848ba59964c4a8e0923e521b7fe6 (patch)
treecdad9578140d5dbdb2e90525e2ba9cc870ca50b6 /qpid/cpp/src/qpid/broker/Bridge.cpp
parentcf61dbf9b313f9bd69b392eae8fd8d27d4e609a2 (diff)
downloadqpid-python-057a0635e081848ba59964c4a8e0923e521b7fe6.tar.gz
Merge branch 'qpid-3603-4-rebase' into qpid-3603-5qpid-3603-5
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-5@1243748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp60
1 files changed, 35 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 7ac1edd1db..9a1f4be468 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,16 +24,27 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
+#include "qpid/ha/BrokerReplicator.h"
#include "qpid/broker/SessionState.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include <iostream>
using qpid::framing::FieldTable;
using qpid::framing::Uuid;
using qpid::framing::Buffer;
+using qpid::framing::AMQFrame;
+using qpid::framing::AMQContentBody;
+using qpid::framing::AMQHeaderBody;
+using qpid::framing::MessageProperties;
+using qpid::framing::MessageTransferBody;
+using qpid::types::Variant;
using qpid::management::ManagementAgent;
using std::string;
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -47,9 +58,11 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
}
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
- const _qmf::ArgsLinkBridge& _args) :
+ const _qmf::ArgsLinkBridge& _args,
+ InitializeCallback init) :
link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0)
+ listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0),
+ initialize(init)
{
std::stringstream title;
title << id << "_" << name;
@@ -62,12 +75,12 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
agent->addObject(mgmtObject);
}
- QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest);
}
-Bridge::~Bridge()
+Bridge::~Bridge()
{
- mgmtObject->resourceDestroy();
+ mgmtObject->resourceDestroy();
}
void Bridge::create(Connection& c)
@@ -86,7 +99,7 @@ void Bridge::create(Connection& c)
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
-
+
session->attach(name, false);
session->commandPoint(0,0);
} else {
@@ -96,11 +109,12 @@ void Bridge::create(Connection& c)
}
if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
- if (args.i_srcIsQueue) {
+ if (initialize) initialize(*this, sessionHandler);
+ else if (args.i_srcIsQueue) {
peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
- QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
} else {
FieldTable queueSettings;
@@ -134,9 +148,9 @@ void Bridge::create(Connection& c)
if (exchange.get() == 0)
throw Exception("Exchange not found for dynamic route");
exchange->registerDynamicBridge(this);
- QPID_LOG(debug, "Activated dynamic route for exchange " << args.i_src);
+ QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
} else {
- QPID_LOG(debug, "Activated static route from exchange " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
}
}
if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
@@ -148,15 +162,16 @@ void Bridge::cancel(Connection&)
peer->getMessage().cancel(args.i_dest);
peer->getSession().detach(name);
}
+ QPID_LOG(debug, "Cancelled bridge " << name);
}
void Bridge::closed()
{
if (args.i_dynamic) {
- Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
- if (exchange.get() != 0)
- exchange->removeDynamicBridge(this);
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().find(args.i_src);
+ if (exchange.get()) exchange->removeDynamicBridge(this);
}
+ QPID_LOG(debug, "Closed bridge " << name);
}
void Bridge::destroy()
@@ -175,11 +190,6 @@ void Bridge::setPersistenceId(uint64_t pId) const
persistenceId = pId;
}
-const string& Bridge::getName() const
-{
- return name;
-}
-
Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
{
string host;
@@ -207,7 +217,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
is_queue, is_local, id, excludes, dynamic, sync).first;
}
-void Bridge::encode(Buffer& buffer) const
+void Bridge::encode(Buffer& buffer) const
{
buffer.putShortString(string("bridge"));
buffer.putShortString(link->getHost());
@@ -224,8 +234,8 @@ void Bridge::encode(Buffer& buffer) const
buffer.putShort(args.i_sync);
}
-uint32_t Bridge::encodedSize() const
-{
+uint32_t Bridge::encodedSize() const
+{
return link->getHost().size() + 1 // short-string (host)
+ 7 // short-string ("bridge")
+ 2 // port
@@ -250,7 +260,7 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
management::Args& /*args*/,
string&)
{
- if (methodId == _qmf::Bridge::METHOD_CLOSE) {
+ if (methodId == _qmf::Bridge::METHOD_CLOSE) {
//notify that we are closed
destroy();
return management::Manageable::STATUS_OK;
@@ -297,7 +307,7 @@ void Bridge::sendReorigin()
conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
queueName, args.i_src, args.i_key, bindArgs));
}
-bool Bridge::resetProxy()
+bool Bridge::resetProxy()
{
SessionHandler& sessionHandler = conn->getChannel(id);
if (!sessionHandler.getSession()) peer.reset();