summaryrefslogtreecommitdiff
path: root/cpp/broker/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
committerRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
commit913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch)
tree7ea442d6867d0076f1c9ea4f4265664059e7aff5 /cpp/broker/src
downloadqpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze Repository Root: https://etp.108.redhat.com/svn/etp Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48 Revision: 608 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/src')
-rw-r--r--cpp/broker/src/AutoDelete.cpp93
-rw-r--r--cpp/broker/src/Broker.cpp92
-rw-r--r--cpp/broker/src/Channel.cpp148
-rw-r--r--cpp/broker/src/Configuration.cpp195
-rw-r--r--cpp/broker/src/DirectExchange.cpp72
-rw-r--r--cpp/broker/src/ExchangeBinding.cpp32
-rw-r--r--cpp/broker/src/ExchangeRegistry.cpp43
-rw-r--r--cpp/broker/src/FanOutExchange.cpp56
-rw-r--r--cpp/broker/src/Message.cpp97
-rw-r--r--cpp/broker/src/NameGenerator.cpp29
-rw-r--r--cpp/broker/src/Queue.cpp148
-rw-r--r--cpp/broker/src/QueueRegistry.cpp72
-rw-r--r--cpp/broker/src/SessionHandlerFactoryImpl.cpp40
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp378
-rw-r--r--cpp/broker/src/TopicExchange.cpp62
15 files changed, 1557 insertions, 0 deletions
diff --git a/cpp/broker/src/AutoDelete.cpp b/cpp/broker/src/AutoDelete.cpp
new file mode 100644
index 0000000000..6793ec449d
--- /dev/null
+++ b/cpp/broker/src/AutoDelete.cpp
@@ -0,0 +1,93 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "AutoDelete.h"
+
+using namespace qpid::broker;
+
+AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) : registry(_registry),
+ period(_period),
+ stopped(true),
+ runner(0){}
+
+void AutoDelete::add(Queue::shared_ptr const queue){
+ lock.acquire();
+ queues.push(queue);
+ lock.release();
+}
+
+Queue::shared_ptr const AutoDelete::pop(){
+ Queue::shared_ptr next;
+ lock.acquire();
+ if(!queues.empty()){
+ next = queues.front();
+ queues.pop();
+ }
+ lock.release();
+ return next;
+}
+
+void AutoDelete::process(){
+ Queue::shared_ptr seen;
+ for(Queue::shared_ptr q = pop(); q; q = pop()){
+ if(seen == q){
+ add(q);
+ break;
+ }else if(q->canAutoDelete()){
+ std::string name(q->getName());
+ registry->destroy(name);
+ std::cout << "INFO: Auto-deleted queue named " << name << std::endl;
+ }else{
+ add(q);
+ if(!seen) seen = q;
+ }
+ }
+}
+
+void AutoDelete::run(){
+ monitor.acquire();
+ while(!stopped){
+ process();
+ monitor.wait(period);
+ }
+ monitor.release();
+}
+
+void AutoDelete::start(){
+ monitor.acquire();
+ if(stopped){
+ runner = factory.create(this);
+ stopped = false;
+ monitor.release();
+ runner->start();
+ }else{
+ monitor.release();
+ }
+}
+
+void AutoDelete::stop(){
+ monitor.acquire();
+ if(!stopped){
+ stopped = true;
+ monitor.notify();
+ monitor.release();
+ runner->join();
+ delete runner;
+ }else{
+ monitor.release();
+ }
+}
diff --git a/cpp/broker/src/Broker.cpp b/cpp/broker/src/Broker.cpp
new file mode 100644
index 0000000000..5d59b63622
--- /dev/null
+++ b/cpp/broker/src/Broker.cpp
@@ -0,0 +1,92 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include <memory>
+#include "apr_signal.h"
+
+#include "Acceptor.h"
+#include "Configuration.h"
+#include "QpidError.h"
+#include "SessionHandlerFactoryImpl.h"
+
+//optional includes:
+#ifdef _USE_APR_IO_
+
+#include "BlockingAPRAcceptor.h"
+#include "LFAcceptor.h"
+
+#endif
+
+using namespace qpid::broker;
+using namespace qpid::io;
+
+void handle_signal(int signal);
+
+Acceptor* createAcceptor(Configuration& config);
+
+int main(int argc, char** argv)
+{
+ SessionHandlerFactoryImpl factory;
+ Configuration config;
+ try{
+
+ config.parse(argc, argv);
+ if(config.isHelp()){
+ config.usage();
+ }else{
+#ifdef _USE_APR_IO_
+ apr_signal(SIGINT, handle_signal);
+#endif
+ try{
+ std::auto_ptr<Acceptor> acceptor(createAcceptor(config));
+ try{
+ acceptor->bind(config.getPort(), &factory);
+ }catch(qpid::QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+ }catch(qpid::QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+ }
+ }catch(Configuration::ParseException error){
+ std::cout << "Error: " << error.error << std::endl;
+ }
+
+ return 1;
+}
+
+Acceptor* createAcceptor(Configuration& config){
+ const string type(config.getAcceptor());
+#ifdef _USE_APR_IO_
+ if("blocking" == type){
+ std::cout << "Using blocking acceptor " << std::endl;
+ return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog());
+ }else if("non-blocking" == type){
+ std::cout << "Using non-blocking acceptor " << std::endl;
+ return new LFAcceptor(config.isTrace(),
+ config.getConnectionBacklog(),
+ config.getWorkerThreads(),
+ config.getMaxConnections());
+ }
+#endif
+ throw Configuration::ParseException("Unrecognised acceptor: " + type);
+}
+
+void handle_signal(int signal){
+ std::cout << "Shutting down..." << std::endl;
+}
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
new file mode 100644
index 0000000000..6980fe5a1b
--- /dev/null
+++ b/cpp/broker/src/Channel.cpp
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Channel.h"
+#include "QpidError.h"
+#include <iostream>
+#include <sstream>
+#include <assert.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out),
+ id(_id),
+ framesize(_framesize),
+ transactional(false),
+ deliveryTag(1),
+ tagGenerator("sgen"){}
+
+Channel::~Channel(){
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
+ std::cout << "ERROR: Channel consumer appears not to have been cancelled before channel was destroyed." << std::endl;
+ delete (i->second);
+ }
+}
+
+bool Channel::exists(string& consumerTag){
+ return consumers.find(consumerTag) != consumers.end();
+}
+
+void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){
+ if(tag.empty()) tag = tagGenerator.generate();
+
+ ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection));
+ try{
+ queue->consume(c, exclusive);//may throw exception
+ consumers[tag] = c;
+ }catch(ExclusiveAccessException& e){
+ delete c;
+ throw e;
+ }
+}
+
+void Channel::cancel(string& tag){
+ ConsumerImpl* c = consumers[tag];
+ if(c){
+ c->cancel();
+ consumers.erase(tag);
+ delete c;
+ }
+}
+
+void Channel::close(){
+ //cancel all consumers
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
+ ConsumerImpl* c = i->second;
+ c->cancel();
+ consumers.erase(i);
+ delete c;
+ }
+}
+
+void Channel::begin(){
+ transactional = true;
+}
+
+void Channel::commit(){
+
+}
+
+void Channel::rollback(){
+
+}
+
+void Channel::deliver(Message::shared_ptr& msg, string& consumerTag){
+ //send deliver method, header and content(s)
+ msg->deliver(out, id, consumerTag, deliveryTag++, framesize);
+}
+
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag,
+ Queue::shared_ptr _queue,
+ ConnectionToken* const _connection) : parent(_parent),
+ tag(_tag),
+ queue(_queue),
+ connection(_connection){
+}
+
+bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
+ if(connection != msg->getPublisher()){
+ parent->deliver(msg, tag);
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Channel::ConsumerImpl::cancel(){
+ if(queue) queue->cancel(this);
+}
+
+void Channel::handlePublish(Message* msg){
+ if(message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+ }
+ message = Message::shared_ptr(msg);
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges){
+ if(!message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
+ }
+ message->setHeader(header);
+ if(message->isComplete()){
+ publish(exchanges);
+ }
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges){
+ if(!message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
+ }
+ message->addContent(content);
+ if(message->isComplete()){
+ publish(exchanges);
+ }
+}
+
+void Channel::publish(ExchangeRegistry* exchanges){
+ if(!route(message, exchanges)){
+ std::cout << "WARNING: Could not route message." << std::endl;
+ }
+ message.reset();
+}
diff --git a/cpp/broker/src/Configuration.cpp b/cpp/broker/src/Configuration.cpp
new file mode 100644
index 0000000000..aceb35bc87
--- /dev/null
+++ b/cpp/broker/src/Configuration.cpp
@@ -0,0 +1,195 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Configuration.h"
+
+using namespace qpid::broker;
+using namespace std;
+
+Configuration::Configuration() :
+ trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false),
+ port('p', "port", "Sets the port to listen on (default=5672)", 5672),
+ workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5),
+ maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500),
+ connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10),
+ acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"),
+ help("help", "Prints usage information", false)
+{
+ options.push_back(&trace);
+ options.push_back(&port);
+ options.push_back(&workerThreads);
+ options.push_back(&maxConnections);
+ options.push_back(&connectionBacklog);
+ options.push_back(&acceptor);
+ options.push_back(&help);
+}
+
+Configuration::~Configuration(){}
+
+void Configuration::parse(int argc, char** argv){
+ int position = 1;
+ while(position < argc){
+ bool matched(false);
+ for(op_iterator i = options.begin(); i < options.end() && !matched; i++){
+ matched = (*i)->parse(position, argv, argc);
+ }
+ if(!matched){
+ std::cout << "Warning: skipping unrecognised option " << argv[position] << std::endl;
+ position++;
+ }
+ }
+}
+
+void Configuration::usage(){
+ for(op_iterator i = options.begin(); i < options.end(); i++){
+ (*i)->print(std::cout);
+ }
+}
+
+bool Configuration::isHelp(){
+ return help.getValue();
+}
+
+bool Configuration::isTrace(){
+ return trace.getValue();
+}
+
+int Configuration::getPort(){
+ return port.getValue();
+}
+
+int Configuration::getWorkerThreads(){
+ return workerThreads.getValue();
+}
+
+int Configuration::getMaxConnections(){
+ return maxConnections.getValue();
+}
+
+int Configuration::getConnectionBacklog(){
+ return connectionBacklog.getValue();
+}
+
+const string& Configuration::getAcceptor(){
+ return acceptor.getValue();
+}
+
+Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
+ flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
+
+Configuration::Option::Option(const string& _name, const string& _desc) :
+ flag(""), name("--" + _name), desc(_desc) {}
+
+Configuration::Option::~Option(){}
+
+bool Configuration::Option::match(const string& arg){
+ return flag == arg || name == arg;
+}
+
+bool Configuration::Option::parse(int& i, char** argv, int argc){
+ const string arg(argv[i]);
+ if(match(arg)){
+ if(needsValue()){
+ if(++i < argc) setValue(argv[i]);
+ else throw ParseException("Argument " + arg + " requires a value!");
+ }else{
+ setValue("");
+ }
+ i++;
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Configuration::Option::print(ostream& out) const {
+ out << " ";
+ if(flag.length() > 0){
+ out << flag << " or ";
+ }
+ out << name;
+ if(needsValue()) out << "<value>";
+ out << std::endl;
+ out << " " << desc << std::endl;
+}
+
+
+// String Option:
+
+Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::StringOption::~StringOption(){}
+
+const string& Configuration::StringOption::getValue() const {
+ return value;
+}
+
+bool Configuration::StringOption::needsValue() const {
+ return true;
+}
+
+void Configuration::StringOption::setValue(const std::string& _value){
+ value = _value;
+}
+
+// Int Option:
+
+Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::IntOption::~IntOption(){}
+
+int Configuration::IntOption::getValue() const {
+ return value;
+}
+
+bool Configuration::IntOption::needsValue() const {
+ return true;
+}
+
+void Configuration::IntOption::setValue(const std::string& _value){
+ value = atoi(_value.c_str());
+}
+
+// Bool Option:
+
+Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::BoolOption::~BoolOption(){}
+
+bool Configuration::BoolOption::getValue() const {
+ return value;
+}
+
+bool Configuration::BoolOption::needsValue() const {
+ return false;
+}
+
+void Configuration::BoolOption::setValue(const std::string& _value){
+ value = true;
+}
diff --git a/cpp/broker/src/DirectExchange.cpp b/cpp/broker/src/DirectExchange.cpp
new file mode 100644
index 0000000000..70f7ee838f
--- /dev/null
+++ b/cpp/broker/src/DirectExchange.cpp
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "DirectExchange.h"
+#include "ExchangeBinding.h"
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+DirectExchange::DirectExchange(const string& _name) : name(_name) {
+
+}
+
+void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ if(i == queues.end()){
+ bindings[routingKey].push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ }
+ lock.release();
+}
+
+void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+
+ std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ if(i < queues.end()){
+ queues.erase(i);
+ if(queues.empty()){
+ bindings.erase(routingKey);
+ }
+ }
+ lock.release();
+}
+
+void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ int count(0);
+ for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
+ (*i)->deliver(msg);
+ }
+ if(!count){
+ std::cout << "WARNING: DirectExchange " << name << " could not route message with key " << routingKey << std::endl;
+ }
+ lock.release();
+}
+
+DirectExchange::~DirectExchange(){
+
+}
+
+
+const std::string DirectExchange::typeName("direct");
diff --git a/cpp/broker/src/ExchangeBinding.cpp b/cpp/broker/src/ExchangeBinding.cpp
new file mode 100644
index 0000000000..6160a67fd3
--- /dev/null
+++ b/cpp/broker/src/ExchangeBinding.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "ExchangeBinding.h"
+#include "Exchange.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){}
+
+void ExchangeBinding::cancel(){
+ e->unbind(q, key, args);
+ delete this;
+}
+
+ExchangeBinding::~ExchangeBinding(){
+}
diff --git a/cpp/broker/src/ExchangeRegistry.cpp b/cpp/broker/src/ExchangeRegistry.cpp
new file mode 100644
index 0000000000..0ee581af2f
--- /dev/null
+++ b/cpp/broker/src/ExchangeRegistry.cpp
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "ExchangeRegistry.h"
+#include "MonitorImpl.h"
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){}
+
+ExchangeRegistry::~ExchangeRegistry(){
+ delete lock;
+}
+
+void ExchangeRegistry::declare(Exchange* exchange){
+ exchanges[exchange->getName()] = exchange;
+}
+
+void ExchangeRegistry::destroy(const string& name){
+ if(exchanges[name]){
+ delete exchanges[name];
+ exchanges.erase(name);
+ }
+}
+
+Exchange* ExchangeRegistry::get(const string& name){
+ return exchanges[name];
+}
diff --git a/cpp/broker/src/FanOutExchange.cpp b/cpp/broker/src/FanOutExchange.cpp
new file mode 100644
index 0000000000..7f261d5eda
--- /dev/null
+++ b/cpp/broker/src/FanOutExchange.cpp
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "FanOutExchange.h"
+#include "ExchangeBinding.h"
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+FanOutExchange::FanOutExchange(const string& _name) : name(_name) {}
+
+void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ Locker locker(lock);
+ // Add if not already present.
+ Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ if (i == bindings.end()) {
+ bindings.push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ }
+}
+
+void FanOutExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ Locker locker(lock);
+ Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ if (i != bindings.end()) {
+ bindings.erase(i);
+ // TODO aconway 2006-09-14: What about the ExchangeBinding object? Don't we have to verify routingKey/args match?
+ }
+}
+
+void FanOutExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
+ Locker locker(lock);
+ for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
+ (*i)->deliver(msg);
+ }
+}
+
+FanOutExchange::~FanOutExchange() {}
+
+const std::string FanOutExchange::typeName("fanout");
diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp
new file mode 100644
index 0000000000..7afcd97934
--- /dev/null
+++ b/cpp/broker/src/Message.cpp
@@ -0,0 +1,97 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "MonitorImpl.h"
+#include "Message.h"
+#include "ExchangeRegistry.h"
+#include <iostream>
+
+using namespace std::tr1;//for *_pointer_cast methods
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+
+Message::Message(const ConnectionToken* const _publisher,
+ const string& _exchange, const string& _routingKey,
+ bool _mandatory, bool _immediate) : publisher(_publisher),
+ exchange(_exchange),
+ routingKey(_routingKey),
+ mandatory(_mandatory),
+ immediate(_immediate){
+
+}
+
+Message::~Message(){
+}
+
+void Message::setHeader(AMQHeaderBody::shared_ptr header){
+ this->header = header;
+}
+
+void Message::addContent(AMQContentBody::shared_ptr data){
+ content.push_back(data);
+}
+
+bool Message::isComplete(){
+ return header.get() && (header->getContentSize() == contentSize());
+}
+
+void Message::deliver(OutputHandler* out, int channel,
+ string& consumerTag, u_int64_t deliveryTag,
+ u_int32_t framesize){
+
+ out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, false, exchange, routingKey)));
+ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
+ out->send(new AMQFrame(channel, headerBody));
+ for(content_iterator i = content.begin(); i != content.end(); i++){
+ if((*i)->size() > framesize){
+ //TODO: need to split it
+ std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl;
+ }else{
+ AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
+ out->send(new AMQFrame(channel, contentBody));
+ }
+ }
+}
+
+BasicHeaderProperties* Message::getHeaderProperties(){
+ return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+}
+
+u_int64_t Message::contentSize(){
+ u_int64_t size(0);
+ for(content_iterator i = content.begin(); i != content.end(); i++){
+ size += (*i)->size();
+ }
+ return size;
+}
+
+const ConnectionToken* const Message::getPublisher(){
+ return publisher;
+}
+
+bool qpid::broker::route(Message::shared_ptr& msg, ExchangeRegistry* registry){
+ Exchange* exchange = registry->get(msg->exchange);
+ if(exchange){
+ exchange->route(msg, msg->routingKey, &(msg->getHeaderProperties()->getHeaders()));
+ return true;
+ }else{
+ return false;
+ }
+}
+
diff --git a/cpp/broker/src/NameGenerator.cpp b/cpp/broker/src/NameGenerator.cpp
new file mode 100644
index 0000000000..46aa385a7e
--- /dev/null
+++ b/cpp/broker/src/NameGenerator.cpp
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "NameGenerator.h"
+#include <sstream>
+
+using namespace qpid::broker;
+
+NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {}
+
+std::string NameGenerator::generate(){
+ std::stringstream ss;
+ ss << base << counter++;
+ return ss.str();
+}
diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp
new file mode 100644
index 0000000000..f7b8605b03
--- /dev/null
+++ b/cpp/broker/src/Queue.cpp
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Queue.h"
+#include "MonitorImpl.h"
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) : name(_name),
+ durable(_durable),
+ autodelete(_autodelete),
+ owner(_owner),
+ queueing(false),
+ dispatching(false),
+ next(0),
+ lastUsed(0),
+ exclusive(0){
+
+ if(autodelete) lastUsed = apr_time_as_msec(apr_time_now());
+}
+
+Queue::~Queue(){
+ for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){
+ b->cancel();
+ bindings.pop();
+ }
+}
+
+void Queue::bound(Binding* b){
+ bindings.push(b);
+}
+
+void Queue::deliver(Message::shared_ptr& msg){
+ Locker locker(lock);
+ if(queueing || !dispatch(msg)){
+ queueing = true;
+ messages.push(msg);
+ }
+}
+
+bool Queue::dispatch(Message::shared_ptr& msg){
+ if(consumers.empty()){
+ return false;
+ }else if(exclusive){
+ if(!exclusive->deliver(msg)){
+ std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
+ }
+ return true;
+ }else{
+ //deliver to next consumer
+ next = next % consumers.size();
+ Consumer* c = consumers[next];
+ int start = next;
+ while(c){
+ next++;
+ if(c->deliver(msg)) return true;
+
+ next = next % consumers.size();
+ c = next == start ? 0 : consumers[next];
+ }
+ return false;
+ }
+}
+
+bool Queue::startDispatching(){
+ Locker locker(lock);
+ if(queueing && !dispatching){
+ dispatching = true;
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Queue::dispatch(){
+ bool proceed = startDispatching();
+ while(proceed){
+ Locker locker(lock);
+ if(!messages.empty() && dispatch(messages.front())){
+ messages.pop();
+ }else{
+ dispatching = false;
+ proceed = false;
+ queueing = !messages.empty();
+ }
+ }
+}
+
+void Queue::consume(Consumer* c, bool requestExclusive){
+ Locker locker(lock);
+ if(exclusive) throw ExclusiveAccessException();
+ if(requestExclusive){
+ if(!consumers.empty()) throw ExclusiveAccessException();
+ exclusive = c;
+ }
+
+ if(autodelete && consumers.empty()) lastUsed = 0;
+ consumers.push_back(c);
+}
+
+void Queue::cancel(Consumer* c){
+ Locker locker(lock);
+ consumers.erase(find(consumers.begin(), consumers.end(), c));
+ if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now());
+ if(exclusive == c) exclusive = 0;
+}
+
+Message::shared_ptr Queue::dequeue(){
+
+}
+
+u_int32_t Queue::purge(){
+ Locker locker(lock);
+ int count = messages.size();
+ while(!messages.empty()) messages.pop();
+ return count;
+}
+
+u_int32_t Queue::getMessageCount() const{
+ Locker locker(lock);
+ return messages.size();
+}
+
+u_int32_t Queue::getConsumerCount() const{
+ Locker locker(lock);
+ return consumers.size();
+}
+
+bool Queue::canAutoDelete() const{
+ Locker locker(lock);
+ return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete);
+}
diff --git a/cpp/broker/src/QueueRegistry.cpp b/cpp/broker/src/QueueRegistry.cpp
new file mode 100644
index 0000000000..f807415314
--- /dev/null
+++ b/cpp/broker/src/QueueRegistry.cpp
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "QueueRegistry.h"
+#include "MonitorImpl.h"
+#include "SessionHandlerImpl.h"
+#include <sstream>
+#include <assert.h>
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+QueueRegistry::QueueRegistry() : counter(1){}
+
+QueueRegistry::~QueueRegistry(){}
+
+std::pair<Queue::shared_ptr, bool>
+QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner)
+{
+ Locker locker(lock);
+ string name = declareName.empty() ? generateName() : declareName;
+ assert(!name.empty());
+ QueueMap::iterator i = queues.find(name);
+ if (i == queues.end()) {
+ Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner));
+ queues[name] = queue;
+ return std::pair<Queue::shared_ptr, bool>(queue, true);
+ } else {
+ return std::pair<Queue::shared_ptr, bool>(i->second, false);
+ }
+}
+
+void QueueRegistry::destroy(const string& name){
+ Locker locker(lock);
+ queues.erase(name);
+}
+
+Queue::shared_ptr QueueRegistry::find(const string& name){
+ Locker locker(lock);
+ QueueMap::iterator i = queues.find(name);
+ if (i == queues.end()) {
+ return Queue::shared_ptr();
+ } else {
+ return i->second;
+ }
+}
+
+string QueueRegistry::generateName(){
+ string name;
+ do {
+ std::stringstream ss;
+ ss << "tmp_" << counter++;
+ name = ss.str();
+ // Thread safety: Private function, only called with lock held
+ // so this is OK.
+ } while(queues.find(name) != queues.end());
+ return name;
+}
diff --git a/cpp/broker/src/SessionHandlerFactoryImpl.cpp b/cpp/broker/src/SessionHandlerFactoryImpl.cpp
new file mode 100644
index 0000000000..661cb4ef81
--- /dev/null
+++ b/cpp/broker/src/SessionHandlerFactoryImpl.cpp
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "SessionHandlerFactoryImpl.h"
+#include "SessionHandlerImpl.h"
+#include "FanOutExchange.h"
+
+using namespace qpid::broker;
+using namespace qpid::io;
+
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
+ exchanges.declare(new DirectExchange("amq.direct"));
+ exchanges.declare(new TopicExchange("amq.topic"));
+ exchanges.declare(new FanOutExchange("amq.fanout"));
+ cleaner.start();
+}
+
+SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){
+ return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout);
+}
+
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){
+ cleaner.stop();
+ exchanges.destroy("amq.direct");
+ exchanges.destroy("amq.topic");
+}
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
new file mode 100644
index 0000000000..19e243a01b
--- /dev/null
+++ b/cpp/broker/src/SessionHandlerImpl.cpp
@@ -0,0 +1,378 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "SessionHandlerImpl.h"
+#include "FanOutExchange.h"
+#include "assert.h"
+
+using namespace std::tr1;
+using namespace qpid::broker;
+using namespace qpid::io;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
+ QueueRegistry* _queues,
+ ExchangeRegistry* _exchanges,
+ AutoDelete* _cleaner,
+ const u_int32_t _timeout) : context(_context),
+ queues(_queues),
+ exchanges(_exchanges),
+ cleaner(_cleaner),
+ timeout(_timeout),
+ channelHandler(new ChannelHandlerImpl(this)),
+ connectionHandler(new ConnectionHandlerImpl(this)),
+ basicHandler(new BasicHandlerImpl(this)),
+ exchangeHandler(new ExchangeHandlerImpl(this)),
+ queueHandler(new QueueHandlerImpl(this)),
+ framemax(65536),
+ heartbeat(0){
+
+}
+
+SessionHandlerImpl::~SessionHandlerImpl(){
+ // TODO aconway 2006-09-07: Should be auto_ptr or plain members.
+ delete channelHandler;
+ delete connectionHandler;
+ delete basicHandler;
+ delete exchangeHandler;
+ delete queueHandler;
+}
+
+Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){
+ Queue::shared_ptr queue;
+ if (name.empty()) {
+ queue = channels[channel]->getDefaultQueue();
+ if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
+ } else {
+ queue = queues->find(name);
+ if (queue == 0) {
+ throw ChannelException( 404, "Queue not found: " + name);
+ }
+ }
+ return queue;
+}
+
+
+Exchange* SessionHandlerImpl::findExchange(const string& name){
+ exchanges->getLock()->acquire();
+ Exchange* exchange(exchanges->get(name));
+ exchanges->getLock()->release();
+ return exchange;
+}
+
+void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
+ u_int16_t channel = frame->getChannel();
+ AMQBody::shared_ptr body = frame->getBody();
+ AMQMethodBody::shared_ptr method;
+
+ switch(body->type())
+ {
+ case METHOD_BODY:
+ method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
+ try{
+ method->invoke(*this, channel);
+ }catch(ChannelException& e){
+ channels[channel]->close();
+ channels.erase(channel);
+ context->send(new AMQFrame(channel, new ChannelCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId())));
+ }catch(ConnectionException& e){
+ context->send(new AMQFrame(0, new ConnectionCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId())));
+ }
+ break;
+
+ case HEADER_BODY:
+ this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
+ break;
+
+ case CONTENT_BODY:
+ this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
+ break;
+
+ case HEARTBEAT_BODY:
+ //channel must be 0
+ this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
+ break;
+ }
+}
+
+void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
+ //send connection start
+ FieldTable properties;
+ string mechanisms("PLAIN");
+ string locales("en_US");
+ context->send(new AMQFrame(0, new ConnectionStartBody(8, 0, properties, mechanisms, locales)));
+}
+
+void SessionHandlerImpl::idleOut(){
+
+}
+
+void SessionHandlerImpl::idleIn(){
+
+}
+
+void SessionHandlerImpl::closed(){
+ for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
+ Channel* c = i->second;
+ channels.erase(i);
+ c->close();
+ delete c;
+ }
+ for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
+ string name = (*i)->getName();
+ queues->destroy(name);
+ exclusiveQueues.erase(i);
+ }
+}
+
+void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
+ channels[channel]->handleHeader(body, exchanges);
+}
+
+void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
+ channels[channel]->handleContent(body, exchanges);
+}
+
+void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){
+ std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl;
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::startOk(u_int16_t channel, FieldTable& clientProperties, string& mechanism,
+ string& response, string& locale){
+
+ parent->context->send(new AMQFrame(0, new ConnectionTuneBody(100, parent->framemax, parent->heartbeat)));
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t channel, string& response){}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t channel, u_int16_t channelmax, u_int32_t framemax, u_int16_t heartbeat){
+ parent->framemax = framemax;
+ parent->heartbeat = heartbeat;
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist){
+ string knownhosts;
+ parent->context->send(new AMQFrame(0, new ConnectionOpenOkBody(knownhosts)));
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText,
+ u_int16_t classId, u_int16_t methodId){
+
+ parent->context->send(new AMQFrame(0, new ConnectionCloseOkBody()));
+ parent->context->close();
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t channel){
+ parent->context->close();
+}
+
+
+
+void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& outOfBand){
+ parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
+ parent->context->send(new AMQFrame(channel, new ChannelOpenOkBody()));
+}
+
+void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){}
+void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t channel, bool active){}
+
+void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText,
+ u_int16_t classId, u_int16_t methodId){
+ Channel* c = parent->channels[channel];
+ parent->channels.erase(channel);
+ c->close();
+ delete c;
+ parent->context->send(new AMQFrame(channel, new ChannelCloseOkBody()));
+}
+
+void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){}
+
+
+
+void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type,
+ bool passive, bool durable, bool autoDelete, bool internal, bool nowait,
+ FieldTable& arguments){
+
+ if(!passive && (
+ type != TopicExchange::typeName &&
+ type != DirectExchange::typeName &&
+ type != FanOutExchange::typeName)
+ )
+ {
+ throw ChannelException(540, "Exchange type not implemented: " + type);
+ }
+
+ parent->exchanges->getLock()->acquire();
+ if(!parent->exchanges->get(exchange)){
+ if(type == TopicExchange::typeName){
+ parent->exchanges->declare(new TopicExchange(exchange));
+ }else if(type == DirectExchange::typeName){
+ parent->exchanges->declare(new DirectExchange(exchange));
+ }else if(type == FanOutExchange::typeName){
+ parent->exchanges->declare(new DirectExchange(exchange));
+ }
+ }
+ parent->exchanges->getLock()->release();
+ if(!nowait){
+ parent->context->send(new AMQFrame(channel, new ExchangeDeclareOkBody()));
+ }
+}
+
+void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait){
+ //TODO: implement unused
+ parent->exchanges->getLock()->acquire();
+ parent->exchanges->destroy(exchange);
+ parent->exchanges->getLock()->release();
+ if(!nowait) parent->context->send(new AMQFrame(channel, new ExchangeDeleteOkBody()));
+}
+
+
+
+
+void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, FieldTable& arguments){
+ Queue::shared_ptr queue;
+ if (passive && !name.empty()) {
+ queue = parent->getQueue(name, channel);
+ } else {
+ std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
+ queue = queue_created.first;
+ assert(queue);
+ if (queue_created.second) { // This is a new queue
+ parent->channels[channel]->setDefaultQueue(queue);
+ //add default binding:
+ parent->exchanges->get("amq.direct")->bind(queue, name, 0);
+ if(exclusive){
+ parent->exclusiveQueues.push_back(queue);
+ } else if(autoDelete){
+ parent->cleaner->add(queue);
+ }
+ }
+ }
+ if(exclusive && !queue->isExclusiveOwner(parent)){
+ throw ChannelException(405, "Cannot grant exclusive access to queue");
+ }
+ if(!nowait){
+ name = queue->getName();
+ QueueDeclareOkBody* response = new QueueDeclareOkBody(name, queue->getMessageCount(), queue->getConsumerCount());
+ parent->context->send(new AMQFrame(channel, response));
+ }
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t ticket, string& queueName,
+ string& exchangeName, string& routingKey, bool nowait,
+ FieldTable& arguments){
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+ Exchange* exchange = parent->exchanges->get(exchangeName);
+ if(exchange){
+ if(routingKey.size() == 0 && queueName.size() == 0) routingKey = queue->getName();
+ exchange->bind(queue, routingKey, &arguments);
+ if(!nowait) parent->context->send(new AMQFrame(channel, new QueueBindOkBody()));
+ }else{
+ throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
+ }
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t ticket, string& queueName, bool nowait){
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+ int count = queue->purge();
+ if(!nowait) parent->context->send(new AMQFrame(channel, new QueuePurgeOkBody(count)));
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
+ ChannelException error(0, "");
+ int count(0);
+ Queue::shared_ptr q = parent->getQueue(queue, channel);
+ if(ifEmpty && q->getMessageCount() > 0){
+ throw ChannelException(406, "Queue not empty.");
+ }else if(ifUnused && q->getConsumerCount() > 0){
+ throw ChannelException(406, "Queue in use.");
+ }else{
+ //remove the queue from the list of exclusive queues if necessary
+ if(q->isExclusiveOwner(parent)){
+ queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q);
+ if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
+ }
+ count = q->getMessageCount();
+ parent->queues->destroy(queue);
+ }
+ if(!nowait) parent->context->send(new AMQFrame(channel, new QueueDeleteOkBody(count)));
+}
+
+
+
+
+void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global){
+ //TODO: handle global
+ //TODO: channel doesn't do anything with these qos parameters yet
+ parent->channels[channel]->setPrefetchSize(prefetchSize);
+ parent->channels[channel]->setPrefetchCount(prefetchCount);
+ parent->context->send(new AMQFrame(channel, new BasicQosOkBody()));
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t ticket,
+ string& queueName, string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive,
+ bool nowait){
+
+ //TODO: implement nolocal
+ Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+ Channel* channel = parent->channels[channelId];
+ if(!consumerTag.empty() && channel->exists(consumerTag)){
+ throw ConnectionException(530, "Consumer tags must be unique");
+ }
+
+ try{
+ channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0);
+ if(!nowait) parent->context->send(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)));
+
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
+ }catch(ExclusiveAccessException& e){
+ if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+ else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ }
+
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){
+ parent->channels[channel]->cancel(consumerTag);
+ if(!nowait) parent->context->send(new AMQFrame(channel, new BasicCancelOkBody(consumerTag)));
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t ticket,
+ string& exchange, string& routingKey,
+ bool mandatory, bool immediate){
+
+ Message* msg = new Message(parent, exchange.length() ? exchange : "amq.direct", routingKey, mandatory, immediate);
+ parent->channels[channel]->handlePublish(msg);
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){}
+
+void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){}
+
+void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue){}
+
+void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){}
+
diff --git a/cpp/broker/src/TopicExchange.cpp b/cpp/broker/src/TopicExchange.cpp
new file mode 100644
index 0000000000..e0248958f9
--- /dev/null
+++ b/cpp/broker/src/TopicExchange.cpp
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "TopicExchange.h"
+#include "ExchangeBinding.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+TopicExchange::TopicExchange(const string& _name) : name(_name) {
+
+}
+
+void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ bindings[routingKey].push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ lock.release();
+}
+
+void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+
+ std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ if(i < queues.end()){
+ queues.erase(i);
+ if(queues.empty()){
+ bindings.erase(routingKey);
+ }
+ }
+ lock.release();
+}
+
+void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++){
+ (*i)->deliver(msg);
+ }
+ lock.release();
+}
+
+TopicExchange::~TopicExchange(){
+
+}
+
+const std::string TopicExchange::typeName("topic");