diff options
author | Gordon Sim <gsim@apache.org> | 2008-05-13 21:38:28 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-05-13 21:38:28 +0000 |
commit | 7e6635d7e297f6ecf2199028da573bc10437f3ae (patch) | |
tree | 7e6ffe5b0d89cd1d496578a6f13a8184849abd33 /cpp/src/qpid/broker/Bridge.cpp | |
parent | c36a80f0b03bea42bca570e994d08613b08fd6dd (diff) | |
download | qpid-python-7e6635d7e297f6ecf2199028da573bc10437f3ae.tar.gz |
QPID-990: Patch from Ted Ross to enable persisting of inter-broker routing entities
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@656023 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 101 |
1 files changed, 91 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index a8e7b3c368..337992992f 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -20,29 +20,35 @@ */ #include "Bridge.h" #include "ConnectionState.h" +#include "LinkRegistry.h" #include "qpid/management/ManagementAgent.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/Uuid.h" +#include "qpid/log/Statement.h" using qpid::framing::FieldTable; using qpid::framing::Uuid; +using qpid::framing::Buffer; +using qpid::management::ManagementAgent; namespace qpid { namespace broker { -Bridge::Bridge(Link* link, framing::ChannelId _id, CancellationListener l, +Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const management::ArgsLinkBridge& _args) : - id(_id), args(_args), - mgmtObject(new management::Bridge(this, link, id, args.i_src, args.i_dest, - args.i_key, args.i_src_is_queue, args.i_src_is_local)), - listener(l), name(Uuid(true).str()) + link(_link), id(_id), args(_args), + mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest, + args.i_key, args.i_src_is_queue, args.i_src_is_local, + args.i_tag, args.i_excludes)), + listener(l), name(Uuid(true).str()), persistenceId(0) { - management::ManagementAgent::getAgent()->addObject(mgmtObject); + if (!args.i_durable) + management::ManagementAgent::getAgent()->addObject(mgmtObject); } Bridge::~Bridge() -{ +{ mgmtObject->resourceDestroy(); } @@ -65,8 +71,8 @@ void Bridge::create(ConnectionState& c) string queue = "bridge_queue_"; queue += Uuid(true).str(); FieldTable queueSettings; - if (args.i_id.size()) { - queueSettings.setString("qpid.trace.id", args.i_id); + if (args.i_tag.size()) { + queueSettings.setString("qpid.trace.id", args.i_tag); } if (args.i_excludes.size()) { queueSettings.setString("qpid.trace.exclude", args.i_excludes); @@ -89,6 +95,81 @@ void Bridge::cancel() peer->getSession().detach(name); } +void Bridge::destroy() +{ + listener(this); +} + +void Bridge::setPersistenceId(uint64_t id) const +{ + if (mgmtObject != 0 && persistenceId == 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + agent->addObject (mgmtObject, id); + } + persistenceId = id; +} + +const string& Bridge::getName() const +{ + return name; +} + +Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) +{ + string host; + uint16_t port; + string src; + string dest; + string key; + string id; + string excludes; + + buffer.getShortString(host); + port = buffer.getShort(); + bool durable(buffer.getOctet()); + buffer.getShortString(src); + buffer.getShortString(dest); + buffer.getShortString(key); + bool is_queue(buffer.getOctet()); + bool is_local(buffer.getOctet()); + buffer.getShortString(id); + buffer.getShortString(excludes); + + return links.declare(host, port, durable, src, dest, key, + is_queue, is_local, id, excludes).first; +} + +void Bridge::encode(Buffer& buffer) const +{ + buffer.putShortString(string("bridge")); + buffer.putShortString(link->getHost()); + buffer.putShort(link->getPort()); + buffer.putOctet(args.i_durable ? 1 : 0); + buffer.putShortString(args.i_src); + buffer.putShortString(args.i_dest); + buffer.putShortString(args.i_key); + buffer.putOctet(args.i_src_is_queue ? 1 : 0); + buffer.putOctet(args.i_src_is_local ? 1 : 0); + buffer.putShortString(args.i_tag); + buffer.putShortString(args.i_excludes); +} + +uint32_t Bridge::encodedSize() const +{ + return link->getHost().size() + 1 // short-string (host) + + 7 // short-string ("bridge") + + 2 // port + + 1 // durable + + args.i_src.size() + 1 + + args.i_dest.size() + 1 + + args.i_key.size() + 1 + + 1 // src_is_queue + + 1 // src_is_local + + args.i_tag.size() + 1 + + args.i_excludes.size() + 1; +} + management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const { return dynamic_pointer_cast<management::ManagementObject>(mgmtObject); @@ -98,7 +179,7 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, man { if (methodId == management::Bridge::METHOD_CLOSE) { //notify that we are closed - listener(this); + destroy(); return management::Manageable::STATUS_OK; } else { return management::Manageable::STATUS_UNKNOWN_METHOD; |