summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/HeadersExchange.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.cpp130
1 files changed, 99 insertions, 31 deletions
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
index 9975d26c72..02c05852ff 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/HeadersExchange.h"
+#include "qpid/broker/MapHandler.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
@@ -55,6 +56,100 @@ namespace {
const std::string fedOpUnbind("U");
const std::string fedOpReorigin("R");
const std::string fedOpHello("H");
+
+std::string getMatch(const FieldTable* args)
+{
+ if (!args) {
+ throw InternalErrorException(QPID_MSG("No arguments given."));
+ }
+ FieldTable::ValuePtr what = args->get(x_match);
+ if (!what) {
+ return empty;
+ }
+ if (!what->convertsTo<std::string>()) {
+ throw InternalErrorException(QPID_MSG("Invalid x-match binding format to headers exchange. Must be a string [\"all\" or \"any\"]"));
+ }
+ return what->get<std::string>();
+}
+class Matcher : public MapHandler
+{
+ public:
+ Matcher(const FieldTable& b) : binding(b), matched(0) {}
+ void handleUint8(const MapHandler::CharSequence& key, uint8_t value) { processUint(std::string(key.data, key.size), value); }
+ void handleUint16(const MapHandler::CharSequence& key, uint16_t value) { processUint(std::string(key.data, key.size), value); }
+ void handleUint32(const MapHandler::CharSequence& key, uint32_t value) { processUint(std::string(key.data, key.size), value); }
+ void handleUint64(const MapHandler::CharSequence& key, uint64_t value) { processUint(std::string(key.data, key.size), value); }
+ void handleInt8(const MapHandler::CharSequence& key, int8_t value) { processInt(std::string(key.data, key.size), value); }
+ void handleInt16(const MapHandler::CharSequence& key, int16_t value) { processInt(std::string(key.data, key.size), value); }
+ void handleInt32(const MapHandler::CharSequence& key, int32_t value) { processInt(std::string(key.data, key.size), value); }
+ void handleInt64(const MapHandler::CharSequence& key, int64_t value) { processInt(std::string(key.data, key.size), value); }
+ void handleFloat(const MapHandler::CharSequence& key, float value) { processFloat(std::string(key.data, key.size), value); }
+ void handleDouble(const MapHandler::CharSequence& key, double value) { processFloat(std::string(key.data, key.size), value); }
+ void handleString(const MapHandler::CharSequence& key, const MapHandler::CharSequence& value, const MapHandler::CharSequence& /*encoding*/)
+ {
+ processString(std::string(key.data, key.size), std::string(value.data, value.size));
+ }
+ void handleVoid(const MapHandler::CharSequence& key)
+ {
+ valueCheckRequired(std::string(key.data, key.size));
+ }
+ bool matches()
+ {
+ std::string what = getMatch(&binding);
+ if (what == all) {
+ //must match all entries in the binding, except the match mode indicator
+ return matched == binding.size() - 1;
+ } else if (what == any) {
+ //match any of the entries in the binding
+ return matched > 0;
+ } else {
+ return false;
+ }
+ }
+ private:
+ bool valueCheckRequired(const std::string& key)
+ {
+ FieldTable::ValuePtr v = binding.get(key);
+ if (v) {
+ if (v->getType() == 0xf0/*VOID*/) {
+ ++matched;
+ return false;
+ } else {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ void processString(const std::string& key, const std::string& actual)
+ {
+ if (valueCheckRequired(key) && binding.getAsString(key) == actual) {
+ ++matched;
+ }
+ }
+ void processFloat(const std::string& key, double actual)
+ {
+ double bound;
+ if (valueCheckRequired(key) && binding.getDouble(key, bound) && bound == actual) {
+ ++matched;
+ }
+ }
+ void processInt(const std::string& key, int64_t actual)
+ {
+ if (valueCheckRequired(key) && binding.getAsInt64(key) == actual) {
+ ++matched;
+ }
+ }
+ void processUint(const std::string& key, uint64_t actual)
+ {
+ if (valueCheckRequired(key) && binding.getAsUInt64(key) == actual) {
+ ++matched;
+ }
+ }
+ const FieldTable& binding;
+ size_t matched;
+};
}
HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent, Broker* b) :
@@ -72,21 +167,6 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-std::string HeadersExchange::getMatch(const FieldTable* args)
-{
- if (!args) {
- throw InternalErrorException(QPID_MSG("No arguments given."));
- }
- FieldTable::ValuePtr what = args->get(x_match);
- if (!what) {
- return empty;
- }
- if (!what->convertsTo<std::string>()) {
- throw InternalErrorException(QPID_MSG("Invalid x-match binding format to headers exchange. Must be a string [\"all\" or \"any\"]"));
- }
- return what->get<std::string>();
-}
-
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
{
string fedOp(fedOpBind);
@@ -196,28 +276,16 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey,
void HeadersExchange::route(Deliverable& msg)
{
- const FieldTable* args = msg.getMessage().getApplicationHeaders();
- if (!args) {
- //can't match if there were no headers passed in
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives();
- mgmtExchange->inc_byteReceives(msg.contentSize());
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsNoRoute();
- }
- return;
- }
-
PreRoute pr(msg, this);
BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()) {
for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) {
- if (match((*i).binding->args, *args)) {
- b->push_back((*i).binding);
+ Matcher matcher(i->binding->args);
+ msg.getMessage().processProperties(matcher);
+ if (matcher.matches()) {
+ b->push_back(i->binding);
}
}
}