summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/xml/XmlExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/xml/XmlExchange.cpp')
-rw-r--r--cpp/src/qpid/xml/XmlExchange.cpp333
1 files changed, 228 insertions, 105 deletions
diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp
index f0afc8d451..85a6cb4f57 100644
--- a/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/cpp/src/qpid/xml/XmlExchange.cpp
@@ -26,6 +26,7 @@
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/log/Statement.h"
+#include "qpid/broker/FedOps.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -43,19 +44,63 @@
#include <xqilla/context/ItemFactory.hpp>
#include <xqilla/xqilla-simple.hpp>
+#include <boost/bind.hpp>
+#include <functional>
+#include <algorithm>
#include <iostream>
#include <sstream>
using namespace qpid::framing;
using namespace qpid::sys;
using qpid::management::Manageable;
-using std::string;
namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
-namespace broker {
-
+namespace broker {
+
+XQilla XmlBinding::xqilla;
+
+XmlBinding::XmlBinding(const std::string& key, const Queue::shared_ptr queue, const std::string& _fedOrigin, Exchange* parent,
+ const ::qpid::framing::FieldTable& _arguments, const std::string& queryText )
+ : Binding(key, queue, parent, _arguments),
+ xquery(),
+ parse_message_content(true),
+ fedOrigin(_fedOrigin)
+{
+ startManagement();
+
+ QPID_LOG(trace, "Creating binding with query: " << queryText );
+
+ try {
+ Query q(xqilla.parse(X(queryText.c_str())));
+ xquery = q;
+
+ QPID_LOG(trace, "Bound successfully with query: " << queryText );
+
+ parse_message_content = false;
+
+ if (xquery->getQueryBody()->getStaticAnalysis().areContextFlagsUsed()) {
+ parse_message_content = true;
+ }
+ else {
+ GlobalVariables &vars = const_cast<GlobalVariables&>(xquery->getVariables());
+ for (GlobalVariables::iterator it = vars.begin(); it != vars.end(); ++it) {
+ if ((*it)->getStaticAnalysis().areContextFlagsUsed()) {
+ parse_message_content = true;
+ break;
+ }
+ }
+ }
+ }
+ catch (XQException& e) {
+ throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText));
+ }
+ catch (...) {
+ throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText));
+ }
+}
+
XmlExchange::XmlExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b)
{
if (mgmtExchange != 0)
@@ -69,69 +114,83 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable,
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
}
+
+bool XmlExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
+{
+
+ // Federation uses bind for unbind and reorigin comands as well as for binds.
+ //
+ // Both federated and local binds are done in this method. Other
+ // federated requests are done by calling the relevent methods.
+
+ string fedOp;
+ string fedTags;
+ string fedOrigin;
+
+ if (args)
+ fedOp = args->getAsString(qpidFedOp);
+ if (! fedOp.empty()) {
+ fedTags = args->getAsString(qpidFedTags);
+ fedOrigin = args->getAsString(qpidFedOrigin);
+ }
+ if (fedOp == fedOpUnbind) {
+ return fedUnbind(fedOrigin, fedTags, queue, bindingKey, args);
+ }
+ else if (fedOp == fedOpReorigin) {
+ fedReorigin();
+ return true;
+ }
- // #### TODO: The Binding should take the query text
- // #### only. Consider encapsulating the entire block, including
- // #### the if condition.
-
-
-bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments)
-{
- string queryText = bindingArguments->getAsString("xquery");
-
- try {
- RWlock::ScopedWlock l(lock);
-
- XmlBinding::vector& bindings(bindingsMap[routingKey]);
- XmlBinding::vector::ConstPtr p = bindings.snapshot();
- if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) {
- Query query(xqilla.parse(X(queryText.c_str())));
- XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, *bindingArguments, query));
+ // OK, looks like we're really going to bind
+
+ else if (fedOp.empty() || fedOp == fedOpBind) {
- QPID_LOG(trace, "Bound successfully with query: " << queryText );
+ string queryText = args->getAsString("xquery");
- binding->parse_message_content = false;
+ RWlock::ScopedWlock l(lock);
+
+ XmlBinding::vector& bindings(bindingsMap[bindingKey]);
+ XmlBinding::vector::ConstPtr p = bindings.snapshot();
+
+ if (!p || std::find_if(p->begin(), p->end(), MatchQueueAndOrigin(queue, fedOrigin)) == p->end()) {
- if (query->getQueryBody()->getStaticAnalysis().areContextFlagsUsed()) {
- binding->parse_message_content = true;
- }
- else {
- GlobalVariables &vars = const_cast<GlobalVariables&>(query->getVariables());
- for(GlobalVariables::iterator it = vars.begin(); it != vars.end(); ++it) {
- if ((*it)->getStaticAnalysis().areContextFlagsUsed()) {
- binding->parse_message_content = true;
- break;
- }
- }
- }
+ XmlBinding::shared_ptr binding(new XmlBinding (bindingKey, queue, fedOrigin, this, *args, queryText));
+ bindings.add(binding);
- bindings.add(binding);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
- }
- } else {
- return false;
- }
- }
- catch (XQException& e) {
- throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText));
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ }
+ } else {
+ return false;
+ }
}
- catch (...) {
- throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText));
+ else {
+ QPID_LOG(warning, "Unknown Federation Op: " << fedOp);
}
+
routeIVE();
- return true;
+ propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, args);
+
+ return true;
}
-bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/)
+bool XmlExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
{
+ /*
+ * When called directly, no qpidFedOrigin argument will be
+ * present. When called from federation, it will be present.
+ *
+ * This is a bit of a hack - the binding needs the origin, but
+ * this interface, as originally defined, would not supply one.
+ */
+ string fedOrigin;
+ if (args) fedOrigin = args->getAsString(qpidFedOrigin);
+
RWlock::ScopedWlock l(lock);
- if (bindingsMap[routingKey].remove_if(MatchQueue(queue))) {
+ if (bindingsMap[bindingKey].remove_if(MatchQueueAndOrigin(queue, fedOrigin))) {
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
}
return true;
} else {
@@ -141,65 +200,65 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons
bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content)
{
- string msgContent;
+ string msgContent;
- try {
- QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]");
+ try {
+ QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]");
- boost::scoped_ptr<DynamicContext> context(query->createDynamicContext());
- if (!context.get()) {
- throw InternalErrorException(QPID_MSG("Query context looks munged ..."));
- }
+ boost::scoped_ptr<DynamicContext> context(query->createDynamicContext());
+ if (!context.get()) {
+ throw InternalErrorException(QPID_MSG("Query context looks munged ..."));
+ }
- if (parse_message_content) {
+ if (parse_message_content) {
- msg.getMessage().getFrames().getContent(msgContent);
+ msg.getMessage().getFrames().getContent(msgContent);
- QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
+ QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
- XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(),
- msgContent.length(), "input" );
+ XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(),
+ msgContent.length(), "input" );
- // This will parse the document using either Xerces or FastXDM, depending
- // on your XQilla configuration. FastXDM can be as much as 10x faster.
+ // This will parse the document using either Xerces or FastXDM, depending
+ // on your XQilla configuration. FastXDM can be as much as 10x faster.
- Sequence seq(context->parseDocument(xml));
+ Sequence seq(context->parseDocument(xml));
- if(!seq.isEmpty() && seq.first()->isNode()) {
- context->setContextItem(seq.first());
- context->setContextPosition(1);
- context->setContextSize(1);
- }
- }
+ if(!seq.isEmpty() && seq.first()->isNode()) {
+ context->setContextItem(seq.first());
+ context->setContextPosition(1);
+ context->setContextSize(1);
+ }
+ }
- if (args) {
- FieldTable::ValueMap::const_iterator v = args->begin();
- for(; v != args->end(); ++v) {
- // ### TODO: Do types properly
- if (v->second->convertsTo<std::string>()) {
- QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str());
- Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get());
- context->setExternalVariable(X(v->first.c_str()), value);
- }
- }
- }
+ if (args) {
+ FieldTable::ValueMap::const_iterator v = args->begin();
+ for(; v != args->end(); ++v) {
+ // ### TODO: Do types properly
+ if (v->second->convertsTo<std::string>()) {
+ QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str());
+ Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get());
+ context->setExternalVariable(X(v->first.c_str()), value);
+ }
+ }
+ }
- Result result = query->execute(context.get());
+ Result result = query->execute(context.get());
#ifdef XQ_EFFECTIVE_BOOLEAN_VALUE_HPP
- Item::Ptr first_ = result->next(context.get());
- Item::Ptr second_ = result->next(context.get());
- return XQEffectiveBooleanValue::get(first_, second_, context.get(), 0);
+ Item::Ptr first_ = result->next(context.get());
+ Item::Ptr second_ = result->next(context.get());
+ return XQEffectiveBooleanValue::get(first_, second_, context.get(), 0);
#else
- return result->getEffectiveBooleanValue(context.get(), 0);
+ return result->getEffectiveBooleanValue(context.get(), 0);
#endif
- }
- catch (XQException& e) {
- QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);
- }
- catch (...) {
- QPID_LOG(warning, "Unexpected error routing message: " << msgContent);
- }
- return 0;
+ }
+ catch (XQException& e) {
+ QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);
+ }
+ catch (...) {
+ QPID_LOG(warning, "Unexpected error routing message: " << msgContent);
+ }
+ return 0;
}
// Future optimization: If any query in a binding for a given routing key requires
@@ -237,16 +296,16 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT
}
-bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
+bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const bindingKey, const FieldTable* const)
{
RWlock::ScopedRlock l(lock);
- if (routingKey) {
- XmlBindingsMap::iterator i = bindingsMap.find(*routingKey);
+ if (bindingKey) {
+ XmlBindingsMap::iterator i = bindingsMap.find(*bindingKey);
if (i == bindingsMap.end())
- return false;
+ return false;
if (!queue)
- return true;
+ return true;
XmlBinding::vector::ConstPtr p = i->second.snapshot();
return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end();
} else if (!queue) {
@@ -254,20 +313,84 @@ bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKe
return bindingsMap.size() > 0;
} else {
for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++) {
- XmlBinding::vector::ConstPtr p = i->second.snapshot();
+ XmlBinding::vector::ConstPtr p = i->second.snapshot();
if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true;
- }
- return false;
+ }
+ return false;
}
}
-
XmlExchange::~XmlExchange()
{
bindingsMap.clear();
}
+void XmlExchange::propagateFedOp(const std::string& bindingKey, const std::string& fedTags, const std::string& fedOp, const std::string& fedOrigin, const qpid::framing::FieldTable* args)
+{
+ FieldTable nonFedArgs;
+
+ if (args) {
+ for (qpid::framing::FieldTable::ValueMap::const_iterator i=args->begin(); i != args->end(); ++i) {
+ const string& name(i->first);
+ if (name != qpidFedOp &&
+ name != qpidFedTags &&
+ name != qpidFedOrigin) {
+ nonFedArgs.insert((*i));
+ }
+ }
+ }
+
+ FieldTable* propArgs = (nonFedArgs.count() > 0 ? &nonFedArgs : 0);
+ Exchange::propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, propArgs);
+}
+
+bool XmlExchange::fedUnbind(const string& fedOrigin, const string& fedTags, Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
+{
+ RWlock::ScopedRlock l(lock);
+
+ if (unbind(queue, bindingKey, args)) {
+ propagateFedOp(bindingKey, fedTags, fedOpUnbind, fedOrigin);
+ return true;
+ }
+ return false;
+}
+
+void XmlExchange::fedReorigin()
+{
+ std::vector<std::string> keys2prop;
+ {
+ RWlock::ScopedRlock l(lock);
+ for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); ++i) {
+ XmlBinding::vector::ConstPtr p = i->second.snapshot();
+ if (std::find_if(p->begin(), p->end(), MatchOrigin(string())) != p->end()) {
+ keys2prop.push_back(i->first);
+ }
+ }
+ } /* lock dropped */
+ for (std::vector<std::string>::const_iterator key = keys2prop.begin();
+ key != keys2prop.end(); key++) {
+ propagateFedOp( *key, string(), fedOpBind, string());
+ }
+}
+
+
+XmlExchange::MatchOrigin::MatchOrigin(const string& _origin) : origin(_origin) {}
+
+bool XmlExchange::MatchOrigin::operator()(XmlBinding::shared_ptr b)
+{
+ return b->fedOrigin == origin;
+}
+
+
+XmlExchange::MatchQueueAndOrigin::MatchQueueAndOrigin(Queue::shared_ptr _queue, const string& _origin) : queue(_queue), origin(_origin) {}
+
+bool XmlExchange::MatchQueueAndOrigin::operator()(XmlBinding::shared_ptr b)
+{
+ return b->queue == queue and b->fedOrigin == origin;
+}
+
+
const std::string XmlExchange::typeName("xml");
}