diff options
author | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
commit | 913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch) | |
tree | 7ea442d6867d0076f1c9ea4f4265664059e7aff5 /cpp/broker/src | |
download | qpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz |
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze
Repository Root: https://etp.108.redhat.com/svn/etp
Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48
Revision: 608
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/src')
-rw-r--r-- | cpp/broker/src/AutoDelete.cpp | 93 | ||||
-rw-r--r-- | cpp/broker/src/Broker.cpp | 92 | ||||
-rw-r--r-- | cpp/broker/src/Channel.cpp | 148 | ||||
-rw-r--r-- | cpp/broker/src/Configuration.cpp | 195 | ||||
-rw-r--r-- | cpp/broker/src/DirectExchange.cpp | 72 | ||||
-rw-r--r-- | cpp/broker/src/ExchangeBinding.cpp | 32 | ||||
-rw-r--r-- | cpp/broker/src/ExchangeRegistry.cpp | 43 | ||||
-rw-r--r-- | cpp/broker/src/FanOutExchange.cpp | 56 | ||||
-rw-r--r-- | cpp/broker/src/Message.cpp | 97 | ||||
-rw-r--r-- | cpp/broker/src/NameGenerator.cpp | 29 | ||||
-rw-r--r-- | cpp/broker/src/Queue.cpp | 148 | ||||
-rw-r--r-- | cpp/broker/src/QueueRegistry.cpp | 72 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerFactoryImpl.cpp | 40 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 378 | ||||
-rw-r--r-- | cpp/broker/src/TopicExchange.cpp | 62 |
15 files changed, 1557 insertions, 0 deletions
diff --git a/cpp/broker/src/AutoDelete.cpp b/cpp/broker/src/AutoDelete.cpp new file mode 100644 index 0000000000..6793ec449d --- /dev/null +++ b/cpp/broker/src/AutoDelete.cpp @@ -0,0 +1,93 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "AutoDelete.h" + +using namespace qpid::broker; + +AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) : registry(_registry), + period(_period), + stopped(true), + runner(0){} + +void AutoDelete::add(Queue::shared_ptr const queue){ + lock.acquire(); + queues.push(queue); + lock.release(); +} + +Queue::shared_ptr const AutoDelete::pop(){ + Queue::shared_ptr next; + lock.acquire(); + if(!queues.empty()){ + next = queues.front(); + queues.pop(); + } + lock.release(); + return next; +} + +void AutoDelete::process(){ + Queue::shared_ptr seen; + for(Queue::shared_ptr q = pop(); q; q = pop()){ + if(seen == q){ + add(q); + break; + }else if(q->canAutoDelete()){ + std::string name(q->getName()); + registry->destroy(name); + std::cout << "INFO: Auto-deleted queue named " << name << std::endl; + }else{ + add(q); + if(!seen) seen = q; + } + } +} + +void AutoDelete::run(){ + monitor.acquire(); + while(!stopped){ + process(); + monitor.wait(period); + } + monitor.release(); +} + +void AutoDelete::start(){ + monitor.acquire(); + if(stopped){ + runner = factory.create(this); + stopped = false; + monitor.release(); + runner->start(); + }else{ + monitor.release(); + } +} + +void AutoDelete::stop(){ + monitor.acquire(); + if(!stopped){ + stopped = true; + monitor.notify(); + monitor.release(); + runner->join(); + delete runner; + }else{ + monitor.release(); + } +} diff --git a/cpp/broker/src/Broker.cpp b/cpp/broker/src/Broker.cpp new file mode 100644 index 0000000000..5d59b63622 --- /dev/null +++ b/cpp/broker/src/Broker.cpp @@ -0,0 +1,92 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include <memory> +#include "apr_signal.h" + +#include "Acceptor.h" +#include "Configuration.h" +#include "QpidError.h" +#include "SessionHandlerFactoryImpl.h" + +//optional includes: +#ifdef _USE_APR_IO_ + +#include "BlockingAPRAcceptor.h" +#include "LFAcceptor.h" + +#endif + +using namespace qpid::broker; +using namespace qpid::io; + +void handle_signal(int signal); + +Acceptor* createAcceptor(Configuration& config); + +int main(int argc, char** argv) +{ + SessionHandlerFactoryImpl factory; + Configuration config; + try{ + + config.parse(argc, argv); + if(config.isHelp()){ + config.usage(); + }else{ +#ifdef _USE_APR_IO_ + apr_signal(SIGINT, handle_signal); +#endif + try{ + std::auto_ptr<Acceptor> acceptor(createAcceptor(config)); + try{ + acceptor->bind(config.getPort(), &factory); + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } + } + }catch(Configuration::ParseException error){ + std::cout << "Error: " << error.error << std::endl; + } + + return 1; +} + +Acceptor* createAcceptor(Configuration& config){ + const string type(config.getAcceptor()); +#ifdef _USE_APR_IO_ + if("blocking" == type){ + std::cout << "Using blocking acceptor " << std::endl; + return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog()); + }else if("non-blocking" == type){ + std::cout << "Using non-blocking acceptor " << std::endl; + return new LFAcceptor(config.isTrace(), + config.getConnectionBacklog(), + config.getWorkerThreads(), + config.getMaxConnections()); + } +#endif + throw Configuration::ParseException("Unrecognised acceptor: " + type); +} + +void handle_signal(int signal){ + std::cout << "Shutting down..." << std::endl; +} diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp new file mode 100644 index 0000000000..6980fe5a1b --- /dev/null +++ b/cpp/broker/src/Channel.cpp @@ -0,0 +1,148 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Channel.h" +#include "QpidError.h" +#include <iostream> +#include <sstream> +#include <assert.h> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::concurrent; + +Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out), + id(_id), + framesize(_framesize), + transactional(false), + deliveryTag(1), + tagGenerator("sgen"){} + +Channel::~Channel(){ + for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ + std::cout << "ERROR: Channel consumer appears not to have been cancelled before channel was destroyed." << std::endl; + delete (i->second); + } +} + +bool Channel::exists(string& consumerTag){ + return consumers.find(consumerTag) != consumers.end(); +} + +void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){ + if(tag.empty()) tag = tagGenerator.generate(); + + ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection)); + try{ + queue->consume(c, exclusive);//may throw exception + consumers[tag] = c; + }catch(ExclusiveAccessException& e){ + delete c; + throw e; + } +} + +void Channel::cancel(string& tag){ + ConsumerImpl* c = consumers[tag]; + if(c){ + c->cancel(); + consumers.erase(tag); + delete c; + } +} + +void Channel::close(){ + //cancel all consumers + for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ + ConsumerImpl* c = i->second; + c->cancel(); + consumers.erase(i); + delete c; + } +} + +void Channel::begin(){ + transactional = true; +} + +void Channel::commit(){ + +} + +void Channel::rollback(){ + +} + +void Channel::deliver(Message::shared_ptr& msg, string& consumerTag){ + //send deliver method, header and content(s) + msg->deliver(out, id, consumerTag, deliveryTag++, framesize); +} + +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag, + Queue::shared_ptr _queue, + ConnectionToken* const _connection) : parent(_parent), + tag(_tag), + queue(_queue), + connection(_connection){ +} + +bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ + if(connection != msg->getPublisher()){ + parent->deliver(msg, tag); + return true; + }else{ + return false; + } +} + +void Channel::ConsumerImpl::cancel(){ + if(queue) queue->cancel(this); +} + +void Channel::handlePublish(Message* msg){ + if(message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); + } + message = Message::shared_ptr(msg); +} + +void Channel::handleHeader(AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges){ + if(!message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish."); + } + message->setHeader(header); + if(message->isComplete()){ + publish(exchanges); + } +} + +void Channel::handleContent(AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges){ + if(!message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish."); + } + message->addContent(content); + if(message->isComplete()){ + publish(exchanges); + } +} + +void Channel::publish(ExchangeRegistry* exchanges){ + if(!route(message, exchanges)){ + std::cout << "WARNING: Could not route message." << std::endl; + } + message.reset(); +} diff --git a/cpp/broker/src/Configuration.cpp b/cpp/broker/src/Configuration.cpp new file mode 100644 index 0000000000..aceb35bc87 --- /dev/null +++ b/cpp/broker/src/Configuration.cpp @@ -0,0 +1,195 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Configuration.h" + +using namespace qpid::broker; +using namespace std; + +Configuration::Configuration() : + trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false), + port('p', "port", "Sets the port to listen on (default=5672)", 5672), + workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5), + maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500), + connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10), + acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"), + help("help", "Prints usage information", false) +{ + options.push_back(&trace); + options.push_back(&port); + options.push_back(&workerThreads); + options.push_back(&maxConnections); + options.push_back(&connectionBacklog); + options.push_back(&acceptor); + options.push_back(&help); +} + +Configuration::~Configuration(){} + +void Configuration::parse(int argc, char** argv){ + int position = 1; + while(position < argc){ + bool matched(false); + for(op_iterator i = options.begin(); i < options.end() && !matched; i++){ + matched = (*i)->parse(position, argv, argc); + } + if(!matched){ + std::cout << "Warning: skipping unrecognised option " << argv[position] << std::endl; + position++; + } + } +} + +void Configuration::usage(){ + for(op_iterator i = options.begin(); i < options.end(); i++){ + (*i)->print(std::cout); + } +} + +bool Configuration::isHelp(){ + return help.getValue(); +} + +bool Configuration::isTrace(){ + return trace.getValue(); +} + +int Configuration::getPort(){ + return port.getValue(); +} + +int Configuration::getWorkerThreads(){ + return workerThreads.getValue(); +} + +int Configuration::getMaxConnections(){ + return maxConnections.getValue(); +} + +int Configuration::getConnectionBacklog(){ + return connectionBacklog.getValue(); +} + +const string& Configuration::getAcceptor(){ + return acceptor.getValue(); +} + +Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : + flag(string("-") + _flag), name("--" +_name), desc(_desc) {} + +Configuration::Option::Option(const string& _name, const string& _desc) : + flag(""), name("--" + _name), desc(_desc) {} + +Configuration::Option::~Option(){} + +bool Configuration::Option::match(const string& arg){ + return flag == arg || name == arg; +} + +bool Configuration::Option::parse(int& i, char** argv, int argc){ + const string arg(argv[i]); + if(match(arg)){ + if(needsValue()){ + if(++i < argc) setValue(argv[i]); + else throw ParseException("Argument " + arg + " requires a value!"); + }else{ + setValue(""); + } + i++; + return true; + }else{ + return false; + } +} + +void Configuration::Option::print(ostream& out) const { + out << " "; + if(flag.length() > 0){ + out << flag << " or "; + } + out << name; + if(needsValue()) out << "<value>"; + out << std::endl; + out << " " << desc << std::endl; +} + + +// String Option: + +Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) : + Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) : + Option(_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::StringOption::~StringOption(){} + +const string& Configuration::StringOption::getValue() const { + return value; +} + +bool Configuration::StringOption::needsValue() const { + return true; +} + +void Configuration::StringOption::setValue(const std::string& _value){ + value = _value; +} + +// Int Option: + +Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) : + Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) : + Option(_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::IntOption::~IntOption(){} + +int Configuration::IntOption::getValue() const { + return value; +} + +bool Configuration::IntOption::needsValue() const { + return true; +} + +void Configuration::IntOption::setValue(const std::string& _value){ + value = atoi(_value.c_str()); +} + +// Bool Option: + +Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) : + Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) : + Option(_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::BoolOption::~BoolOption(){} + +bool Configuration::BoolOption::getValue() const { + return value; +} + +bool Configuration::BoolOption::needsValue() const { + return false; +} + +void Configuration::BoolOption::setValue(const std::string& _value){ + value = true; +} diff --git a/cpp/broker/src/DirectExchange.cpp b/cpp/broker/src/DirectExchange.cpp new file mode 100644 index 0000000000..70f7ee838f --- /dev/null +++ b/cpp/broker/src/DirectExchange.cpp @@ -0,0 +1,72 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "DirectExchange.h" +#include "ExchangeBinding.h" +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::framing; + +DirectExchange::DirectExchange(const string& _name) : name(_name) { + +} + +void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ + lock.acquire(); + std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); + if(i == queues.end()){ + bindings[routingKey].push_back(queue); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); + } + lock.release(); +} + +void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ + lock.acquire(); + std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + + std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); + if(i < queues.end()){ + queues.erase(i); + if(queues.empty()){ + bindings.erase(routingKey); + } + } + lock.release(); +} + +void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){ + lock.acquire(); + std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + int count(0); + for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){ + (*i)->deliver(msg); + } + if(!count){ + std::cout << "WARNING: DirectExchange " << name << " could not route message with key " << routingKey << std::endl; + } + lock.release(); +} + +DirectExchange::~DirectExchange(){ + +} + + +const std::string DirectExchange::typeName("direct"); diff --git a/cpp/broker/src/ExchangeBinding.cpp b/cpp/broker/src/ExchangeBinding.cpp new file mode 100644 index 0000000000..6160a67fd3 --- /dev/null +++ b/cpp/broker/src/ExchangeBinding.cpp @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "ExchangeBinding.h" +#include "Exchange.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){} + +void ExchangeBinding::cancel(){ + e->unbind(q, key, args); + delete this; +} + +ExchangeBinding::~ExchangeBinding(){ +} diff --git a/cpp/broker/src/ExchangeRegistry.cpp b/cpp/broker/src/ExchangeRegistry.cpp new file mode 100644 index 0000000000..0ee581af2f --- /dev/null +++ b/cpp/broker/src/ExchangeRegistry.cpp @@ -0,0 +1,43 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "ExchangeRegistry.h" +#include "MonitorImpl.h" + +using namespace qpid::broker; +using namespace qpid::concurrent; + +ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){} + +ExchangeRegistry::~ExchangeRegistry(){ + delete lock; +} + +void ExchangeRegistry::declare(Exchange* exchange){ + exchanges[exchange->getName()] = exchange; +} + +void ExchangeRegistry::destroy(const string& name){ + if(exchanges[name]){ + delete exchanges[name]; + exchanges.erase(name); + } +} + +Exchange* ExchangeRegistry::get(const string& name){ + return exchanges[name]; +} diff --git a/cpp/broker/src/FanOutExchange.cpp b/cpp/broker/src/FanOutExchange.cpp new file mode 100644 index 0000000000..7f261d5eda --- /dev/null +++ b/cpp/broker/src/FanOutExchange.cpp @@ -0,0 +1,56 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "FanOutExchange.h" +#include "ExchangeBinding.h" +#include <algorithm> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::concurrent; + +FanOutExchange::FanOutExchange(const string& _name) : name(_name) {} + +void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ + Locker locker(lock); + // Add if not already present. + Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); + if (i == bindings.end()) { + bindings.push_back(queue); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); + } +} + +void FanOutExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ + Locker locker(lock); + Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); + if (i != bindings.end()) { + bindings.erase(i); + // TODO aconway 2006-09-14: What about the ExchangeBinding object? Don't we have to verify routingKey/args match? + } +} + +void FanOutExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){ + Locker locker(lock); + for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ + (*i)->deliver(msg); + } +} + +FanOutExchange::~FanOutExchange() {} + +const std::string FanOutExchange::typeName("fanout"); diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp new file mode 100644 index 0000000000..7afcd97934 --- /dev/null +++ b/cpp/broker/src/Message.cpp @@ -0,0 +1,97 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "MonitorImpl.h" +#include "Message.h" +#include "ExchangeRegistry.h" +#include <iostream> + +using namespace std::tr1;//for *_pointer_cast methods +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::concurrent; + + +Message::Message(const ConnectionToken* const _publisher, + const string& _exchange, const string& _routingKey, + bool _mandatory, bool _immediate) : publisher(_publisher), + exchange(_exchange), + routingKey(_routingKey), + mandatory(_mandatory), + immediate(_immediate){ + +} + +Message::~Message(){ +} + +void Message::setHeader(AMQHeaderBody::shared_ptr header){ + this->header = header; +} + +void Message::addContent(AMQContentBody::shared_ptr data){ + content.push_back(data); +} + +bool Message::isComplete(){ + return header.get() && (header->getContentSize() == contentSize()); +} + +void Message::deliver(OutputHandler* out, int channel, + string& consumerTag, u_int64_t deliveryTag, + u_int32_t framesize){ + + out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, false, exchange, routingKey))); + AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); + out->send(new AMQFrame(channel, headerBody)); + for(content_iterator i = content.begin(); i != content.end(); i++){ + if((*i)->size() > framesize){ + //TODO: need to split it + std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl; + }else{ + AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i); + out->send(new AMQFrame(channel, contentBody)); + } + } +} + +BasicHeaderProperties* Message::getHeaderProperties(){ + return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); +} + +u_int64_t Message::contentSize(){ + u_int64_t size(0); + for(content_iterator i = content.begin(); i != content.end(); i++){ + size += (*i)->size(); + } + return size; +} + +const ConnectionToken* const Message::getPublisher(){ + return publisher; +} + +bool qpid::broker::route(Message::shared_ptr& msg, ExchangeRegistry* registry){ + Exchange* exchange = registry->get(msg->exchange); + if(exchange){ + exchange->route(msg, msg->routingKey, &(msg->getHeaderProperties()->getHeaders())); + return true; + }else{ + return false; + } +} + diff --git a/cpp/broker/src/NameGenerator.cpp b/cpp/broker/src/NameGenerator.cpp new file mode 100644 index 0000000000..46aa385a7e --- /dev/null +++ b/cpp/broker/src/NameGenerator.cpp @@ -0,0 +1,29 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "NameGenerator.h" +#include <sstream> + +using namespace qpid::broker; + +NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {} + +std::string NameGenerator::generate(){ + std::stringstream ss; + ss << base << counter++; + return ss.str(); +} diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp new file mode 100644 index 0000000000..f7b8605b03 --- /dev/null +++ b/cpp/broker/src/Queue.cpp @@ -0,0 +1,148 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Queue.h" +#include "MonitorImpl.h" +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::concurrent; + +Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) : name(_name), + durable(_durable), + autodelete(_autodelete), + owner(_owner), + queueing(false), + dispatching(false), + next(0), + lastUsed(0), + exclusive(0){ + + if(autodelete) lastUsed = apr_time_as_msec(apr_time_now()); +} + +Queue::~Queue(){ + for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){ + b->cancel(); + bindings.pop(); + } +} + +void Queue::bound(Binding* b){ + bindings.push(b); +} + +void Queue::deliver(Message::shared_ptr& msg){ + Locker locker(lock); + if(queueing || !dispatch(msg)){ + queueing = true; + messages.push(msg); + } +} + +bool Queue::dispatch(Message::shared_ptr& msg){ + if(consumers.empty()){ + return false; + }else if(exclusive){ + if(!exclusive->deliver(msg)){ + std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl; + } + return true; + }else{ + //deliver to next consumer + next = next % consumers.size(); + Consumer* c = consumers[next]; + int start = next; + while(c){ + next++; + if(c->deliver(msg)) return true; + + next = next % consumers.size(); + c = next == start ? 0 : consumers[next]; + } + return false; + } +} + +bool Queue::startDispatching(){ + Locker locker(lock); + if(queueing && !dispatching){ + dispatching = true; + return true; + }else{ + return false; + } +} + +void Queue::dispatch(){ + bool proceed = startDispatching(); + while(proceed){ + Locker locker(lock); + if(!messages.empty() && dispatch(messages.front())){ + messages.pop(); + }else{ + dispatching = false; + proceed = false; + queueing = !messages.empty(); + } + } +} + +void Queue::consume(Consumer* c, bool requestExclusive){ + Locker locker(lock); + if(exclusive) throw ExclusiveAccessException(); + if(requestExclusive){ + if(!consumers.empty()) throw ExclusiveAccessException(); + exclusive = c; + } + + if(autodelete && consumers.empty()) lastUsed = 0; + consumers.push_back(c); +} + +void Queue::cancel(Consumer* c){ + Locker locker(lock); + consumers.erase(find(consumers.begin(), consumers.end(), c)); + if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now()); + if(exclusive == c) exclusive = 0; +} + +Message::shared_ptr Queue::dequeue(){ + +} + +u_int32_t Queue::purge(){ + Locker locker(lock); + int count = messages.size(); + while(!messages.empty()) messages.pop(); + return count; +} + +u_int32_t Queue::getMessageCount() const{ + Locker locker(lock); + return messages.size(); +} + +u_int32_t Queue::getConsumerCount() const{ + Locker locker(lock); + return consumers.size(); +} + +bool Queue::canAutoDelete() const{ + Locker locker(lock); + return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete); +} diff --git a/cpp/broker/src/QueueRegistry.cpp b/cpp/broker/src/QueueRegistry.cpp new file mode 100644 index 0000000000..f807415314 --- /dev/null +++ b/cpp/broker/src/QueueRegistry.cpp @@ -0,0 +1,72 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "QueueRegistry.h" +#include "MonitorImpl.h" +#include "SessionHandlerImpl.h" +#include <sstream> +#include <assert.h> + +using namespace qpid::broker; +using namespace qpid::concurrent; + +QueueRegistry::QueueRegistry() : counter(1){} + +QueueRegistry::~QueueRegistry(){} + +std::pair<Queue::shared_ptr, bool> +QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner) +{ + Locker locker(lock); + string name = declareName.empty() ? generateName() : declareName; + assert(!name.empty()); + QueueMap::iterator i = queues.find(name); + if (i == queues.end()) { + Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner)); + queues[name] = queue; + return std::pair<Queue::shared_ptr, bool>(queue, true); + } else { + return std::pair<Queue::shared_ptr, bool>(i->second, false); + } +} + +void QueueRegistry::destroy(const string& name){ + Locker locker(lock); + queues.erase(name); +} + +Queue::shared_ptr QueueRegistry::find(const string& name){ + Locker locker(lock); + QueueMap::iterator i = queues.find(name); + if (i == queues.end()) { + return Queue::shared_ptr(); + } else { + return i->second; + } +} + +string QueueRegistry::generateName(){ + string name; + do { + std::stringstream ss; + ss << "tmp_" << counter++; + name = ss.str(); + // Thread safety: Private function, only called with lock held + // so this is OK. + } while(queues.find(name) != queues.end()); + return name; +} diff --git a/cpp/broker/src/SessionHandlerFactoryImpl.cpp b/cpp/broker/src/SessionHandlerFactoryImpl.cpp new file mode 100644 index 0000000000..661cb4ef81 --- /dev/null +++ b/cpp/broker/src/SessionHandlerFactoryImpl.cpp @@ -0,0 +1,40 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "SessionHandlerFactoryImpl.h" +#include "SessionHandlerImpl.h" +#include "FanOutExchange.h" + +using namespace qpid::broker; +using namespace qpid::io; + +SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){ + exchanges.declare(new DirectExchange("amq.direct")); + exchanges.declare(new TopicExchange("amq.topic")); + exchanges.declare(new FanOutExchange("amq.fanout")); + cleaner.start(); +} + +SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){ + return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout); +} + +SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){ + cleaner.stop(); + exchanges.destroy("amq.direct"); + exchanges.destroy("amq.topic"); +} diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp new file mode 100644 index 0000000000..19e243a01b --- /dev/null +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -0,0 +1,378 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "SessionHandlerImpl.h" +#include "FanOutExchange.h" +#include "assert.h" + +using namespace std::tr1; +using namespace qpid::broker; +using namespace qpid::io; +using namespace qpid::framing; +using namespace qpid::concurrent; + +SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, + QueueRegistry* _queues, + ExchangeRegistry* _exchanges, + AutoDelete* _cleaner, + const u_int32_t _timeout) : context(_context), + queues(_queues), + exchanges(_exchanges), + cleaner(_cleaner), + timeout(_timeout), + channelHandler(new ChannelHandlerImpl(this)), + connectionHandler(new ConnectionHandlerImpl(this)), + basicHandler(new BasicHandlerImpl(this)), + exchangeHandler(new ExchangeHandlerImpl(this)), + queueHandler(new QueueHandlerImpl(this)), + framemax(65536), + heartbeat(0){ + +} + +SessionHandlerImpl::~SessionHandlerImpl(){ + // TODO aconway 2006-09-07: Should be auto_ptr or plain members. + delete channelHandler; + delete connectionHandler; + delete basicHandler; + delete exchangeHandler; + delete queueHandler; +} + +Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){ + Queue::shared_ptr queue; + if (name.empty()) { + queue = channels[channel]->getDefaultQueue(); + if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); + } else { + queue = queues->find(name); + if (queue == 0) { + throw ChannelException( 404, "Queue not found: " + name); + } + } + return queue; +} + + +Exchange* SessionHandlerImpl::findExchange(const string& name){ + exchanges->getLock()->acquire(); + Exchange* exchange(exchanges->get(name)); + exchanges->getLock()->release(); + return exchange; +} + +void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ + u_int16_t channel = frame->getChannel(); + AMQBody::shared_ptr body = frame->getBody(); + AMQMethodBody::shared_ptr method; + + switch(body->type()) + { + case METHOD_BODY: + method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body); + try{ + method->invoke(*this, channel); + }catch(ChannelException& e){ + channels[channel]->close(); + channels.erase(channel); + context->send(new AMQFrame(channel, new ChannelCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId()))); + }catch(ConnectionException& e){ + context->send(new AMQFrame(0, new ConnectionCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId()))); + } + break; + + case HEADER_BODY: + this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body)); + break; + + case CONTENT_BODY: + this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body)); + break; + + case HEARTBEAT_BODY: + //channel must be 0 + this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body)); + break; + } +} + +void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ + //send connection start + FieldTable properties; + string mechanisms("PLAIN"); + string locales("en_US"); + context->send(new AMQFrame(0, new ConnectionStartBody(8, 0, properties, mechanisms, locales))); +} + +void SessionHandlerImpl::idleOut(){ + +} + +void SessionHandlerImpl::idleIn(){ + +} + +void SessionHandlerImpl::closed(){ + for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){ + Channel* c = i->second; + channels.erase(i); + c->close(); + delete c; + } + for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ + string name = (*i)->getName(); + queues->destroy(name); + exclusiveQueues.erase(i); + } +} + +void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ + channels[channel]->handleHeader(body, exchanges); +} + +void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ + channels[channel]->handleContent(body, exchanges); +} + +void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ + std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl; +} + +void SessionHandlerImpl::ConnectionHandlerImpl::startOk(u_int16_t channel, FieldTable& clientProperties, string& mechanism, + string& response, string& locale){ + + parent->context->send(new AMQFrame(0, new ConnectionTuneBody(100, parent->framemax, parent->heartbeat))); +} + +void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t channel, string& response){} + +void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t channel, u_int16_t channelmax, u_int32_t framemax, u_int16_t heartbeat){ + parent->framemax = framemax; + parent->heartbeat = heartbeat; +} + +void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist){ + string knownhosts; + parent->context->send(new AMQFrame(0, new ConnectionOpenOkBody(knownhosts))); +} + +void SessionHandlerImpl::ConnectionHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText, + u_int16_t classId, u_int16_t methodId){ + + parent->context->send(new AMQFrame(0, new ConnectionCloseOkBody())); + parent->context->close(); +} + +void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t channel){ + parent->context->close(); +} + + + +void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& outOfBand){ + parent->channels[channel] = new Channel(parent->context, channel, parent->framemax); + parent->context->send(new AMQFrame(channel, new ChannelOpenOkBody())); +} + +void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){} +void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t channel, bool active){} + +void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText, + u_int16_t classId, u_int16_t methodId){ + Channel* c = parent->channels[channel]; + parent->channels.erase(channel); + c->close(); + delete c; + parent->context->send(new AMQFrame(channel, new ChannelCloseOkBody())); +} + +void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){} + + + +void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type, + bool passive, bool durable, bool autoDelete, bool internal, bool nowait, + FieldTable& arguments){ + + if(!passive && ( + type != TopicExchange::typeName && + type != DirectExchange::typeName && + type != FanOutExchange::typeName) + ) + { + throw ChannelException(540, "Exchange type not implemented: " + type); + } + + parent->exchanges->getLock()->acquire(); + if(!parent->exchanges->get(exchange)){ + if(type == TopicExchange::typeName){ + parent->exchanges->declare(new TopicExchange(exchange)); + }else if(type == DirectExchange::typeName){ + parent->exchanges->declare(new DirectExchange(exchange)); + }else if(type == FanOutExchange::typeName){ + parent->exchanges->declare(new DirectExchange(exchange)); + } + } + parent->exchanges->getLock()->release(); + if(!nowait){ + parent->context->send(new AMQFrame(channel, new ExchangeDeclareOkBody())); + } +} + +void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait){ + //TODO: implement unused + parent->exchanges->getLock()->acquire(); + parent->exchanges->destroy(exchange); + parent->exchanges->getLock()->release(); + if(!nowait) parent->context->send(new AMQFrame(channel, new ExchangeDeleteOkBody())); +} + + + + +void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, FieldTable& arguments){ + Queue::shared_ptr queue; + if (passive && !name.empty()) { + queue = parent->getQueue(name, channel); + } else { + std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0); + queue = queue_created.first; + assert(queue); + if (queue_created.second) { // This is a new queue + parent->channels[channel]->setDefaultQueue(queue); + //add default binding: + parent->exchanges->get("amq.direct")->bind(queue, name, 0); + if(exclusive){ + parent->exclusiveQueues.push_back(queue); + } else if(autoDelete){ + parent->cleaner->add(queue); + } + } + } + if(exclusive && !queue->isExclusiveOwner(parent)){ + throw ChannelException(405, "Cannot grant exclusive access to queue"); + } + if(!nowait){ + name = queue->getName(); + QueueDeclareOkBody* response = new QueueDeclareOkBody(name, queue->getMessageCount(), queue->getConsumerCount()); + parent->context->send(new AMQFrame(channel, response)); + } +} + +void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t ticket, string& queueName, + string& exchangeName, string& routingKey, bool nowait, + FieldTable& arguments){ + + Queue::shared_ptr queue = parent->getQueue(queueName, channel); + Exchange* exchange = parent->exchanges->get(exchangeName); + if(exchange){ + if(routingKey.size() == 0 && queueName.size() == 0) routingKey = queue->getName(); + exchange->bind(queue, routingKey, &arguments); + if(!nowait) parent->context->send(new AMQFrame(channel, new QueueBindOkBody())); + }else{ + throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); + } +} + +void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t ticket, string& queueName, bool nowait){ + + Queue::shared_ptr queue = parent->getQueue(queueName, channel); + int count = queue->purge(); + if(!nowait) parent->context->send(new AMQFrame(channel, new QueuePurgeOkBody(count))); +} + +void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ + ChannelException error(0, ""); + int count(0); + Queue::shared_ptr q = parent->getQueue(queue, channel); + if(ifEmpty && q->getMessageCount() > 0){ + throw ChannelException(406, "Queue not empty."); + }else if(ifUnused && q->getConsumerCount() > 0){ + throw ChannelException(406, "Queue in use."); + }else{ + //remove the queue from the list of exclusive queues if necessary + if(q->isExclusiveOwner(parent)){ + queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q); + if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); + } + count = q->getMessageCount(); + parent->queues->destroy(queue); + } + if(!nowait) parent->context->send(new AMQFrame(channel, new QueueDeleteOkBody(count))); +} + + + + +void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global){ + //TODO: handle global + //TODO: channel doesn't do anything with these qos parameters yet + parent->channels[channel]->setPrefetchSize(prefetchSize); + parent->channels[channel]->setPrefetchCount(prefetchCount); + parent->context->send(new AMQFrame(channel, new BasicQosOkBody())); +} + +void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t ticket, + string& queueName, string& consumerTag, + bool noLocal, bool noAck, bool exclusive, + bool nowait){ + + //TODO: implement nolocal + Queue::shared_ptr queue = parent->getQueue(queueName, channelId); + Channel* channel = parent->channels[channelId]; + if(!consumerTag.empty() && channel->exists(consumerTag)){ + throw ConnectionException(530, "Consumer tags must be unique"); + } + + try{ + channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0); + if(!nowait) parent->context->send(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag))); + + //allow messages to be dispatched if required as there is now a consumer: + queue->dispatch(); + }catch(ExclusiveAccessException& e){ + if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); + else throw ChannelException(403, "Access would violate previously granted exclusivity"); + } + +} + +void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){ + parent->channels[channel]->cancel(consumerTag); + if(!nowait) parent->context->send(new AMQFrame(channel, new BasicCancelOkBody(consumerTag))); +} + +void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t ticket, + string& exchange, string& routingKey, + bool mandatory, bool immediate){ + + Message* msg = new Message(parent, exchange.length() ? exchange : "amq.direct", routingKey, mandatory, immediate); + parent->channels[channel]->handlePublish(msg); +} + +void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){} + +void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){} + +void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue){} + +void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){} + diff --git a/cpp/broker/src/TopicExchange.cpp b/cpp/broker/src/TopicExchange.cpp new file mode 100644 index 0000000000..e0248958f9 --- /dev/null +++ b/cpp/broker/src/TopicExchange.cpp @@ -0,0 +1,62 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "TopicExchange.h" +#include "ExchangeBinding.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +TopicExchange::TopicExchange(const string& _name) : name(_name) { + +} + +void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ + lock.acquire(); + bindings[routingKey].push_back(queue); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); + lock.release(); +} + +void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ + lock.acquire(); + std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + + std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); + if(i < queues.end()){ + queues.erase(i); + if(queues.empty()){ + bindings.erase(routingKey); + } + } + lock.release(); +} + +void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){ + lock.acquire(); + std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++){ + (*i)->deliver(msg); + } + lock.release(); +} + +TopicExchange::~TopicExchange(){ + +} + +const std::string TopicExchange::typeName("topic"); |