diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 49 |
1 files changed, 49 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index fc6ada096f..512e0f03cb 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -47,6 +47,7 @@ #include "qpid/framing/ClusterConnectionAnnounceBody.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" +#include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" #include "qpid/sys/ClusterSafe.h" #include "qpid/types/Variant.h" @@ -796,6 +797,54 @@ void Connection::config(const std::string& encoded) { else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); } +namespace { + // find a Link that matches the given Address + class LinkFinder { + qpid::Address id; + boost::shared_ptr<broker::Link> link; + public: + LinkFinder(const qpid::Address& _id) : id(_id) {} + boost::shared_ptr<broker::Link> getLink() { return link; } + void operator() (boost::shared_ptr<broker::Link> l) + { + if (!link) { + qpid::Address addr(l->getTransport(), l->getHost(), l->getPort()); + if (id == addr) { + link = l; + } + } + } + }; +} + +void Connection::internalState(const std::string& type, + const std::string& name, + const framing::FieldTable& state) +{ + if (type == "link") { + // name is the string representation of the Link's _configured_ destination address + Url dest; + try { + dest = name; + } catch(...) { + throw Exception(QPID_MSG("Update failed, invalid format for Link destination address: " << name)); + } + assert(dest.size()); + LinkFinder finder(dest[0]); + cluster.getBroker().getLinks().eachLink(boost::ref(finder)); + if (finder.getLink()) { + try { + finder.getLink()->setState(state); + } catch(...) { + throw Exception(QPID_MSG("Update failed, invalid state for Link " << name << ", state: " << state)); + } + QPID_LOG(debug, cluster << " updated link " << dest[0] << " with state: " << state); + } else throw Exception(QPID_MSG("Update failed, unable to find Link named: " << name)); + } + else throw Exception(QPID_MSG("Update failed, invalid object type for internal state replication: " << type)); +} + + void Connection::doCatchupIoCallbacks() { // We need to process IO callbacks during the catch-up phase in // order to service asynchronous completions for messages |