summaryrefslogtreecommitdiff
path: root/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp')
-rw-r--r--trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp153
1 files changed, 153 insertions, 0 deletions
diff --git a/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
new file mode 100644
index 0000000000..4aa68bee9c
--- /dev/null
+++ b/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/log/Statement.h"
+#include "DirectExchange.h"
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+using qpid::management::Manageable;
+
+DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+ if (mgmtExchange != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+DirectExchange::DirectExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
+ RWlock::ScopedWlock l(lock);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = queues.begin(); i != queues.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
+ if (i == queues.end()) {
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+ bindings[routingKey].push_back(binding);
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ ((management::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ }
+ return true;
+ } else{
+ return false;
+ }
+}
+
+bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+ RWlock::ScopedWlock l(lock);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = queues.begin(); i != queues.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
+ if (i < queues.end()) {
+ queues.erase(i);
+ if (queues.empty()) {
+ bindings.erase(routingKey);
+ }
+ if (mgmtExchange != 0) {
+ mgmtExchange->dec_bindingCount();
+ ((management::Queue*) queue->GetManagementObject())->dec_bindingCount();
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+ RWlock::ScopedRlock l(lock);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+ int count(0);
+
+ for(i = queues.begin(); i != queues.end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+
+ if(!count){
+ QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ }
+ else {
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ }
+}
+
+
+bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
+{
+ std::vector<Binding::shared_ptr>::iterator j;
+
+ if (routingKey) {
+ Bindings::iterator i = bindings.find(*routingKey);
+
+ if (i == bindings.end())
+ return false;
+ if (!queue)
+ return true;
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
+ } else if (!queue) {
+ //if no queue or routing key is specified, just report whether any bindings exist
+ return bindings.size() > 0;
+ } else {
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++)
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
+ return false;
+ }
+
+ return false;
+}
+
+DirectExchange::~DirectExchange() {}
+
+const std::string DirectExchange::typeName("direct");