diff options
Diffstat (limited to 'cpp/broker/src')
-rw-r--r-- | cpp/broker/src/AutoDelete.cpp | 93 | ||||
-rw-r--r-- | cpp/broker/src/Broker.cpp | 84 | ||||
-rw-r--r-- | cpp/broker/src/Channel.cpp | 256 | ||||
-rw-r--r-- | cpp/broker/src/Configuration.cpp | 196 | ||||
-rw-r--r-- | cpp/broker/src/DirectExchange.cpp | 72 | ||||
-rw-r--r-- | cpp/broker/src/ExchangeBinding.cpp | 32 | ||||
-rw-r--r-- | cpp/broker/src/ExchangeRegistry.cpp | 57 | ||||
-rw-r--r-- | cpp/broker/src/FanOutExchange.cpp | 56 | ||||
-rw-r--r-- | cpp/broker/src/HeadersExchange.cpp | 120 | ||||
-rw-r--r-- | cpp/broker/src/Message.cpp | 100 | ||||
-rw-r--r-- | cpp/broker/src/NameGenerator.cpp | 29 | ||||
-rw-r--r-- | cpp/broker/src/Queue.cpp | 155 | ||||
-rw-r--r-- | cpp/broker/src/QueueRegistry.cpp | 72 | ||||
-rw-r--r-- | cpp/broker/src/Router.cpp | 32 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerFactoryImpl.cpp | 50 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 405 | ||||
-rw-r--r-- | cpp/broker/src/TopicExchange.cpp | 163 |
17 files changed, 0 insertions, 1972 deletions
diff --git a/cpp/broker/src/AutoDelete.cpp b/cpp/broker/src/AutoDelete.cpp deleted file mode 100644 index 6793ec449d..0000000000 --- a/cpp/broker/src/AutoDelete.cpp +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "AutoDelete.h" - -using namespace qpid::broker; - -AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) : registry(_registry), - period(_period), - stopped(true), - runner(0){} - -void AutoDelete::add(Queue::shared_ptr const queue){ - lock.acquire(); - queues.push(queue); - lock.release(); -} - -Queue::shared_ptr const AutoDelete::pop(){ - Queue::shared_ptr next; - lock.acquire(); - if(!queues.empty()){ - next = queues.front(); - queues.pop(); - } - lock.release(); - return next; -} - -void AutoDelete::process(){ - Queue::shared_ptr seen; - for(Queue::shared_ptr q = pop(); q; q = pop()){ - if(seen == q){ - add(q); - break; - }else if(q->canAutoDelete()){ - std::string name(q->getName()); - registry->destroy(name); - std::cout << "INFO: Auto-deleted queue named " << name << std::endl; - }else{ - add(q); - if(!seen) seen = q; - } - } -} - -void AutoDelete::run(){ - monitor.acquire(); - while(!stopped){ - process(); - monitor.wait(period); - } - monitor.release(); -} - -void AutoDelete::start(){ - monitor.acquire(); - if(stopped){ - runner = factory.create(this); - stopped = false; - monitor.release(); - runner->start(); - }else{ - monitor.release(); - } -} - -void AutoDelete::stop(){ - monitor.acquire(); - if(!stopped){ - stopped = true; - monitor.notify(); - monitor.release(); - runner->join(); - delete runner; - }else{ - monitor.release(); - } -} diff --git a/cpp/broker/src/Broker.cpp b/cpp/broker/src/Broker.cpp deleted file mode 100644 index b6472d1729..0000000000 --- a/cpp/broker/src/Broker.cpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include <memory> -#include "Broker.h" -#include "Acceptor.h" -#include "Configuration.h" -#include "QpidError.h" -#include "SessionHandlerFactoryImpl.h" -#include "BlockingAPRAcceptor.h" -#include "LFAcceptor.h" - - -using namespace qpid::broker; -using namespace qpid::io; - -namespace { - Acceptor* createAcceptor(const Configuration& config){ - const string type(config.getAcceptor()); - if("blocking" == type){ - std::cout << "Using blocking acceptor " << std::endl; - return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog()); - }else if("non-blocking" == type){ - std::cout << "Using non-blocking acceptor " << std::endl; - return new LFAcceptor(config.isTrace(), - config.getConnectionBacklog(), - config.getWorkerThreads(), - config.getMaxConnections()); - } - throw Configuration::ParseException("Unrecognised acceptor: " + type); - } -} - -Broker::Broker(const Configuration& config) : - acceptor(createAcceptor(config)), - port(config.getPort()), - isBound(false) {} - -Broker::shared_ptr Broker::create(int port) -{ - Configuration config; - config.setPort(port); - return create(config); -} - -Broker::shared_ptr Broker::create(const Configuration& config) { - return Broker::shared_ptr(new Broker(config)); -} - -int16_t Broker::bind() -{ - if (!isBound) { - port = acceptor->bind(port); - } - return port; -} - -void Broker::run() { - bind(); - acceptor->run(&factory); -} - -void Broker::shutdown() { - acceptor->shutdown(); -} - -Broker::~Broker() { } - -const int16_t Broker::DEFAULT_PORT(5672); diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp deleted file mode 100644 index 34d69716c4..0000000000 --- a/cpp/broker/src/Channel.cpp +++ /dev/null @@ -1,256 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Channel.h" -#include "QpidError.h" -#include <iostream> -#include <sstream> -#include <assert.h> - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::concurrent; - - -Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : - id(_id), - out(_out), - deliveryTag(1), - transactional(false), - prefetchSize(0), - prefetchCount(0), - outstandingSize(0), - outstandingCount(0), - framesize(_framesize), - tagGenerator("sgen"){} - -Channel::~Channel(){ -} - -bool Channel::exists(const string& consumerTag){ - return consumers.find(consumerTag) != consumers.end(); -} - -void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){ - if(tag.empty()) tag = tagGenerator.generate(); - - ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); - try{ - queue->consume(c, exclusive);//may throw exception - consumers[tag] = c; - }catch(ExclusiveAccessException& e){ - delete c; - throw e; - } -} - -void Channel::cancel(consumer_iterator i){ - ConsumerImpl* c = i->second; - consumers.erase(i); - if(c){ - c->cancel(); - delete c; - } -} - -void Channel::cancel(const string& tag){ - consumer_iterator i = consumers.find(tag); - if(i != consumers.end()){ - cancel(i); - } -} - -void Channel::close(){ - //cancel all consumers - for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ - cancel(i); - } -} - -void Channel::begin(){ - transactional = true; -} - -void Channel::commit(){ - -} - -void Channel::rollback(){ - -} - -void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ - Locker locker(deliveryLock); - - u_int64_t myDeliveryTag = deliveryTag++; - if(ackExpected){ - unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag)); - outstandingSize += msg->contentSize(); - outstandingCount++; - } - //send deliver method, header and content(s) - msg->deliver(out, id, consumerTag, myDeliveryTag, framesize); -} - -bool Channel::checkPrefetch(Message::shared_ptr& msg){ - Locker locker(deliveryLock); - bool countOk = !prefetchCount || prefetchCount > unacknowledged.size(); - bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty(); - return countOk && sizeOk; -} - -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack) : parent(_parent), - tag(_tag), - queue(_queue), - connection(_connection), - ackExpected(ack), - blocked(false){ -} - -bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ - if(!connection || connection != msg->getPublisher()){//check for no_local - if(ackExpected && !parent->checkPrefetch(msg)){ - blocked = true; - }else{ - blocked = false; - parent->deliver(msg, tag, queue, ackExpected); - return true; - } - } - return false; -} - -void Channel::ConsumerImpl::cancel(){ - if(queue) queue->cancel(this); -} - -void Channel::ConsumerImpl::requestDispatch(){ - if(blocked) queue->dispatch(); -} - -void Channel::checkMessage(const std::string& text){ - if(!message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text); - } -} - -void Channel::handlePublish(Message* msg){ - if(message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); - } - message = Message::shared_ptr(msg); -} - -void Channel::ack(u_int64_t _deliveryTag, bool multiple){ - Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery - - ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(_deliveryTag)); - if(i == unacknowledged.end()){ - throw InvalidAckException(); - }else if(multiple){ - unacknowledged.erase(unacknowledged.begin(), ++i); - //recompute prefetch outstanding (note: messages delivered through get are ignored) - CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch())); - outstandingSize = calc.getSize(); - outstandingCount = calc.getCount(); - }else{ - if(!i->pull){ - outstandingSize -= i->msg->contentSize(); - outstandingCount--; - } - unacknowledged.erase(i); - } - - //if the prefetch limit had previously been reached, there may - //be messages that can be now be delivered - for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ - j->second->requestDispatch(); - } -} - -void Channel::recover(bool requeue){ - Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery - - if(requeue){ - outstandingSize = 0; - outstandingCount = 0; - ack_iterator start(unacknowledged.begin()); - ack_iterator end(unacknowledged.end()); - for_each(start, end, Requeue()); - unacknowledged.erase(start, end); - }else{ - for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this)); - } -} - -bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ - Message::shared_ptr msg = queue->dequeue(); - if(msg){ - Locker locker(deliveryLock); - u_int64_t myDeliveryTag = deliveryTag++; - msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); - if(ackExpected){ - unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag)); - } - return true; - }else{ - return false; - } -} - -Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {} - -bool Channel::MatchAck::operator()(AckRecord& record) const{ - return tag == record.deliveryTag; -} - -void Channel::Requeue::operator()(AckRecord& record) const{ - record.msg->redeliver(); - record.queue->deliver(record.msg); -} - -Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {} - -void Channel::Redeliver::operator()(AckRecord& record) const{ - if(record.pull){ - //if message was originally sent as response to get, we must requeue it - record.msg->redeliver(); - record.queue->deliver(record.msg); - }else{ - record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); - } -} - -Channel::CalculatePrefetch::CalculatePrefetch() : size(0){} - -void Channel::CalculatePrefetch::operator()(AckRecord& record){ - if(!record.pull){ - //ignore messages that were sent in response to get when calculating prefetch - size += record.msg->contentSize(); - count++; - } -} - -u_int32_t Channel::CalculatePrefetch::getSize(){ - return size; -} - -u_int16_t Channel::CalculatePrefetch::getCount(){ - return count; -} diff --git a/cpp/broker/src/Configuration.cpp b/cpp/broker/src/Configuration.cpp deleted file mode 100644 index 6e7df7889e..0000000000 --- a/cpp/broker/src/Configuration.cpp +++ /dev/null @@ -1,196 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Configuration.h" -#include <string.h> - -using namespace qpid::broker; -using namespace std; - -Configuration::Configuration() : - trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false), - port('p', "port", "Sets the port to listen on (default=5672)", 5672), - workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5), - maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500), - connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10), - acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"), - help("help", "Prints usage information", false) -{ - options.push_back(&trace); - options.push_back(&port); - options.push_back(&workerThreads); - options.push_back(&maxConnections); - options.push_back(&connectionBacklog); - options.push_back(&acceptor); - options.push_back(&help); -} - -Configuration::~Configuration(){} - -void Configuration::parse(int argc, char** argv){ - int position = 1; - while(position < argc){ - bool matched(false); - for(op_iterator i = options.begin(); i < options.end() && !matched; i++){ - matched = (*i)->parse(position, argv, argc); - } - if(!matched){ - std::cout << "Warning: skipping unrecognised option " << argv[position] << std::endl; - position++; - } - } -} - -void Configuration::usage(){ - for(op_iterator i = options.begin(); i < options.end(); i++){ - (*i)->print(std::cout); - } -} - -bool Configuration::isHelp() const { - return help.getValue(); -} - -bool Configuration::isTrace() const { - return trace.getValue(); -} - -int Configuration::getPort() const { - return port.getValue(); -} - -int Configuration::getWorkerThreads() const { - return workerThreads.getValue(); -} - -int Configuration::getMaxConnections() const { - return maxConnections.getValue(); -} - -int Configuration::getConnectionBacklog() const { - return connectionBacklog.getValue(); -} - -string Configuration::getAcceptor() const { - return acceptor.getValue(); -} - -Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : - flag(string("-") + _flag), name("--" +_name), desc(_desc) {} - -Configuration::Option::Option(const string& _name, const string& _desc) : - flag(""), name("--" + _name), desc(_desc) {} - -Configuration::Option::~Option(){} - -bool Configuration::Option::match(const string& arg){ - return flag == arg || name == arg; -} - -bool Configuration::Option::parse(int& i, char** argv, int argc){ - const string arg(argv[i]); - if(match(arg)){ - if(needsValue()){ - if(++i < argc) setValue(argv[i]); - else throw ParseException("Argument " + arg + " requires a value!"); - }else{ - setValue(""); - } - i++; - return true; - }else{ - return false; - } -} - -void Configuration::Option::print(ostream& out) const { - out << " "; - if(flag.length() > 0){ - out << flag << " or "; - } - out << name; - if(needsValue()) out << "<value>"; - out << std::endl; - out << " " << desc << std::endl; -} - - -// String Option: - -Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) : - Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) : - Option(_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::StringOption::~StringOption(){} - -const string& Configuration::StringOption::getValue() const { - return value; -} - -bool Configuration::StringOption::needsValue() const { - return true; -} - -void Configuration::StringOption::setValue(const std::string& _value){ - value = _value; -} - -// Int Option: - -Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) : - Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) : - Option(_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::IntOption::~IntOption(){} - -int Configuration::IntOption::getValue() const { - return value; -} - -bool Configuration::IntOption::needsValue() const { - return true; -} - -void Configuration::IntOption::setValue(const std::string& _value){ - value = atoi(_value.c_str()); -} - -// Bool Option: - -Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) : - Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) : - Option(_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::BoolOption::~BoolOption(){} - -bool Configuration::BoolOption::getValue() const { - return value; -} - -bool Configuration::BoolOption::needsValue() const { - return false; -} - -void Configuration::BoolOption::setValue(const std::string& _value){ - value = strcasecmp(_value.c_str(), "true") == 0; -} diff --git a/cpp/broker/src/DirectExchange.cpp b/cpp/broker/src/DirectExchange.cpp deleted file mode 100644 index 94cfbc766d..0000000000 --- a/cpp/broker/src/DirectExchange.cpp +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "DirectExchange.h" -#include "ExchangeBinding.h" -#include <iostream> - -using namespace qpid::broker; -using namespace qpid::framing; - -DirectExchange::DirectExchange(const string& _name) : Exchange(_name) { - -} - -void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - lock.acquire(); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); - if(i == queues.end()){ - bindings[routingKey].push_back(queue); - queue->bound(new ExchangeBinding(this, queue, routingKey, args)); - } - lock.release(); -} - -void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - - std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); - if(i < queues.end()){ - queues.erase(i); - if(queues.empty()){ - bindings.erase(routingKey); - } - } - lock.release(); -} - -void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - int count(0); - for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){ - (*i)->deliver(msg); - } - if(!count){ - std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl; - } - lock.release(); -} - -DirectExchange::~DirectExchange(){ - -} - - -const std::string DirectExchange::typeName("direct"); diff --git a/cpp/broker/src/ExchangeBinding.cpp b/cpp/broker/src/ExchangeBinding.cpp deleted file mode 100644 index 6160a67fd3..0000000000 --- a/cpp/broker/src/ExchangeBinding.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "ExchangeBinding.h" -#include "Exchange.h" - -using namespace qpid::broker; -using namespace qpid::framing; - -ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){} - -void ExchangeBinding::cancel(){ - e->unbind(q, key, args); - delete this; -} - -ExchangeBinding::~ExchangeBinding(){ -} diff --git a/cpp/broker/src/ExchangeRegistry.cpp b/cpp/broker/src/ExchangeRegistry.cpp deleted file mode 100644 index 05396382a7..0000000000 --- a/cpp/broker/src/ExchangeRegistry.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "ExchangeRegistry.h" -#include "MonitorImpl.h" - -using namespace qpid::broker; -using namespace qpid::concurrent; - -ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){} - -ExchangeRegistry::~ExchangeRegistry(){ - for (ExchangeMap::iterator i = exchanges.begin(); i != exchanges.end(); ++i) - { - delete i->second; - } - delete lock; -} - -void ExchangeRegistry::declare(Exchange* exchange){ - exchanges[exchange->getName()] = exchange; -} - -void ExchangeRegistry::destroy(const string& name){ - if(exchanges[name]){ - delete exchanges[name]; - exchanges.erase(name); - } -} - -Exchange* ExchangeRegistry::get(const string& name){ - return exchanges[name]; -} - -namespace -{ -const std::string empty; -} - -Exchange* ExchangeRegistry::getDefault() -{ - return get(empty); -} diff --git a/cpp/broker/src/FanOutExchange.cpp b/cpp/broker/src/FanOutExchange.cpp deleted file mode 100644 index e8cb8f6315..0000000000 --- a/cpp/broker/src/FanOutExchange.cpp +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "FanOutExchange.h" -#include "ExchangeBinding.h" -#include <algorithm> - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::concurrent; - -FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} - -void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - Locker locker(lock); - // Add if not already present. - Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); - if (i == bindings.end()) { - bindings.push_back(queue); - queue->bound(new ExchangeBinding(this, queue, routingKey, args)); - } -} - -void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* /*args*/){ - Locker locker(lock); - Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); - if (i != bindings.end()) { - bindings.erase(i); - // TODO aconway 2006-09-14: What about the ExchangeBinding object? Don't we have to verify routingKey/args match? - } -} - -void FanOutExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* /*args*/){ - Locker locker(lock); - for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ - (*i)->deliver(msg); - } -} - -FanOutExchange::~FanOutExchange() {} - -const std::string FanOutExchange::typeName("fanout"); diff --git a/cpp/broker/src/HeadersExchange.cpp b/cpp/broker/src/HeadersExchange.cpp deleted file mode 100644 index 65204cdb85..0000000000 --- a/cpp/broker/src/HeadersExchange.cpp +++ /dev/null @@ -1,120 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "HeadersExchange.h" -#include "ExchangeBinding.h" -#include "Value.h" -#include "QpidError.h" -#include <algorithm> - - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::concurrent; - -// TODO aconway 2006-09-20: More efficient matching algorithm. -// The current search algorithm really sucks. -// Fieldtables are heavy, maybe use shared_ptr to do handle-body. - -using namespace qpid::broker; - -namespace { - const std::string all("all"); - const std::string any("any"); - const std::string x_match("x-match"); -} - -HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { } - -void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - std::cout << "HeadersExchange::bind" << std::endl; - Locker locker(lock); - std::string what = args->getString("x-match"); - if (what != all && what != any) { - THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange."); - } - bindings.push_back(Binding(*args, queue)); - queue->bound(new ExchangeBinding(this, queue, routingKey, args)); -} - -void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* args){ - Locker locker(lock); - Bindings::iterator i = - std::find(bindings.begin(),bindings.end(), Binding(*args, queue)); - if (i != bindings.end()) bindings.erase(i); -} - - -void HeadersExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* args){ - std::cout << "route: " << *args << std::endl; - Locker locker(lock);; - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, *args)) i->second->deliver(msg); - } -} - -HeadersExchange::~HeadersExchange() {} - -const std::string HeadersExchange::typeName("headers"); - -namespace -{ - - bool match_values(const Value& bind, const Value& msg) { - return dynamic_cast<const EmptyValue*>(&bind) || bind == msg; - } - -} - - -bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) { - typedef FieldTable::ValueMap Map; - std::string what = bind.getString(x_match); - if (what == all) { - for (Map::const_iterator i = bind.getMap().begin(); - i != bind.getMap().end(); - ++i) - { - if (i->first != x_match) - { - Map::const_iterator j = msg.getMap().find(i->first); - if (j == msg.getMap().end()) return false; - if (!match_values(*(i->second), *(j->second))) return false; - } - } - return true; - } else if (what == any) { - for (Map::const_iterator i = bind.getMap().begin(); - i != bind.getMap().end(); - ++i) - { - if (i->first != x_match) - { - Map::const_iterator j = msg.getMap().find(i->first); - if (j != msg.getMap().end()) { - if (match_values(*(i->second), *(j->second))) return true; - } - } - } - return false; - } else { - return false; - } -} - - - diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp deleted file mode 100644 index 0a8a5f7a4d..0000000000 --- a/cpp/broker/src/Message.cpp +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "MonitorImpl.h" -#include "Message.h" -#include "ExchangeRegistry.h" -#include <iostream> - -using namespace std::tr1;//for *_pointer_cast methods -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::concurrent; - - -Message::Message(const ConnectionToken* const _publisher, - const string& _exchange, const string& _routingKey, - bool _mandatory, bool _immediate) : publisher(_publisher), - exchange(_exchange), - routingKey(_routingKey), - mandatory(_mandatory), - immediate(_immediate), - redelivered(false), - size(0){ - -} - -Message::~Message(){ -} - -void Message::setHeader(AMQHeaderBody::shared_ptr _header){ - this->header = _header; -} - -void Message::addContent(AMQContentBody::shared_ptr data){ - content.push_back(data); - size += data->size(); -} - -bool Message::isComplete(){ - return header.get() && (header->getContentSize() == contentSize()); -} - -void Message::redeliver(){ - redelivered = true; -} - -void Message::deliver(OutputHandler* out, int channel, - const string& consumerTag, u_int64_t deliveryTag, - u_int32_t framesize){ - - out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey))); - sendContent(out, channel, framesize); -} - -void Message::sendGetOk(OutputHandler* out, - int channel, - u_int32_t messageCount, - u_int64_t deliveryTag, - u_int32_t framesize){ - - out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount))); - sendContent(out, channel, framesize); -} - -void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){ - AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); - out->send(new AMQFrame(channel, headerBody)); - for(content_iterator i = content.begin(); i != content.end(); i++){ - if((*i)->size() > framesize){ - //TODO: need to split it - std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl; - }else{ - AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i); - out->send(new AMQFrame(channel, contentBody)); - } - } -} - -BasicHeaderProperties* Message::getHeaderProperties(){ - return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); -} - -const ConnectionToken* const Message::getPublisher(){ - return publisher; -} - diff --git a/cpp/broker/src/NameGenerator.cpp b/cpp/broker/src/NameGenerator.cpp deleted file mode 100644 index 46aa385a7e..0000000000 --- a/cpp/broker/src/NameGenerator.cpp +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "NameGenerator.h" -#include <sstream> - -using namespace qpid::broker; - -NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {} - -std::string NameGenerator::generate(){ - std::stringstream ss; - ss << base << counter++; - return ss.str(); -} diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp deleted file mode 100644 index eaaa3ffa31..0000000000 --- a/cpp/broker/src/Queue.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Queue.h" -#include "MonitorImpl.h" -#include <iostream> - -using namespace qpid::broker; -using namespace qpid::concurrent; - -Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) : - name(_name), - autodelete(_autodelete), - durable(_durable), - owner(_owner), - queueing(false), - dispatching(false), - next(0), - lastUsed(0), - exclusive(0) -{ - if(autodelete) lastUsed = apr_time_as_msec(apr_time_now()); -} - -Queue::~Queue(){ - for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){ - b->cancel(); - bindings.pop(); - } -} - -void Queue::bound(Binding* b){ - bindings.push(b); -} - -void Queue::deliver(Message::shared_ptr& msg){ - Locker locker(lock); - if(queueing || !dispatch(msg)){ - queueing = true; - messages.push(msg); - } -} - -bool Queue::dispatch(Message::shared_ptr& msg){ - if(consumers.empty()){ - return false; - }else if(exclusive){ - if(!exclusive->deliver(msg)){ - std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl; - } - return true; - }else{ - //deliver to next consumer - next = next % consumers.size(); - Consumer* c = consumers[next]; - int start = next; - while(c){ - next++; - if(c->deliver(msg)) return true; - - next = next % consumers.size(); - c = next == start ? 0 : consumers[next]; - } - return false; - } -} - -bool Queue::startDispatching(){ - Locker locker(lock); - if(queueing && !dispatching){ - dispatching = true; - return true; - }else{ - return false; - } -} - -void Queue::dispatch(){ - bool proceed = startDispatching(); - while(proceed){ - Locker locker(lock); - if(!messages.empty() && dispatch(messages.front())){ - messages.pop(); - }else{ - dispatching = false; - proceed = false; - queueing = !messages.empty(); - } - } -} - -void Queue::consume(Consumer* c, bool requestExclusive){ - Locker locker(lock); - if(exclusive) throw ExclusiveAccessException(); - if(requestExclusive){ - if(!consumers.empty()) throw ExclusiveAccessException(); - exclusive = c; - } - - if(autodelete && consumers.empty()) lastUsed = 0; - consumers.push_back(c); -} - -void Queue::cancel(Consumer* c){ - Locker locker(lock); - consumers.erase(find(consumers.begin(), consumers.end(), c)); - if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now()); - if(exclusive == c) exclusive = 0; -} - -Message::shared_ptr Queue::dequeue(){ - Locker locker(lock); - Message::shared_ptr msg; - if(!messages.empty()){ - msg = messages.front(); - messages.pop(); - } - return msg; -} - -u_int32_t Queue::purge(){ - Locker locker(lock); - int count = messages.size(); - while(!messages.empty()) messages.pop(); - return count; -} - -u_int32_t Queue::getMessageCount() const{ - Locker locker(lock); - return messages.size(); -} - -u_int32_t Queue::getConsumerCount() const{ - Locker locker(lock); - return consumers.size(); -} - -bool Queue::canAutoDelete() const{ - Locker locker(lock); - return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete); -} diff --git a/cpp/broker/src/QueueRegistry.cpp b/cpp/broker/src/QueueRegistry.cpp deleted file mode 100644 index f807415314..0000000000 --- a/cpp/broker/src/QueueRegistry.cpp +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "QueueRegistry.h" -#include "MonitorImpl.h" -#include "SessionHandlerImpl.h" -#include <sstream> -#include <assert.h> - -using namespace qpid::broker; -using namespace qpid::concurrent; - -QueueRegistry::QueueRegistry() : counter(1){} - -QueueRegistry::~QueueRegistry(){} - -std::pair<Queue::shared_ptr, bool> -QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner) -{ - Locker locker(lock); - string name = declareName.empty() ? generateName() : declareName; - assert(!name.empty()); - QueueMap::iterator i = queues.find(name); - if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner)); - queues[name] = queue; - return std::pair<Queue::shared_ptr, bool>(queue, true); - } else { - return std::pair<Queue::shared_ptr, bool>(i->second, false); - } -} - -void QueueRegistry::destroy(const string& name){ - Locker locker(lock); - queues.erase(name); -} - -Queue::shared_ptr QueueRegistry::find(const string& name){ - Locker locker(lock); - QueueMap::iterator i = queues.find(name); - if (i == queues.end()) { - return Queue::shared_ptr(); - } else { - return i->second; - } -} - -string QueueRegistry::generateName(){ - string name; - do { - std::stringstream ss; - ss << "tmp_" << counter++; - name = ss.str(); - // Thread safety: Private function, only called with lock held - // so this is OK. - } while(queues.find(name) != queues.end()); - return name; -} diff --git a/cpp/broker/src/Router.cpp b/cpp/broker/src/Router.cpp deleted file mode 100644 index c2dd74bf7d..0000000000 --- a/cpp/broker/src/Router.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Router.h" - -using namespace qpid::broker; - -Router::Router(ExchangeRegistry& _registry) : registry(_registry){} - -void Router::operator()(Message::shared_ptr& msg){ - Exchange* exchange = registry.get(msg->getExchange()); - if(exchange){ - exchange->route(msg, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); - }else{ - std::cout << "WARNING: Could not route message, unknown exchange: " << msg->getExchange() << std::endl; - } - -} diff --git a/cpp/broker/src/SessionHandlerFactoryImpl.cpp b/cpp/broker/src/SessionHandlerFactoryImpl.cpp deleted file mode 100644 index 39c627afef..0000000000 --- a/cpp/broker/src/SessionHandlerFactoryImpl.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "SessionHandlerFactoryImpl.h" -#include "SessionHandlerImpl.h" -#include "FanOutExchange.h" -#include "HeadersExchange.h" - -using namespace qpid::broker; -using namespace qpid::io; - -namespace -{ -const std::string empty; -const std::string amq_direct("amq.direct"); -const std::string amq_topic("amq.topic"); -const std::string amq_fanout("amq.fanout"); -const std::string amq_match("amq.match"); -} - -SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){ - exchanges.declare(new DirectExchange(empty)); // Default exchange. - exchanges.declare(new DirectExchange(amq_direct)); - exchanges.declare(new TopicExchange(amq_topic)); - exchanges.declare(new FanOutExchange(amq_fanout)); - exchanges.declare(new HeadersExchange(amq_match)); - cleaner.start(); -} - -SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){ - return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout); -} - -SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){ - cleaner.stop(); -} diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp deleted file mode 100644 index 0d8539332c..0000000000 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ /dev/null @@ -1,405 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include "SessionHandlerImpl.h" -#include "FanOutExchange.h" -#include "HeadersExchange.h" -#include "Router.h" -#include "TopicExchange.h" -#include "assert.h" - -using namespace std::tr1; -using namespace qpid::broker; -using namespace qpid::io; -using namespace qpid::framing; -using namespace qpid::concurrent; - -SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, - QueueRegistry* _queues, - ExchangeRegistry* _exchanges, - AutoDelete* _cleaner, - const u_int32_t _timeout) : - context(_context), - client(context), - queues(_queues), - exchanges(_exchanges), - cleaner(_cleaner), - timeout(_timeout), - connectionHandler(new ConnectionHandlerImpl(this)), - channelHandler(new ChannelHandlerImpl(this)), - basicHandler(new BasicHandlerImpl(this)), - exchangeHandler(new ExchangeHandlerImpl(this)), - queueHandler(new QueueHandlerImpl(this)), - framemax(65536), - heartbeat(0) {} - -SessionHandlerImpl::~SessionHandlerImpl(){ - // TODO aconway 2006-09-07: Should be auto_ptr or plain members. - delete channelHandler; - delete connectionHandler; - delete basicHandler; - delete exchangeHandler; - delete queueHandler; -} - -Channel* SessionHandlerImpl::getChannel(u_int16_t channel){ - channel_iterator i = channels.find(channel); - if(i == channels.end()){ - throw ConnectionException(504, "Unknown channel: " + channel); - } - return i->second; -} - -Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){ - Queue::shared_ptr queue; - if (name.empty()) { - queue = getChannel(channel)->getDefaultQueue(); - if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); - } else { - queue = queues->find(name); - if (queue == 0) { - throw ChannelException( 404, "Queue not found: " + name); - } - } - return queue; -} - - -Exchange* SessionHandlerImpl::findExchange(const string& name){ - exchanges->getLock()->acquire(); - Exchange* exchange(exchanges->get(name)); - exchanges->getLock()->release(); - return exchange; -} - -void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ - u_int16_t channel = frame->getChannel(); - AMQBody::shared_ptr body = frame->getBody(); - AMQMethodBody::shared_ptr method; - - switch(body->type()) - { - case METHOD_BODY: - method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body); - try{ - method->invoke(*this, channel); - }catch(ChannelException& e){ - channels[channel]->close(); - channels.erase(channel); - client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); - }catch(ConnectionException& e){ - client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); - } - break; - - case HEADER_BODY: - this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body)); - break; - - case CONTENT_BODY: - this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body)); - break; - - case HEARTBEAT_BODY: - //channel must be 0 - this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body)); - break; - } -} - -void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* /*header*/){ - //send connection start - FieldTable properties; - string mechanisms("PLAIN"); - string locales("en_US"); - client.getConnection().start(0, 8, 0, properties, mechanisms, locales); -} - -void SessionHandlerImpl::idleOut(){ - -} - -void SessionHandlerImpl::idleIn(){ - -} - -void SessionHandlerImpl::closed(){ - for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){ - Channel* c = i->second; - channels.erase(i); - c->close(); - delete c; - } - for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ - string name = (*i)->getName(); - queues->destroy(name); - exclusiveQueues.erase(i); - } -} - -void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - getChannel(channel)->handleHeader(body, Router(*exchanges)); -} - -void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - getChannel(channel)->handleContent(body, Router(*exchanges)); -} - -void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl; -} - -void SessionHandlerImpl::ConnectionHandlerImpl::startOk( - u_int16_t /*channel*/, FieldTable& /*clientProperties*/, string& /*mechanism*/, - string& /*response*/, string& /*locale*/){ - - parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat); -} - -void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, string& /*response*/){} - -void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ - parent->framemax = framemax; - parent->heartbeat = heartbeat; -} - -void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, string& /*virtualHost*/, string& /*capabilities*/, bool /*insist*/){ - string knownhosts; - parent->client.getConnection().openOk(0, knownhosts); -} - -void SessionHandlerImpl::ConnectionHandlerImpl::close( - u_int16_t /*channel*/, u_int16_t /*replyCode*/, string& /*replyText*/, - u_int16_t /*classId*/, u_int16_t /*methodId*/) -{ - parent->client.getConnection().closeOk(0); - parent->context->close(); -} - -void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ - parent->context->close(); -} - - - -void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& /*outOfBand*/){ - parent->channels[channel] = new Channel(parent->context, channel, parent->framemax); - parent->client.getChannel().openOk(channel); -} - -void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} -void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} - -void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, string& /*replyText*/, - u_int16_t /*classId*/, u_int16_t /*methodId*/){ - Channel* c = parent->getChannel(channel); - if(c){ - parent->channels.erase(channel); - c->close(); - delete c; - parent->client.getChannel().closeOk(channel); - } -} - -void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} - - - -void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - FieldTable& /*arguments*/){ - - if(!passive && ( - type != TopicExchange::typeName && - type != DirectExchange::typeName && - type != FanOutExchange::typeName && - type != HeadersExchange::typeName - ) - ) - { - throw ChannelException(540, "Exchange type not implemented: " + type); - } - - parent->exchanges->getLock()->acquire(); - if(!parent->exchanges->get(exchange)){ - if(type == TopicExchange::typeName){ - parent->exchanges->declare(new TopicExchange(exchange)); - }else if(type == DirectExchange::typeName){ - parent->exchanges->declare(new DirectExchange(exchange)); - }else if(type == FanOutExchange::typeName){ - parent->exchanges->declare(new DirectExchange(exchange)); - }else if (type == HeadersExchange::typeName) { - parent->exchanges->declare(new HeadersExchange(exchange)); - } - } - parent->exchanges->getLock()->release(); - if(!nowait){ - parent->client.getExchange().declareOk(channel); - } -} - -void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, bool /*ifUnused*/, bool nowait){ - //TODO: implement unused - parent->exchanges->getLock()->acquire(); - parent->exchanges->destroy(exchange); - parent->exchanges->getLock()->release(); - if(!nowait) parent->client.getExchange().deleteOk(channel); -} - -void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, FieldTable& /*arguments*/){ - Queue::shared_ptr queue; - if (passive && !name.empty()) { - queue = parent->getQueue(name, channel); - } else { - std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0); - queue = queue_created.first; - assert(queue); - if (queue_created.second) { // This is a new queue - parent->getChannel(channel)->setDefaultQueue(queue); - //add default binding: - parent->exchanges->getDefault()->bind(queue, name, 0); - if(exclusive){ - parent->exclusiveQueues.push_back(queue); - } else if(autoDelete){ - parent->cleaner->add(queue); - } - } - } - if(exclusive && !queue->isExclusiveOwner(parent)){ - throw ChannelException(405, "Cannot grant exclusive access to queue"); - } - if(!nowait){ - name = queue->getName(); - parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount()); - } -} - -void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, string& queueName, - string& exchangeName, string& routingKey, bool nowait, - FieldTable& arguments){ - - Queue::shared_ptr queue = parent->getQueue(queueName, channel); - Exchange* exchange = parent->exchanges->get(exchangeName); - if(exchange){ - if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); - exchange->bind(queue, routingKey, &arguments); - if(!nowait) parent->client.getQueue().bindOk(channel); - }else{ - throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); - } -} - -void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, string& queueName, bool nowait){ - - Queue::shared_ptr queue = parent->getQueue(queueName, channel); - int count = queue->purge(); - if(!nowait) parent->client.getQueue().purgeOk(channel, count); -} - -void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ - ChannelException error(0, ""); - int count(0); - Queue::shared_ptr q = parent->getQueue(queue, channel); - if(ifEmpty && q->getMessageCount() > 0){ - throw ChannelException(406, "Queue not empty."); - }else if(ifUnused && q->getConsumerCount() > 0){ - throw ChannelException(406, "Queue in use."); - }else{ - //remove the queue from the list of exclusive queues if necessary - if(q->isExclusiveOwner(parent)){ - queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q); - if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); - } - count = q->getMessageCount(); - parent->queues->destroy(queue); - } - if(!nowait) parent->client.getQueue().deleteOk(channel, count); -} - - - - -void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ - //TODO: handle global - parent->getChannel(channel)->setPrefetchSize(prefetchSize); - parent->getChannel(channel)->setPrefetchCount(prefetchCount); - parent->client.getBasic().qosOk(channel); -} - -void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/, - string& queueName, string& consumerTag, - bool noLocal, bool noAck, bool exclusive, - bool nowait){ - - Queue::shared_ptr queue = parent->getQueue(queueName, channelId); - Channel* channel = parent->channels[channelId]; - if(!consumerTag.empty() && channel->exists(consumerTag)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0); - if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); - } - -} - -void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){ - parent->getChannel(channel)->cancel(consumerTag); - if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag); -} - -void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, - string& exchange, string& routingKey, - bool mandatory, bool immediate){ - - Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate); - parent->getChannel(channel)->handlePublish(msg); -} - -void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){ - Queue::shared_ptr queue = parent->getQueue(queueName, channelId); - if(!parent->getChannel(channelId)->get(queue, !noAck)){ - string clusterId;//not used, part of an imatix hack - parent->client.getBasic().getEmpty(channelId, clusterId); - } -} - -void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ - try{ - parent->getChannel(channel)->ack(deliveryTag, multiple); - }catch(InvalidAckException& e){ - throw ConnectionException(530, "Received ack for unrecognised delivery tag"); - } -} - -void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} - -void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ - parent->getChannel(channel)->recover(requeue); -} - diff --git a/cpp/broker/src/TopicExchange.cpp b/cpp/broker/src/TopicExchange.cpp deleted file mode 100644 index 53977747c4..0000000000 --- a/cpp/broker/src/TopicExchange.cpp +++ /dev/null @@ -1,163 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "TopicExchange.h" -#include "ExchangeBinding.h" -#include <algorithm> - -using namespace qpid::broker; -using namespace qpid::framing; - - -// TODO aconway 2006-09-20: More efficient matching algorithm. -// Areas for improvement: -// - excessive string copying: should be 0 copy, match from original buffer. -// - match/lookup: use descision tree or other more efficient structure. - -Tokens& Tokens::operator=(const std::string& s) { - clear(); - if (s.empty()) return *this; - std::string::const_iterator i = s.begin(); - while (true) { - // Invariant: i is at the beginning of the next untokenized word. - std::string::const_iterator j = find(i, s.end(), '.'); - push_back(std::string(i, j)); - if (j == s.end()) return *this; - i = j + 1; - } - return *this; -} - -size_t Tokens::Hash::operator()(const Tokens& p) const { - size_t hash = 0; - for (Tokens::const_iterator i = p.begin(); i != p.end(); ++i) { - hash += std::tr1::hash<std::string>()(*i); - } - return hash; -} - -TopicPattern& TopicPattern::operator=(const Tokens& tokens) { - Tokens::operator=(tokens); - normalize(); - return *this; -} - -namespace { -const std::string hashmark("#"); -const std::string star("*"); -} - -void TopicPattern::normalize() { - std::string word; - Tokens::iterator i = begin(); - while (i != end()) { - if (*i == hashmark) { - ++i; - while (i != end()) { - // Invariant: *(i-1)==#, [begin()..i-1] is normalized. - if (*i == star) { // Move * before #. - std::swap(*i, *(i-1)); - ++i; - } else if (*i == hashmark) { - erase(i); // Remove extra # - } else { - break; - } - } - } else { - i ++; - } - } -} - - -namespace { -// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. -// Need more efficient Tokens impl that can operate on a string in place. -// -bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) -{ - // Invariant: [pattern_begin..p) matches [target_begin..t) - Tokens::const_iterator p = pattern_begin; - Tokens::const_iterator t = target_begin; - while (p != pattern_end && t != target_end) - { - if (*p == star || *p == *t) { - ++p, ++t; - } else if (*p == hashmark) { - ++p; - if (do_match(p, pattern_end, t, target_end)) return true; - while (t != target_end) { - ++t; - if (do_match(p, pattern_end, t, target_end)) return true; - } - return false; - } else { - return false; - } - } - while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing # - return t == target_end && p == pattern_end; -} -} - -bool TopicPattern::match(const Tokens& target) const -{ - return do_match(begin(), end(), target.begin(), target.end()); -} - -TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { } - -void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - lock.acquire(); - TopicPattern routingPattern(routingKey); - bindings[routingPattern].push_back(queue); - queue->bound(new ExchangeBinding(this, queue, routingKey, args)); - lock.release(); -} - -void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); - BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); - Queue::vector& qv(bi->second); - if (bi == bindings.end()) return; - Queue::vector::iterator q = find(qv.begin(), qv.end(), queue); - if(q == qv.end()) return; - qv.erase(q); - if(qv.empty()) bindings.erase(bi); - lock.release(); -} - - -void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (i->first.match(routingKey)) { - Queue::vector& qv(i->second); - for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){ - (*j)->deliver(msg); - } - } - } - lock.release(); -} - -TopicExchange::~TopicExchange() {} - -const std::string TopicExchange::typeName("topic"); - - |