summaryrefslogtreecommitdiff
path: root/cpp/broker/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-10-16 13:50:26 +0000
committerAlan Conway <aconway@apache.org>2006-10-16 13:50:26 +0000
commit8a6ab3aa61d441b9210c05c84dc9998acfc38737 (patch)
tree1eb9d7f39b5c2d04a85a1f66caef3d398567b740 /cpp/broker/src
parent9a808fb13aba243d41bbdab75158dae5939a80a4 (diff)
downloadqpid-python-8a6ab3aa61d441b9210c05c84dc9998acfc38737.tar.gz
Build system reorg, see README and Makefile comments for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464494 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/src')
-rw-r--r--cpp/broker/src/AutoDelete.cpp93
-rw-r--r--cpp/broker/src/Broker.cpp84
-rw-r--r--cpp/broker/src/Channel.cpp256
-rw-r--r--cpp/broker/src/Configuration.cpp196
-rw-r--r--cpp/broker/src/DirectExchange.cpp72
-rw-r--r--cpp/broker/src/ExchangeBinding.cpp32
-rw-r--r--cpp/broker/src/ExchangeRegistry.cpp57
-rw-r--r--cpp/broker/src/FanOutExchange.cpp56
-rw-r--r--cpp/broker/src/HeadersExchange.cpp120
-rw-r--r--cpp/broker/src/Message.cpp100
-rw-r--r--cpp/broker/src/NameGenerator.cpp29
-rw-r--r--cpp/broker/src/Queue.cpp155
-rw-r--r--cpp/broker/src/QueueRegistry.cpp72
-rw-r--r--cpp/broker/src/Router.cpp32
-rw-r--r--cpp/broker/src/SessionHandlerFactoryImpl.cpp50
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp405
-rw-r--r--cpp/broker/src/TopicExchange.cpp163
17 files changed, 0 insertions, 1972 deletions
diff --git a/cpp/broker/src/AutoDelete.cpp b/cpp/broker/src/AutoDelete.cpp
deleted file mode 100644
index 6793ec449d..0000000000
--- a/cpp/broker/src/AutoDelete.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "AutoDelete.h"
-
-using namespace qpid::broker;
-
-AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) : registry(_registry),
- period(_period),
- stopped(true),
- runner(0){}
-
-void AutoDelete::add(Queue::shared_ptr const queue){
- lock.acquire();
- queues.push(queue);
- lock.release();
-}
-
-Queue::shared_ptr const AutoDelete::pop(){
- Queue::shared_ptr next;
- lock.acquire();
- if(!queues.empty()){
- next = queues.front();
- queues.pop();
- }
- lock.release();
- return next;
-}
-
-void AutoDelete::process(){
- Queue::shared_ptr seen;
- for(Queue::shared_ptr q = pop(); q; q = pop()){
- if(seen == q){
- add(q);
- break;
- }else if(q->canAutoDelete()){
- std::string name(q->getName());
- registry->destroy(name);
- std::cout << "INFO: Auto-deleted queue named " << name << std::endl;
- }else{
- add(q);
- if(!seen) seen = q;
- }
- }
-}
-
-void AutoDelete::run(){
- monitor.acquire();
- while(!stopped){
- process();
- monitor.wait(period);
- }
- monitor.release();
-}
-
-void AutoDelete::start(){
- monitor.acquire();
- if(stopped){
- runner = factory.create(this);
- stopped = false;
- monitor.release();
- runner->start();
- }else{
- monitor.release();
- }
-}
-
-void AutoDelete::stop(){
- monitor.acquire();
- if(!stopped){
- stopped = true;
- monitor.notify();
- monitor.release();
- runner->join();
- delete runner;
- }else{
- monitor.release();
- }
-}
diff --git a/cpp/broker/src/Broker.cpp b/cpp/broker/src/Broker.cpp
deleted file mode 100644
index b6472d1729..0000000000
--- a/cpp/broker/src/Broker.cpp
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include <memory>
-#include "Broker.h"
-#include "Acceptor.h"
-#include "Configuration.h"
-#include "QpidError.h"
-#include "SessionHandlerFactoryImpl.h"
-#include "BlockingAPRAcceptor.h"
-#include "LFAcceptor.h"
-
-
-using namespace qpid::broker;
-using namespace qpid::io;
-
-namespace {
- Acceptor* createAcceptor(const Configuration& config){
- const string type(config.getAcceptor());
- if("blocking" == type){
- std::cout << "Using blocking acceptor " << std::endl;
- return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog());
- }else if("non-blocking" == type){
- std::cout << "Using non-blocking acceptor " << std::endl;
- return new LFAcceptor(config.isTrace(),
- config.getConnectionBacklog(),
- config.getWorkerThreads(),
- config.getMaxConnections());
- }
- throw Configuration::ParseException("Unrecognised acceptor: " + type);
- }
-}
-
-Broker::Broker(const Configuration& config) :
- acceptor(createAcceptor(config)),
- port(config.getPort()),
- isBound(false) {}
-
-Broker::shared_ptr Broker::create(int port)
-{
- Configuration config;
- config.setPort(port);
- return create(config);
-}
-
-Broker::shared_ptr Broker::create(const Configuration& config) {
- return Broker::shared_ptr(new Broker(config));
-}
-
-int16_t Broker::bind()
-{
- if (!isBound) {
- port = acceptor->bind(port);
- }
- return port;
-}
-
-void Broker::run() {
- bind();
- acceptor->run(&factory);
-}
-
-void Broker::shutdown() {
- acceptor->shutdown();
-}
-
-Broker::~Broker() { }
-
-const int16_t Broker::DEFAULT_PORT(5672);
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
deleted file mode 100644
index 34d69716c4..0000000000
--- a/cpp/broker/src/Channel.cpp
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Channel.h"
-#include "QpidError.h"
-#include <iostream>
-#include <sstream>
-#include <assert.h>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-
-Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
- id(_id),
- out(_out),
- deliveryTag(1),
- transactional(false),
- prefetchSize(0),
- prefetchCount(0),
- outstandingSize(0),
- outstandingCount(0),
- framesize(_framesize),
- tagGenerator("sgen"){}
-
-Channel::~Channel(){
-}
-
-bool Channel::exists(const string& consumerTag){
- return consumers.find(consumerTag) != consumers.end();
-}
-
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){
- if(tag.empty()) tag = tagGenerator.generate();
-
- ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
- try{
- queue->consume(c, exclusive);//may throw exception
- consumers[tag] = c;
- }catch(ExclusiveAccessException& e){
- delete c;
- throw e;
- }
-}
-
-void Channel::cancel(consumer_iterator i){
- ConsumerImpl* c = i->second;
- consumers.erase(i);
- if(c){
- c->cancel();
- delete c;
- }
-}
-
-void Channel::cancel(const string& tag){
- consumer_iterator i = consumers.find(tag);
- if(i != consumers.end()){
- cancel(i);
- }
-}
-
-void Channel::close(){
- //cancel all consumers
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
- cancel(i);
- }
-}
-
-void Channel::begin(){
- transactional = true;
-}
-
-void Channel::commit(){
-
-}
-
-void Channel::rollback(){
-
-}
-
-void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
- Locker locker(deliveryLock);
-
- u_int64_t myDeliveryTag = deliveryTag++;
- if(ackExpected){
- unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
- outstandingSize += msg->contentSize();
- outstandingCount++;
- }
- //send deliver method, header and content(s)
- msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
-}
-
-bool Channel::checkPrefetch(Message::shared_ptr& msg){
- Locker locker(deliveryLock);
- bool countOk = !prefetchCount || prefetchCount > unacknowledged.size();
- bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty();
- return countOk && sizeOk;
-}
-
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag,
- Queue::shared_ptr _queue,
- ConnectionToken* const _connection, bool ack) : parent(_parent),
- tag(_tag),
- queue(_queue),
- connection(_connection),
- ackExpected(ack),
- blocked(false){
-}
-
-bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
- if(!connection || connection != msg->getPublisher()){//check for no_local
- if(ackExpected && !parent->checkPrefetch(msg)){
- blocked = true;
- }else{
- blocked = false;
- parent->deliver(msg, tag, queue, ackExpected);
- return true;
- }
- }
- return false;
-}
-
-void Channel::ConsumerImpl::cancel(){
- if(queue) queue->cancel(this);
-}
-
-void Channel::ConsumerImpl::requestDispatch(){
- if(blocked) queue->dispatch();
-}
-
-void Channel::checkMessage(const std::string& text){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text);
- }
-}
-
-void Channel::handlePublish(Message* msg){
- if(message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
- }
- message = Message::shared_ptr(msg);
-}
-
-void Channel::ack(u_int64_t _deliveryTag, bool multiple){
- Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(_deliveryTag));
- if(i == unacknowledged.end()){
- throw InvalidAckException();
- }else if(multiple){
- unacknowledged.erase(unacknowledged.begin(), ++i);
- //recompute prefetch outstanding (note: messages delivered through get are ignored)
- CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch()));
- outstandingSize = calc.getSize();
- outstandingCount = calc.getCount();
- }else{
- if(!i->pull){
- outstandingSize -= i->msg->contentSize();
- outstandingCount--;
- }
- unacknowledged.erase(i);
- }
-
- //if the prefetch limit had previously been reached, there may
- //be messages that can be now be delivered
- for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
- j->second->requestDispatch();
- }
-}
-
-void Channel::recover(bool requeue){
- Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- if(requeue){
- outstandingSize = 0;
- outstandingCount = 0;
- ack_iterator start(unacknowledged.begin());
- ack_iterator end(unacknowledged.end());
- for_each(start, end, Requeue());
- unacknowledged.erase(start, end);
- }else{
- for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));
- }
-}
-
-bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
- Message::shared_ptr msg = queue->dequeue();
- if(msg){
- Locker locker(deliveryLock);
- u_int64_t myDeliveryTag = deliveryTag++;
- msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
- if(ackExpected){
- unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag));
- }
- return true;
- }else{
- return false;
- }
-}
-
-Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
-
-bool Channel::MatchAck::operator()(AckRecord& record) const{
- return tag == record.deliveryTag;
-}
-
-void Channel::Requeue::operator()(AckRecord& record) const{
- record.msg->redeliver();
- record.queue->deliver(record.msg);
-}
-
-Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
-
-void Channel::Redeliver::operator()(AckRecord& record) const{
- if(record.pull){
- //if message was originally sent as response to get, we must requeue it
- record.msg->redeliver();
- record.queue->deliver(record.msg);
- }else{
- record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
- }
-}
-
-Channel::CalculatePrefetch::CalculatePrefetch() : size(0){}
-
-void Channel::CalculatePrefetch::operator()(AckRecord& record){
- if(!record.pull){
- //ignore messages that were sent in response to get when calculating prefetch
- size += record.msg->contentSize();
- count++;
- }
-}
-
-u_int32_t Channel::CalculatePrefetch::getSize(){
- return size;
-}
-
-u_int16_t Channel::CalculatePrefetch::getCount(){
- return count;
-}
diff --git a/cpp/broker/src/Configuration.cpp b/cpp/broker/src/Configuration.cpp
deleted file mode 100644
index 6e7df7889e..0000000000
--- a/cpp/broker/src/Configuration.cpp
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Configuration.h"
-#include <string.h>
-
-using namespace qpid::broker;
-using namespace std;
-
-Configuration::Configuration() :
- trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false),
- port('p', "port", "Sets the port to listen on (default=5672)", 5672),
- workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5),
- maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500),
- connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10),
- acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"),
- help("help", "Prints usage information", false)
-{
- options.push_back(&trace);
- options.push_back(&port);
- options.push_back(&workerThreads);
- options.push_back(&maxConnections);
- options.push_back(&connectionBacklog);
- options.push_back(&acceptor);
- options.push_back(&help);
-}
-
-Configuration::~Configuration(){}
-
-void Configuration::parse(int argc, char** argv){
- int position = 1;
- while(position < argc){
- bool matched(false);
- for(op_iterator i = options.begin(); i < options.end() && !matched; i++){
- matched = (*i)->parse(position, argv, argc);
- }
- if(!matched){
- std::cout << "Warning: skipping unrecognised option " << argv[position] << std::endl;
- position++;
- }
- }
-}
-
-void Configuration::usage(){
- for(op_iterator i = options.begin(); i < options.end(); i++){
- (*i)->print(std::cout);
- }
-}
-
-bool Configuration::isHelp() const {
- return help.getValue();
-}
-
-bool Configuration::isTrace() const {
- return trace.getValue();
-}
-
-int Configuration::getPort() const {
- return port.getValue();
-}
-
-int Configuration::getWorkerThreads() const {
- return workerThreads.getValue();
-}
-
-int Configuration::getMaxConnections() const {
- return maxConnections.getValue();
-}
-
-int Configuration::getConnectionBacklog() const {
- return connectionBacklog.getValue();
-}
-
-string Configuration::getAcceptor() const {
- return acceptor.getValue();
-}
-
-Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
- flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
-
-Configuration::Option::Option(const string& _name, const string& _desc) :
- flag(""), name("--" + _name), desc(_desc) {}
-
-Configuration::Option::~Option(){}
-
-bool Configuration::Option::match(const string& arg){
- return flag == arg || name == arg;
-}
-
-bool Configuration::Option::parse(int& i, char** argv, int argc){
- const string arg(argv[i]);
- if(match(arg)){
- if(needsValue()){
- if(++i < argc) setValue(argv[i]);
- else throw ParseException("Argument " + arg + " requires a value!");
- }else{
- setValue("");
- }
- i++;
- return true;
- }else{
- return false;
- }
-}
-
-void Configuration::Option::print(ostream& out) const {
- out << " ";
- if(flag.length() > 0){
- out << flag << " or ";
- }
- out << name;
- if(needsValue()) out << "<value>";
- out << std::endl;
- out << " " << desc << std::endl;
-}
-
-
-// String Option:
-
-Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) :
- Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) :
- Option(_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::StringOption::~StringOption(){}
-
-const string& Configuration::StringOption::getValue() const {
- return value;
-}
-
-bool Configuration::StringOption::needsValue() const {
- return true;
-}
-
-void Configuration::StringOption::setValue(const std::string& _value){
- value = _value;
-}
-
-// Int Option:
-
-Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) :
- Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) :
- Option(_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::IntOption::~IntOption(){}
-
-int Configuration::IntOption::getValue() const {
- return value;
-}
-
-bool Configuration::IntOption::needsValue() const {
- return true;
-}
-
-void Configuration::IntOption::setValue(const std::string& _value){
- value = atoi(_value.c_str());
-}
-
-// Bool Option:
-
-Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) :
- Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) :
- Option(_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::BoolOption::~BoolOption(){}
-
-bool Configuration::BoolOption::getValue() const {
- return value;
-}
-
-bool Configuration::BoolOption::needsValue() const {
- return false;
-}
-
-void Configuration::BoolOption::setValue(const std::string& _value){
- value = strcasecmp(_value.c_str(), "true") == 0;
-}
diff --git a/cpp/broker/src/DirectExchange.cpp b/cpp/broker/src/DirectExchange.cpp
deleted file mode 100644
index 94cfbc766d..0000000000
--- a/cpp/broker/src/DirectExchange.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "DirectExchange.h"
-#include "ExchangeBinding.h"
-#include <iostream>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {
-
-}
-
-void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
- lock.acquire();
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
- if(i == queues.end()){
- bindings[routingKey].push_back(queue);
- queue->bound(new ExchangeBinding(this, queue, routingKey, args));
- }
- lock.release();
-}
-
-void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){
- lock.acquire();
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
-
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
- if(i < queues.end()){
- queues.erase(i);
- if(queues.empty()){
- bindings.erase(routingKey);
- }
- }
- lock.release();
-}
-
-void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){
- lock.acquire();
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
- int count(0);
- for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
- (*i)->deliver(msg);
- }
- if(!count){
- std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl;
- }
- lock.release();
-}
-
-DirectExchange::~DirectExchange(){
-
-}
-
-
-const std::string DirectExchange::typeName("direct");
diff --git a/cpp/broker/src/ExchangeBinding.cpp b/cpp/broker/src/ExchangeBinding.cpp
deleted file mode 100644
index 6160a67fd3..0000000000
--- a/cpp/broker/src/ExchangeBinding.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "ExchangeBinding.h"
-#include "Exchange.h"
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){}
-
-void ExchangeBinding::cancel(){
- e->unbind(q, key, args);
- delete this;
-}
-
-ExchangeBinding::~ExchangeBinding(){
-}
diff --git a/cpp/broker/src/ExchangeRegistry.cpp b/cpp/broker/src/ExchangeRegistry.cpp
deleted file mode 100644
index 05396382a7..0000000000
--- a/cpp/broker/src/ExchangeRegistry.cpp
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "ExchangeRegistry.h"
-#include "MonitorImpl.h"
-
-using namespace qpid::broker;
-using namespace qpid::concurrent;
-
-ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){}
-
-ExchangeRegistry::~ExchangeRegistry(){
- for (ExchangeMap::iterator i = exchanges.begin(); i != exchanges.end(); ++i)
- {
- delete i->second;
- }
- delete lock;
-}
-
-void ExchangeRegistry::declare(Exchange* exchange){
- exchanges[exchange->getName()] = exchange;
-}
-
-void ExchangeRegistry::destroy(const string& name){
- if(exchanges[name]){
- delete exchanges[name];
- exchanges.erase(name);
- }
-}
-
-Exchange* ExchangeRegistry::get(const string& name){
- return exchanges[name];
-}
-
-namespace
-{
-const std::string empty;
-}
-
-Exchange* ExchangeRegistry::getDefault()
-{
- return get(empty);
-}
diff --git a/cpp/broker/src/FanOutExchange.cpp b/cpp/broker/src/FanOutExchange.cpp
deleted file mode 100644
index e8cb8f6315..0000000000
--- a/cpp/broker/src/FanOutExchange.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "FanOutExchange.h"
-#include "ExchangeBinding.h"
-#include <algorithm>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
-
-void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
- Locker locker(lock);
- // Add if not already present.
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
- if (i == bindings.end()) {
- bindings.push_back(queue);
- queue->bound(new ExchangeBinding(this, queue, routingKey, args));
- }
-}
-
-void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* /*args*/){
- Locker locker(lock);
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
- if (i != bindings.end()) {
- bindings.erase(i);
- // TODO aconway 2006-09-14: What about the ExchangeBinding object? Don't we have to verify routingKey/args match?
- }
-}
-
-void FanOutExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* /*args*/){
- Locker locker(lock);
- for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
- (*i)->deliver(msg);
- }
-}
-
-FanOutExchange::~FanOutExchange() {}
-
-const std::string FanOutExchange::typeName("fanout");
diff --git a/cpp/broker/src/HeadersExchange.cpp b/cpp/broker/src/HeadersExchange.cpp
deleted file mode 100644
index 65204cdb85..0000000000
--- a/cpp/broker/src/HeadersExchange.cpp
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "HeadersExchange.h"
-#include "ExchangeBinding.h"
-#include "Value.h"
-#include "QpidError.h"
-#include <algorithm>
-
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-// TODO aconway 2006-09-20: More efficient matching algorithm.
-// The current search algorithm really sucks.
-// Fieldtables are heavy, maybe use shared_ptr to do handle-body.
-
-using namespace qpid::broker;
-
-namespace {
- const std::string all("all");
- const std::string any("any");
- const std::string x_match("x-match");
-}
-
-HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
-
-void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
- std::cout << "HeadersExchange::bind" << std::endl;
- Locker locker(lock);
- std::string what = args->getString("x-match");
- if (what != all && what != any) {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
- }
- bindings.push_back(Binding(*args, queue));
- queue->bound(new ExchangeBinding(this, queue, routingKey, args));
-}
-
-void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* args){
- Locker locker(lock);
- Bindings::iterator i =
- std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
- if (i != bindings.end()) bindings.erase(i);
-}
-
-
-void HeadersExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* args){
- std::cout << "route: " << *args << std::endl;
- Locker locker(lock);;
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, *args)) i->second->deliver(msg);
- }
-}
-
-HeadersExchange::~HeadersExchange() {}
-
-const std::string HeadersExchange::typeName("headers");
-
-namespace
-{
-
- bool match_values(const Value& bind, const Value& msg) {
- return dynamic_cast<const EmptyValue*>(&bind) || bind == msg;
- }
-
-}
-
-
-bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
- typedef FieldTable::ValueMap Map;
- std::string what = bind.getString(x_match);
- if (what == all) {
- for (Map::const_iterator i = bind.getMap().begin();
- i != bind.getMap().end();
- ++i)
- {
- if (i->first != x_match)
- {
- Map::const_iterator j = msg.getMap().find(i->first);
- if (j == msg.getMap().end()) return false;
- if (!match_values(*(i->second), *(j->second))) return false;
- }
- }
- return true;
- } else if (what == any) {
- for (Map::const_iterator i = bind.getMap().begin();
- i != bind.getMap().end();
- ++i)
- {
- if (i->first != x_match)
- {
- Map::const_iterator j = msg.getMap().find(i->first);
- if (j != msg.getMap().end()) {
- if (match_values(*(i->second), *(j->second))) return true;
- }
- }
- }
- return false;
- } else {
- return false;
- }
-}
-
-
-
diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp
deleted file mode 100644
index 0a8a5f7a4d..0000000000
--- a/cpp/broker/src/Message.cpp
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "MonitorImpl.h"
-#include "Message.h"
-#include "ExchangeRegistry.h"
-#include <iostream>
-
-using namespace std::tr1;//for *_pointer_cast methods
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-
-Message::Message(const ConnectionToken* const _publisher,
- const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate) : publisher(_publisher),
- exchange(_exchange),
- routingKey(_routingKey),
- mandatory(_mandatory),
- immediate(_immediate),
- redelivered(false),
- size(0){
-
-}
-
-Message::~Message(){
-}
-
-void Message::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
-}
-
-void Message::addContent(AMQContentBody::shared_ptr data){
- content.push_back(data);
- size += data->size();
-}
-
-bool Message::isComplete(){
- return header.get() && (header->getContentSize() == contentSize());
-}
-
-void Message::redeliver(){
- redelivered = true;
-}
-
-void Message::deliver(OutputHandler* out, int channel,
- const string& consumerTag, u_int64_t deliveryTag,
- u_int32_t framesize){
-
- out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
- sendContent(out, channel, framesize);
-}
-
-void Message::sendGetOk(OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize){
-
- out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)));
- sendContent(out, channel, framesize);
-}
-
-void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){
- AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
- out->send(new AMQFrame(channel, headerBody));
- for(content_iterator i = content.begin(); i != content.end(); i++){
- if((*i)->size() > framesize){
- //TODO: need to split it
- std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl;
- }else{
- AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
- out->send(new AMQFrame(channel, contentBody));
- }
- }
-}
-
-BasicHeaderProperties* Message::getHeaderProperties(){
- return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
-}
-
-const ConnectionToken* const Message::getPublisher(){
- return publisher;
-}
-
diff --git a/cpp/broker/src/NameGenerator.cpp b/cpp/broker/src/NameGenerator.cpp
deleted file mode 100644
index 46aa385a7e..0000000000
--- a/cpp/broker/src/NameGenerator.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "NameGenerator.h"
-#include <sstream>
-
-using namespace qpid::broker;
-
-NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {}
-
-std::string NameGenerator::generate(){
- std::stringstream ss;
- ss << base << counter++;
- return ss.str();
-}
diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp
deleted file mode 100644
index eaaa3ffa31..0000000000
--- a/cpp/broker/src/Queue.cpp
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Queue.h"
-#include "MonitorImpl.h"
-#include <iostream>
-
-using namespace qpid::broker;
-using namespace qpid::concurrent;
-
-Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) :
- name(_name),
- autodelete(_autodelete),
- durable(_durable),
- owner(_owner),
- queueing(false),
- dispatching(false),
- next(0),
- lastUsed(0),
- exclusive(0)
-{
- if(autodelete) lastUsed = apr_time_as_msec(apr_time_now());
-}
-
-Queue::~Queue(){
- for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){
- b->cancel();
- bindings.pop();
- }
-}
-
-void Queue::bound(Binding* b){
- bindings.push(b);
-}
-
-void Queue::deliver(Message::shared_ptr& msg){
- Locker locker(lock);
- if(queueing || !dispatch(msg)){
- queueing = true;
- messages.push(msg);
- }
-}
-
-bool Queue::dispatch(Message::shared_ptr& msg){
- if(consumers.empty()){
- return false;
- }else if(exclusive){
- if(!exclusive->deliver(msg)){
- std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
- }
- return true;
- }else{
- //deliver to next consumer
- next = next % consumers.size();
- Consumer* c = consumers[next];
- int start = next;
- while(c){
- next++;
- if(c->deliver(msg)) return true;
-
- next = next % consumers.size();
- c = next == start ? 0 : consumers[next];
- }
- return false;
- }
-}
-
-bool Queue::startDispatching(){
- Locker locker(lock);
- if(queueing && !dispatching){
- dispatching = true;
- return true;
- }else{
- return false;
- }
-}
-
-void Queue::dispatch(){
- bool proceed = startDispatching();
- while(proceed){
- Locker locker(lock);
- if(!messages.empty() && dispatch(messages.front())){
- messages.pop();
- }else{
- dispatching = false;
- proceed = false;
- queueing = !messages.empty();
- }
- }
-}
-
-void Queue::consume(Consumer* c, bool requestExclusive){
- Locker locker(lock);
- if(exclusive) throw ExclusiveAccessException();
- if(requestExclusive){
- if(!consumers.empty()) throw ExclusiveAccessException();
- exclusive = c;
- }
-
- if(autodelete && consumers.empty()) lastUsed = 0;
- consumers.push_back(c);
-}
-
-void Queue::cancel(Consumer* c){
- Locker locker(lock);
- consumers.erase(find(consumers.begin(), consumers.end(), c));
- if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now());
- if(exclusive == c) exclusive = 0;
-}
-
-Message::shared_ptr Queue::dequeue(){
- Locker locker(lock);
- Message::shared_ptr msg;
- if(!messages.empty()){
- msg = messages.front();
- messages.pop();
- }
- return msg;
-}
-
-u_int32_t Queue::purge(){
- Locker locker(lock);
- int count = messages.size();
- while(!messages.empty()) messages.pop();
- return count;
-}
-
-u_int32_t Queue::getMessageCount() const{
- Locker locker(lock);
- return messages.size();
-}
-
-u_int32_t Queue::getConsumerCount() const{
- Locker locker(lock);
- return consumers.size();
-}
-
-bool Queue::canAutoDelete() const{
- Locker locker(lock);
- return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete);
-}
diff --git a/cpp/broker/src/QueueRegistry.cpp b/cpp/broker/src/QueueRegistry.cpp
deleted file mode 100644
index f807415314..0000000000
--- a/cpp/broker/src/QueueRegistry.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "QueueRegistry.h"
-#include "MonitorImpl.h"
-#include "SessionHandlerImpl.h"
-#include <sstream>
-#include <assert.h>
-
-using namespace qpid::broker;
-using namespace qpid::concurrent;
-
-QueueRegistry::QueueRegistry() : counter(1){}
-
-QueueRegistry::~QueueRegistry(){}
-
-std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner)
-{
- Locker locker(lock);
- string name = declareName.empty() ? generateName() : declareName;
- assert(!name.empty());
- QueueMap::iterator i = queues.find(name);
- if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner));
- queues[name] = queue;
- return std::pair<Queue::shared_ptr, bool>(queue, true);
- } else {
- return std::pair<Queue::shared_ptr, bool>(i->second, false);
- }
-}
-
-void QueueRegistry::destroy(const string& name){
- Locker locker(lock);
- queues.erase(name);
-}
-
-Queue::shared_ptr QueueRegistry::find(const string& name){
- Locker locker(lock);
- QueueMap::iterator i = queues.find(name);
- if (i == queues.end()) {
- return Queue::shared_ptr();
- } else {
- return i->second;
- }
-}
-
-string QueueRegistry::generateName(){
- string name;
- do {
- std::stringstream ss;
- ss << "tmp_" << counter++;
- name = ss.str();
- // Thread safety: Private function, only called with lock held
- // so this is OK.
- } while(queues.find(name) != queues.end());
- return name;
-}
diff --git a/cpp/broker/src/Router.cpp b/cpp/broker/src/Router.cpp
deleted file mode 100644
index c2dd74bf7d..0000000000
--- a/cpp/broker/src/Router.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Router.h"
-
-using namespace qpid::broker;
-
-Router::Router(ExchangeRegistry& _registry) : registry(_registry){}
-
-void Router::operator()(Message::shared_ptr& msg){
- Exchange* exchange = registry.get(msg->getExchange());
- if(exchange){
- exchange->route(msg, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
- }else{
- std::cout << "WARNING: Could not route message, unknown exchange: " << msg->getExchange() << std::endl;
- }
-
-}
diff --git a/cpp/broker/src/SessionHandlerFactoryImpl.cpp b/cpp/broker/src/SessionHandlerFactoryImpl.cpp
deleted file mode 100644
index 39c627afef..0000000000
--- a/cpp/broker/src/SessionHandlerFactoryImpl.cpp
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "SessionHandlerFactoryImpl.h"
-#include "SessionHandlerImpl.h"
-#include "FanOutExchange.h"
-#include "HeadersExchange.h"
-
-using namespace qpid::broker;
-using namespace qpid::io;
-
-namespace
-{
-const std::string empty;
-const std::string amq_direct("amq.direct");
-const std::string amq_topic("amq.topic");
-const std::string amq_fanout("amq.fanout");
-const std::string amq_match("amq.match");
-}
-
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
- exchanges.declare(new DirectExchange(empty)); // Default exchange.
- exchanges.declare(new DirectExchange(amq_direct));
- exchanges.declare(new TopicExchange(amq_topic));
- exchanges.declare(new FanOutExchange(amq_fanout));
- exchanges.declare(new HeadersExchange(amq_match));
- cleaner.start();
-}
-
-SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){
- return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout);
-}
-
-SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){
- cleaner.stop();
-}
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
deleted file mode 100644
index 0d8539332c..0000000000
--- a/cpp/broker/src/SessionHandlerImpl.cpp
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include "SessionHandlerImpl.h"
-#include "FanOutExchange.h"
-#include "HeadersExchange.h"
-#include "Router.h"
-#include "TopicExchange.h"
-#include "assert.h"
-
-using namespace std::tr1;
-using namespace qpid::broker;
-using namespace qpid::io;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
- QueueRegistry* _queues,
- ExchangeRegistry* _exchanges,
- AutoDelete* _cleaner,
- const u_int32_t _timeout) :
- context(_context),
- client(context),
- queues(_queues),
- exchanges(_exchanges),
- cleaner(_cleaner),
- timeout(_timeout),
- connectionHandler(new ConnectionHandlerImpl(this)),
- channelHandler(new ChannelHandlerImpl(this)),
- basicHandler(new BasicHandlerImpl(this)),
- exchangeHandler(new ExchangeHandlerImpl(this)),
- queueHandler(new QueueHandlerImpl(this)),
- framemax(65536),
- heartbeat(0) {}
-
-SessionHandlerImpl::~SessionHandlerImpl(){
- // TODO aconway 2006-09-07: Should be auto_ptr or plain members.
- delete channelHandler;
- delete connectionHandler;
- delete basicHandler;
- delete exchangeHandler;
- delete queueHandler;
-}
-
-Channel* SessionHandlerImpl::getChannel(u_int16_t channel){
- channel_iterator i = channels.find(channel);
- if(i == channels.end()){
- throw ConnectionException(504, "Unknown channel: " + channel);
- }
- return i->second;
-}
-
-Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){
- Queue::shared_ptr queue;
- if (name.empty()) {
- queue = getChannel(channel)->getDefaultQueue();
- if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
- } else {
- queue = queues->find(name);
- if (queue == 0) {
- throw ChannelException( 404, "Queue not found: " + name);
- }
- }
- return queue;
-}
-
-
-Exchange* SessionHandlerImpl::findExchange(const string& name){
- exchanges->getLock()->acquire();
- Exchange* exchange(exchanges->get(name));
- exchanges->getLock()->release();
- return exchange;
-}
-
-void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
- u_int16_t channel = frame->getChannel();
- AMQBody::shared_ptr body = frame->getBody();
- AMQMethodBody::shared_ptr method;
-
- switch(body->type())
- {
- case METHOD_BODY:
- method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
- try{
- method->invoke(*this, channel);
- }catch(ChannelException& e){
- channels[channel]->close();
- channels.erase(channel);
- client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
- }catch(ConnectionException& e){
- client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
- }
- break;
-
- case HEADER_BODY:
- this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
- break;
-
- case CONTENT_BODY:
- this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
- break;
-
- case HEARTBEAT_BODY:
- //channel must be 0
- this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
- break;
- }
-}
-
-void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* /*header*/){
- //send connection start
- FieldTable properties;
- string mechanisms("PLAIN");
- string locales("en_US");
- client.getConnection().start(0, 8, 0, properties, mechanisms, locales);
-}
-
-void SessionHandlerImpl::idleOut(){
-
-}
-
-void SessionHandlerImpl::idleIn(){
-
-}
-
-void SessionHandlerImpl::closed(){
- for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
- Channel* c = i->second;
- channels.erase(i);
- c->close();
- delete c;
- }
- for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
- string name = (*i)->getName();
- queues->destroy(name);
- exclusiveQueues.erase(i);
- }
-}
-
-void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
- getChannel(channel)->handleHeader(body, Router(*exchanges));
-}
-
-void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
- getChannel(channel)->handleContent(body, Router(*exchanges));
-}
-
-void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
- std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl;
-}
-
-void SessionHandlerImpl::ConnectionHandlerImpl::startOk(
- u_int16_t /*channel*/, FieldTable& /*clientProperties*/, string& /*mechanism*/,
- string& /*response*/, string& /*locale*/){
-
- parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
-}
-
-void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, string& /*response*/){}
-
-void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
- parent->framemax = framemax;
- parent->heartbeat = heartbeat;
-}
-
-void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, string& /*virtualHost*/, string& /*capabilities*/, bool /*insist*/){
- string knownhosts;
- parent->client.getConnection().openOk(0, knownhosts);
-}
-
-void SessionHandlerImpl::ConnectionHandlerImpl::close(
- u_int16_t /*channel*/, u_int16_t /*replyCode*/, string& /*replyText*/,
- u_int16_t /*classId*/, u_int16_t /*methodId*/)
-{
- parent->client.getConnection().closeOk(0);
- parent->context->close();
-}
-
-void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
- parent->context->close();
-}
-
-
-
-void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& /*outOfBand*/){
- parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
- parent->client.getChannel().openOk(channel);
-}
-
-void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
-void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
-
-void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, string& /*replyText*/,
- u_int16_t /*classId*/, u_int16_t /*methodId*/){
- Channel* c = parent->getChannel(channel);
- if(c){
- parent->channels.erase(channel);
- c->close();
- delete c;
- parent->client.getChannel().closeOk(channel);
- }
-}
-
-void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
-
-
-
-void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- FieldTable& /*arguments*/){
-
- if(!passive && (
- type != TopicExchange::typeName &&
- type != DirectExchange::typeName &&
- type != FanOutExchange::typeName &&
- type != HeadersExchange::typeName
- )
- )
- {
- throw ChannelException(540, "Exchange type not implemented: " + type);
- }
-
- parent->exchanges->getLock()->acquire();
- if(!parent->exchanges->get(exchange)){
- if(type == TopicExchange::typeName){
- parent->exchanges->declare(new TopicExchange(exchange));
- }else if(type == DirectExchange::typeName){
- parent->exchanges->declare(new DirectExchange(exchange));
- }else if(type == FanOutExchange::typeName){
- parent->exchanges->declare(new DirectExchange(exchange));
- }else if (type == HeadersExchange::typeName) {
- parent->exchanges->declare(new HeadersExchange(exchange));
- }
- }
- parent->exchanges->getLock()->release();
- if(!nowait){
- parent->client.getExchange().declareOk(channel);
- }
-}
-
-void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, bool /*ifUnused*/, bool nowait){
- //TODO: implement unused
- parent->exchanges->getLock()->acquire();
- parent->exchanges->destroy(exchange);
- parent->exchanges->getLock()->release();
- if(!nowait) parent->client.getExchange().deleteOk(channel);
-}
-
-void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, FieldTable& /*arguments*/){
- Queue::shared_ptr queue;
- if (passive && !name.empty()) {
- queue = parent->getQueue(name, channel);
- } else {
- std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
- queue = queue_created.first;
- assert(queue);
- if (queue_created.second) { // This is a new queue
- parent->getChannel(channel)->setDefaultQueue(queue);
- //add default binding:
- parent->exchanges->getDefault()->bind(queue, name, 0);
- if(exclusive){
- parent->exclusiveQueues.push_back(queue);
- } else if(autoDelete){
- parent->cleaner->add(queue);
- }
- }
- }
- if(exclusive && !queue->isExclusiveOwner(parent)){
- throw ChannelException(405, "Cannot grant exclusive access to queue");
- }
- if(!nowait){
- name = queue->getName();
- parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount());
- }
-}
-
-void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, string& queueName,
- string& exchangeName, string& routingKey, bool nowait,
- FieldTable& arguments){
-
- Queue::shared_ptr queue = parent->getQueue(queueName, channel);
- Exchange* exchange = parent->exchanges->get(exchangeName);
- if(exchange){
- if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
- exchange->bind(queue, routingKey, &arguments);
- if(!nowait) parent->client.getQueue().bindOk(channel);
- }else{
- throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
- }
-}
-
-void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, string& queueName, bool nowait){
-
- Queue::shared_ptr queue = parent->getQueue(queueName, channel);
- int count = queue->purge();
- if(!nowait) parent->client.getQueue().purgeOk(channel, count);
-}
-
-void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
- ChannelException error(0, "");
- int count(0);
- Queue::shared_ptr q = parent->getQueue(queue, channel);
- if(ifEmpty && q->getMessageCount() > 0){
- throw ChannelException(406, "Queue not empty.");
- }else if(ifUnused && q->getConsumerCount() > 0){
- throw ChannelException(406, "Queue in use.");
- }else{
- //remove the queue from the list of exclusive queues if necessary
- if(q->isExclusiveOwner(parent)){
- queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q);
- if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
- }
- count = q->getMessageCount();
- parent->queues->destroy(queue);
- }
- if(!nowait) parent->client.getQueue().deleteOk(channel, count);
-}
-
-
-
-
-void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
- //TODO: handle global
- parent->getChannel(channel)->setPrefetchSize(prefetchSize);
- parent->getChannel(channel)->setPrefetchCount(prefetchCount);
- parent->client.getBasic().qosOk(channel);
-}
-
-void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/,
- string& queueName, string& consumerTag,
- bool noLocal, bool noAck, bool exclusive,
- bool nowait){
-
- Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
- Channel* channel = parent->channels[channelId];
- if(!consumerTag.empty() && channel->exists(consumerTag)){
- throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- try{
- channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0);
- if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
- }catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
- }
-
-}
-
-void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){
- parent->getChannel(channel)->cancel(consumerTag);
- if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag);
-}
-
-void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
- string& exchange, string& routingKey,
- bool mandatory, bool immediate){
-
- Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate);
- parent->getChannel(channel)->handlePublish(msg);
-}
-
-void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){
- Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
- if(!parent->getChannel(channelId)->get(queue, !noAck)){
- string clusterId;//not used, part of an imatix hack
- parent->client.getBasic().getEmpty(channelId, clusterId);
- }
-}
-
-void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
- try{
- parent->getChannel(channel)->ack(deliveryTag, multiple);
- }catch(InvalidAckException& e){
- throw ConnectionException(530, "Received ack for unrecognised delivery tag");
- }
-}
-
-void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-
-void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
- parent->getChannel(channel)->recover(requeue);
-}
-
diff --git a/cpp/broker/src/TopicExchange.cpp b/cpp/broker/src/TopicExchange.cpp
deleted file mode 100644
index 53977747c4..0000000000
--- a/cpp/broker/src/TopicExchange.cpp
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "TopicExchange.h"
-#include "ExchangeBinding.h"
-#include <algorithm>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-
-// TODO aconway 2006-09-20: More efficient matching algorithm.
-// Areas for improvement:
-// - excessive string copying: should be 0 copy, match from original buffer.
-// - match/lookup: use descision tree or other more efficient structure.
-
-Tokens& Tokens::operator=(const std::string& s) {
- clear();
- if (s.empty()) return *this;
- std::string::const_iterator i = s.begin();
- while (true) {
- // Invariant: i is at the beginning of the next untokenized word.
- std::string::const_iterator j = find(i, s.end(), '.');
- push_back(std::string(i, j));
- if (j == s.end()) return *this;
- i = j + 1;
- }
- return *this;
-}
-
-size_t Tokens::Hash::operator()(const Tokens& p) const {
- size_t hash = 0;
- for (Tokens::const_iterator i = p.begin(); i != p.end(); ++i) {
- hash += std::tr1::hash<std::string>()(*i);
- }
- return hash;
-}
-
-TopicPattern& TopicPattern::operator=(const Tokens& tokens) {
- Tokens::operator=(tokens);
- normalize();
- return *this;
-}
-
-namespace {
-const std::string hashmark("#");
-const std::string star("*");
-}
-
-void TopicPattern::normalize() {
- std::string word;
- Tokens::iterator i = begin();
- while (i != end()) {
- if (*i == hashmark) {
- ++i;
- while (i != end()) {
- // Invariant: *(i-1)==#, [begin()..i-1] is normalized.
- if (*i == star) { // Move * before #.
- std::swap(*i, *(i-1));
- ++i;
- } else if (*i == hashmark) {
- erase(i); // Remove extra #
- } else {
- break;
- }
- }
- } else {
- i ++;
- }
- }
-}
-
-
-namespace {
-// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
-// Need more efficient Tokens impl that can operate on a string in place.
-//
-bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end)
-{
- // Invariant: [pattern_begin..p) matches [target_begin..t)
- Tokens::const_iterator p = pattern_begin;
- Tokens::const_iterator t = target_begin;
- while (p != pattern_end && t != target_end)
- {
- if (*p == star || *p == *t) {
- ++p, ++t;
- } else if (*p == hashmark) {
- ++p;
- if (do_match(p, pattern_end, t, target_end)) return true;
- while (t != target_end) {
- ++t;
- if (do_match(p, pattern_end, t, target_end)) return true;
- }
- return false;
- } else {
- return false;
- }
- }
- while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing #
- return t == target_end && p == pattern_end;
-}
-}
-
-bool TopicPattern::match(const Tokens& target) const
-{
- return do_match(begin(), end(), target.begin(), target.end());
-}
-
-TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
-
-void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
- lock.acquire();
- TopicPattern routingPattern(routingKey);
- bindings[routingPattern].push_back(queue);
- queue->bound(new ExchangeBinding(this, queue, routingKey, args));
- lock.release();
-}
-
-void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){
- lock.acquire();
- BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
- Queue::vector& qv(bi->second);
- if (bi == bindings.end()) return;
- Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
- if(q == qv.end()) return;
- qv.erase(q);
- if(qv.empty()) bindings.erase(bi);
- lock.release();
-}
-
-
-void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){
- lock.acquire();
- for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (i->first.match(routingKey)) {
- Queue::vector& qv(i->second);
- for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
- (*j)->deliver(msg);
- }
- }
- }
- lock.release();
-}
-
-TopicExchange::~TopicExchange() {}
-
-const std::string TopicExchange::typeName("topic");
-
-