summaryrefslogtreecommitdiff
path: root/Final/cpp/lib/broker
diff options
context:
space:
mode:
Diffstat (limited to 'Final/cpp/lib/broker')
-rw-r--r--Final/cpp/lib/broker/AccumulatedAck.cpp49
-rw-r--r--Final/cpp/lib/broker/AccumulatedAck.h57
-rw-r--r--Final/cpp/lib/broker/AutoDelete.cpp86
-rw-r--r--Final/cpp/lib/broker/AutoDelete.h55
-rw-r--r--Final/cpp/lib/broker/Binding.h38
-rw-r--r--Final/cpp/lib/broker/Broker.cpp84
-rw-r--r--Final/cpp/lib/broker/Broker.h91
-rw-r--r--Final/cpp/lib/broker/BrokerChannel.cpp272
-rw-r--r--Final/cpp/lib/broker/BrokerChannel.h130
-rw-r--r--Final/cpp/lib/broker/BrokerExchange.h50
-rw-r--r--Final/cpp/lib/broker/BrokerMessage.cpp223
-rw-r--r--Final/cpp/lib/broker/BrokerMessage.h145
-rw-r--r--Final/cpp/lib/broker/BrokerQueue.cpp247
-rw-r--r--Final/cpp/lib/broker/BrokerQueue.h146
-rw-r--r--Final/cpp/lib/broker/ConnectionToken.h38
-rw-r--r--Final/cpp/lib/broker/Consumer.h37
-rw-r--r--Final/cpp/lib/broker/Content.h44
-rw-r--r--Final/cpp/lib/broker/Daemon.cpp182
-rw-r--r--Final/cpp/lib/broker/Daemon.h84
-rw-r--r--Final/cpp/lib/broker/DeletingTxOp.cpp45
-rw-r--r--Final/cpp/lib/broker/DeletingTxOp.h45
-rw-r--r--Final/cpp/lib/broker/Deliverable.h37
-rw-r--r--Final/cpp/lib/broker/DeliverableMessage.cpp33
-rw-r--r--Final/cpp/lib/broker/DeliverableMessage.h41
-rw-r--r--Final/cpp/lib/broker/DeliveryRecord.cpp91
-rw-r--r--Final/cpp/lib/broker/DeliveryRecord.h64
-rw-r--r--Final/cpp/lib/broker/DirectExchange.cpp73
-rw-r--r--Final/cpp/lib/broker/DirectExchange.h57
-rw-r--r--Final/cpp/lib/broker/ExchangeBinding.cpp35
-rw-r--r--Final/cpp/lib/broker/ExchangeBinding.h48
-rw-r--r--Final/cpp/lib/broker/ExchangeRegistry.cpp73
-rw-r--r--Final/cpp/lib/broker/ExchangeRegistry.h46
-rw-r--r--Final/cpp/lib/broker/FanOutExchange.cpp60
-rw-r--r--Final/cpp/lib/broker/FanOutExchange.h60
-rw-r--r--Final/cpp/lib/broker/HeadersExchange.cpp121
-rw-r--r--Final/cpp/lib/broker/HeadersExchange.h65
-rw-r--r--Final/cpp/lib/broker/InMemoryContent.cpp72
-rw-r--r--Final/cpp/lib/broker/InMemoryContent.h47
-rw-r--r--Final/cpp/lib/broker/LazyLoadedContent.cpp63
-rw-r--r--Final/cpp/lib/broker/LazyLoadedContent.h46
-rw-r--r--Final/cpp/lib/broker/Makefile.am107
-rw-r--r--Final/cpp/lib/broker/MessageBuilder.cpp71
-rw-r--r--Final/cpp/lib/broker/MessageBuilder.h58
-rw-r--r--Final/cpp/lib/broker/MessageStore.h140
-rw-r--r--Final/cpp/lib/broker/MessageStoreModule.cpp104
-rw-r--r--Final/cpp/lib/broker/MessageStoreModule.h60
-rw-r--r--Final/cpp/lib/broker/NameGenerator.cpp32
-rw-r--r--Final/cpp/lib/broker/NameGenerator.h39
-rw-r--r--Final/cpp/lib/broker/NullMessageStore.cpp104
-rw-r--r--Final/cpp/lib/broker/NullMessageStore.h59
-rw-r--r--Final/cpp/lib/broker/Prefetch.h42
-rw-r--r--Final/cpp/lib/broker/QueuePolicy.cpp69
-rw-r--r--Final/cpp/lib/broker/QueuePolicy.h54
-rw-r--r--Final/cpp/lib/broker/QueueRegistry.cpp81
-rw-r--r--Final/cpp/lib/broker/QueueRegistry.h96
-rw-r--r--Final/cpp/lib/broker/RecoveryManager.cpp42
-rw-r--r--Final/cpp/lib/broker/RecoveryManager.h45
-rw-r--r--Final/cpp/lib/broker/SessionHandlerFactoryImpl.cpp69
-rw-r--r--Final/cpp/lib/broker/SessionHandlerFactoryImpl.h57
-rw-r--r--Final/cpp/lib/broker/SessionHandlerImpl.cpp468
-rw-r--r--Final/cpp/lib/broker/SessionHandlerImpl.h270
-rw-r--r--Final/cpp/lib/broker/TopicExchange.cpp156
-rw-r--r--Final/cpp/lib/broker/TopicExchange.h100
-rw-r--r--Final/cpp/lib/broker/TransactionalStore.h47
-rw-r--r--Final/cpp/lib/broker/TxAck.cpp54
-rw-r--r--Final/cpp/lib/broker/TxAck.h58
-rw-r--r--Final/cpp/lib/broker/TxBuffer.cpp55
-rw-r--r--Final/cpp/lib/broker/TxBuffer.h107
-rw-r--r--Final/cpp/lib/broker/TxOp.h39
-rw-r--r--Final/cpp/lib/broker/TxPublish.cpp60
-rw-r--r--Final/cpp/lib/broker/TxPublish.h80
71 files changed, 6103 insertions, 0 deletions
diff --git a/Final/cpp/lib/broker/AccumulatedAck.cpp b/Final/cpp/lib/broker/AccumulatedAck.cpp
new file mode 100644
index 0000000000..a9826ba5ea
--- /dev/null
+++ b/Final/cpp/lib/broker/AccumulatedAck.cpp
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <AccumulatedAck.h>
+
+using std::less_equal;
+using std::bind2nd;
+using namespace qpid::broker;
+
+void AccumulatedAck::update(u_int64_t tag, bool multiple){
+ if(multiple){
+ if(tag > range) range = tag;
+ //else don't care, it is already counted
+ }else if(tag > range){
+ individual.push_back(tag);
+ }
+}
+
+void AccumulatedAck::consolidate(){
+ individual.sort();
+ //remove any individual tags that are covered by range
+ individual.remove_if(bind2nd(less_equal<u_int64_t>(), range));
+}
+
+void AccumulatedAck::clear(){
+ range = 0;
+ individual.clear();
+}
+
+bool AccumulatedAck::covers(u_int64_t tag) const{
+ return tag <= range || find(individual.begin(), individual.end(), tag) != individual.end();
+}
diff --git a/Final/cpp/lib/broker/AccumulatedAck.h b/Final/cpp/lib/broker/AccumulatedAck.h
new file mode 100644
index 0000000000..c472f7f3ea
--- /dev/null
+++ b/Final/cpp/lib/broker/AccumulatedAck.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _AccumulatedAck_
+#define _AccumulatedAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Keeps an accumulated record of acked messages (by delivery
+ * tag).
+ */
+ class AccumulatedAck {
+ public:
+ /**
+ * If not zero, then everything up to this value has been
+ * acked.
+ */
+ u_int64_t range;
+ /**
+ * List of individually acked messages that are not
+ * included in the range marked by 'range'.
+ */
+ std::list<u_int64_t> individual;
+
+ AccumulatedAck(u_int64_t r) : range(r) {}
+ void update(u_int64_t tag, bool multiple);
+ void consolidate();
+ void clear();
+ bool covers(u_int64_t tag) const;
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/AutoDelete.cpp b/Final/cpp/lib/broker/AutoDelete.cpp
new file mode 100644
index 0000000000..ae48d10505
--- /dev/null
+++ b/Final/cpp/lib/broker/AutoDelete.cpp
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <AutoDelete.h>
+#include <sys/Time.h>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period)
+ : registry(_registry), period(_period), stopped(true) { }
+
+void AutoDelete::add(Queue::shared_ptr const queue){
+ Mutex::ScopedLock l(lock);
+ queues.push(queue);
+}
+
+Queue::shared_ptr const AutoDelete::pop(){
+ Queue::shared_ptr next;
+ Mutex::ScopedLock l(lock);
+ if(!queues.empty()){
+ next = queues.front();
+ queues.pop();
+ }
+ return next;
+}
+
+void AutoDelete::process(){
+ Queue::shared_ptr seen;
+ for(Queue::shared_ptr q = pop(); q; q = pop()){
+ if(seen == q){
+ add(q);
+ break;
+ }else if(q->canAutoDelete()){
+ std::string name(q->getName());
+ registry->destroy(name);
+ std::cout << "INFO: Auto-deleted queue named " << name << std::endl;
+ }else{
+ add(q);
+ if(!seen) seen = q;
+ }
+ }
+}
+
+void AutoDelete::run(){
+ Monitor::ScopedLock l(monitor);
+ while(!stopped){
+ process();
+ monitor.wait(period*TIME_MSEC);
+ }
+}
+
+void AutoDelete::start(){
+ Monitor::ScopedLock l(monitor);
+ if(stopped){
+ stopped = false;
+ runner = Thread(this);
+ }
+}
+
+void AutoDelete::stop(){
+ {
+ Monitor::ScopedLock l(monitor);
+ if(stopped) return;
+ stopped = true;
+ }
+ monitor.notify();
+ runner.join();
+}
diff --git a/Final/cpp/lib/broker/AutoDelete.h b/Final/cpp/lib/broker/AutoDelete.h
new file mode 100644
index 0000000000..19a5938df1
--- /dev/null
+++ b/Final/cpp/lib/broker/AutoDelete.h
@@ -0,0 +1,55 @@
+#ifndef _AutoDelete_
+#define _AutoDelete_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <queue>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+#include <QueueRegistry.h>
+#include <sys/Thread.h>
+
+namespace qpid {
+ namespace broker{
+ class AutoDelete : private qpid::sys::Runnable {
+ qpid::sys::Mutex lock;
+ qpid::sys::Monitor monitor;
+ std::queue<Queue::shared_ptr> queues;
+ QueueRegistry* const registry;
+ u_int32_t period;
+ volatile bool stopped;
+ qpid::sys::Thread runner;
+
+ Queue::shared_ptr const pop();
+ void process();
+ virtual void run();
+
+ public:
+ AutoDelete(QueueRegistry* const registry, u_int32_t period);
+ void add(Queue::shared_ptr const);
+ void start();
+ void stop();
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/Binding.h b/Final/cpp/lib/broker/Binding.h
new file mode 100644
index 0000000000..16ca223208
--- /dev/null
+++ b/Final/cpp/lib/broker/Binding.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Binding_
+#define _Binding_
+
+#include <FieldTable.h>
+
+namespace qpid {
+ namespace broker {
+ class Binding{
+ public:
+ virtual void cancel() = 0;
+ virtual ~Binding(){}
+ };
+ }
+}
+
+
+#endif
+
diff --git a/Final/cpp/lib/broker/Broker.cpp b/Final/cpp/lib/broker/Broker.cpp
new file mode 100644
index 0000000000..806127bf43
--- /dev/null
+++ b/Final/cpp/lib/broker/Broker.cpp
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <memory>
+#include <Broker.h>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+Broker::Options::Options() :
+ workerThreads(5),
+ maxConnections(500),
+ connectionBacklog(10),
+ store(),
+ stagingThreshold(5000000)
+{}
+
+void Broker::Options::addTo(po::options_description& desc)
+{
+ using namespace po;
+ CommonOptions::addTo(desc);
+ desc.add_options()
+ ("worker-threads", optValue(workerThreads, "N"),
+ "Broker thread pool size")
+ ("max-connections", optValue(maxConnections, "N"),
+ "Maximum allowed connections")
+ ("connection-backlog", optValue(connectionBacklog, "N"),
+ "Connection backlog limit for server socket.")
+ ("staging-threshold", optValue(stagingThreshold, "N"),
+ "Messages over N bytes are staged to disk.")
+ ("store", optValue(store,"LIBNAME"),
+ "Name of message store shared library.");
+}
+
+
+Broker::Broker(const Options& config) :
+ acceptor(Acceptor::create(config.port,
+ config.connectionBacklog,
+ config.workerThreads,
+ config.trace)),
+ factory(config.store)
+{ }
+
+
+Broker::shared_ptr Broker::create(int16_t port)
+{
+ Options config;
+ config.port=port;
+ return create(config);
+}
+
+Broker::shared_ptr Broker::create(const Options& config) {
+ return Broker::shared_ptr(new Broker(config));
+}
+
+void Broker::run() {
+ acceptor->run(&factory);
+}
+
+void Broker::shutdown() {
+ if (acceptor)
+ acceptor->shutdown();
+}
+
+Broker::~Broker() { }
+
diff --git a/Final/cpp/lib/broker/Broker.h b/Final/cpp/lib/broker/Broker.h
new file mode 100644
index 0000000000..8b54bd592b
--- /dev/null
+++ b/Final/cpp/lib/broker/Broker.h
@@ -0,0 +1,91 @@
+#ifndef _Broker_
+#define _Broker_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <CommonOptions.h>
+#include <SessionHandlerFactoryImpl.h>
+#include <sys/Runnable.h>
+#include <sys/Acceptor.h>
+#include <SharedObject.h>
+
+namespace qpid {
+namespace broker {
+/**
+ * A broker instance.
+ */
+class Broker : public qpid::sys::Runnable,
+ public qpid::SharedObject<Broker>
+{
+ public:
+ struct Options : public CommonOptions {
+ Options();
+ void addTo(po::options_description&);
+ int workerThreads;
+ int maxConnections;
+ int connectionBacklog;
+ std::string store;
+ long stagingThreshold;
+ };
+
+ virtual ~Broker();
+
+ /**
+ * Create a broker.
+ * @param port Port to listen on or 0 to pick a port dynamically.
+ */
+ static shared_ptr create(int16_t port = Options::DEFAULT_PORT);
+
+ /**
+ * Create a broker using a Configuration.
+ */
+ static shared_ptr create(const Options& config);
+
+ /**
+ * Return listening port. If called before bind this is
+ * the configured port. If called after it is the actual
+ * port, which will be different if the configured port is
+ * 0.
+ */
+ virtual int16_t getPort() const { return acceptor->getPort(); }
+
+ /**
+ * Run the broker. Implements Runnable::run() so the broker
+ * can be run in a separate thread.
+ */
+ virtual void run();
+
+ /** Shut down the broker */
+ virtual void shutdown();
+
+ private:
+ Broker(const Options& config);
+ Options config;
+ qpid::sys::Acceptor::shared_ptr acceptor;
+ SessionHandlerFactoryImpl factory;
+};
+}
+}
+
+
+
+#endif /*!_Broker_*/
diff --git a/Final/cpp/lib/broker/BrokerChannel.cpp b/Final/cpp/lib/broker/BrokerChannel.cpp
new file mode 100644
index 0000000000..d8fbdc467c
--- /dev/null
+++ b/Final/cpp/lib/broker/BrokerChannel.cpp
@@ -0,0 +1,272 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <BrokerChannel.h>
+#include <QpidError.h>
+#include <iostream>
+#include <sstream>
+#include <assert.h>
+
+using std::mem_fun_ref;
+using std::bind2nd;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+
+Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) :
+ id(_id),
+ out(_out),
+ currentDeliveryTag(1),
+ transactional(false),
+ prefetchSize(0),
+ prefetchCount(0),
+ framesize(_framesize),
+ tagGenerator("sgen"),
+ accumulatedAck(0),
+ store(_store),
+ messageBuilder(this, _store, _stagingThreshold),
+ version(_version),
+ flowActive(true){
+
+ outstanding.reset();
+}
+
+Channel::~Channel(){
+}
+
+bool Channel::exists(const string& consumerTag){
+ return consumers.find(consumerTag) != consumers.end();
+}
+
+void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*){
+ if(tag.empty()) tag = tagGenerator.generate();
+ ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
+ try{
+ queue->consume(c, exclusive);//may throw exception
+ consumers[tag] = c;
+ }catch(ExclusiveAccessException& e){
+ delete c;
+ throw e;
+ }
+}
+
+void Channel::cancel(consumer_iterator i){
+ ConsumerImpl* c = i->second;
+ consumers.erase(i);
+ if(c){
+ c->cancel();
+ delete c;
+ }
+}
+
+void Channel::cancel(const string& tag){
+ consumer_iterator i = consumers.find(tag);
+ if(i != consumers.end()){
+ cancel(i);
+ }
+}
+
+void Channel::close(){
+ //cancel all consumers
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
+ cancel(i);
+ }
+ //requeue:
+ recover(true);
+}
+
+void Channel::begin(){
+ transactional = true;
+}
+
+void Channel::commit(){
+ TxAck txAck(accumulatedAck, unacked);
+ txBuffer.enlist(&txAck);
+ if(txBuffer.prepare(store)){
+ txBuffer.commit();
+ }
+ accumulatedAck.clear();
+}
+
+void Channel::rollback(){
+ txBuffer.rollback();
+ accumulatedAck.clear();
+}
+
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
+ Mutex::ScopedLock locker(deliveryLock);
+
+ u_int64_t deliveryTag = currentDeliveryTag++;
+ if(ackExpected){
+ unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
+ outstanding.size += msg->contentSize();
+ outstanding.count++;
+ }
+ //send deliver method, header and content(s)
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
+}
+
+bool Channel::checkPrefetch(Message::shared_ptr& msg){
+ Mutex::ScopedLock locker(deliveryLock);
+ bool countOk = !prefetchCount || prefetchCount > unacked.size();
+ bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
+ return countOk && sizeOk;
+}
+
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
+ Queue::shared_ptr _queue,
+ ConnectionToken* const _connection, bool ack) : parent(_parent),
+ tag(_tag),
+ queue(_queue),
+ connection(_connection),
+ ackExpected(ack),
+ blocked(false){
+}
+
+bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
+ if(!connection || connection != msg->getPublisher()){//check for no_local
+ if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){
+ blocked = true;
+ }else{
+ blocked = false;
+ parent->deliver(msg, tag, queue, ackExpected);
+ return true;
+ }
+ }
+ return false;
+}
+
+void Channel::ConsumerImpl::cancel(){
+ if(queue) queue->cancel(this);
+}
+
+void Channel::ConsumerImpl::requestDispatch(){
+ if(blocked) queue->dispatch();
+}
+
+void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
+ Message::shared_ptr message(_message);
+ exchange = _exchange;
+ messageBuilder.initialise(message);
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
+ messageBuilder.setHeader(header);
+ //at this point, decide based on the size of the message whether we want
+ //to stage it by saving content directly to disk as it arrives
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr content){
+ messageBuilder.addContent(content);
+}
+
+void Channel::complete(Message::shared_ptr& msg){
+ if(exchange){
+ if(transactional){
+ TxPublish* deliverable = new TxPublish(msg);
+ exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ txBuffer.enlist(new DeletingTxOp(deliverable));
+ }else{
+ DeliverableMessage deliverable(msg);
+ exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ }
+ exchange.reset();
+ }else{
+ std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl;
+ }
+}
+
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ if(transactional){
+ accumulatedAck.update(deliveryTag, multiple);
+ //TODO: I think the outstanding prefetch size & count should be updated at this point...
+ //TODO: ...this may then necessitate dispatching to consumers
+ }else{
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
+ if(i == unacked.end()){
+ throw InvalidAckException();
+ }else if(multiple){
+ ack_iterator end = ++i;
+ for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard));
+ unacked.erase(unacked.begin(), end);
+
+ //recalculate the prefetch:
+ outstanding.reset();
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
+ }else{
+ i->discard();
+ i->subtractFrom(&outstanding);
+ unacked.erase(i);
+ }
+
+ //if the prefetch limit had previously been reached, there may
+ //be messages that can be now be delivered
+ for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
+ j->second->requestDispatch();
+ }
+ }
+}
+
+void Channel::recover(bool requeue){
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ if(requeue){
+ outstanding.reset();
+ std::list<DeliveryRecord> copy = unacked;
+ unacked.clear();
+ for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
+ }else{
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
+ }
+}
+
+bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
+ Message::shared_ptr msg = queue->dequeue();
+ if(msg){
+ Mutex::ScopedLock locker(deliveryLock);
+ u_int64_t myDeliveryTag = currentDeliveryTag++;
+ msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version);
+ if(ackExpected){
+ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
+ }
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
+}
+
+void Channel::flow(bool active){
+ Mutex::ScopedLock locker(deliveryLock);
+ bool requestDelivery(!flowActive && active);
+ flowActive = active;
+ if (requestDelivery) {
+ //there may be messages that can be now be delivered
+ for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
+ j->second->requestDispatch();
+ }
+ }
+}
diff --git a/Final/cpp/lib/broker/BrokerChannel.h b/Final/cpp/lib/broker/BrokerChannel.h
new file mode 100644
index 0000000000..be40a25013
--- /dev/null
+++ b/Final/cpp/lib/broker/BrokerChannel.h
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Channel_
+#define _Channel_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include <map>
+#include <AccumulatedAck.h>
+#include <Binding.h>
+#include <Consumer.h>
+#include <DeletingTxOp.h>
+#include <DeliverableMessage.h>
+#include <DeliveryRecord.h>
+#include <BrokerMessage.h>
+#include <MessageBuilder.h>
+#include <NameGenerator.h>
+#include <Prefetch.h>
+#include <BrokerQueue.h>
+#include <MessageStore.h>
+#include <TxAck.h>
+#include <TxBuffer.h>
+#include <TxPublish.h>
+#include <sys/Monitor.h>
+#include <OutputHandler.h>
+#include <AMQContentBody.h>
+#include <AMQHeaderBody.h>
+#include <BasicPublishBody.h>
+
+namespace qpid {
+ namespace broker {
+ using qpid::framing::string;
+
+ /**
+ * Maintains state for an AMQP channel. Handles incoming and
+ * outgoing messages for that channel.
+ */
+ class Channel : private MessageBuilder::CompletionHandler{
+ class ConsumerImpl : public virtual Consumer{
+ Channel* parent;
+ const string tag;
+ Queue::shared_ptr queue;
+ ConnectionToken* const connection;
+ const bool ackExpected;
+ bool blocked;
+ public:
+ ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
+ virtual bool deliver(Message::shared_ptr& msg);
+ void cancel();
+ void requestDispatch();
+ };
+
+ typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+ const int id;
+ qpid::framing::OutputHandler* out;
+ u_int64_t currentDeliveryTag;
+ Queue::shared_ptr defaultQueue;
+ bool transactional;
+ std::map<string, ConsumerImpl*> consumers;
+ u_int32_t prefetchSize;
+ u_int16_t prefetchCount;
+ Prefetch outstanding;
+ u_int32_t framesize;
+ NameGenerator tagGenerator;
+ std::list<DeliveryRecord> unacked;
+ qpid::sys::Mutex deliveryLock;
+ TxBuffer txBuffer;
+ AccumulatedAck accumulatedAck;
+ MessageStore* const store;
+ MessageBuilder messageBuilder;//builder for in-progress message
+ Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
+ qpid::framing::ProtocolVersion version; // version used for this channel
+ bool flowActive;
+
+ virtual void complete(Message::shared_ptr& msg);
+ void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
+ void cancel(consumer_iterator consumer);
+ bool checkPrefetch(Message::shared_ptr& msg);
+
+ public:
+ Channel(qpid::framing::ProtocolVersion& _version, qpid::framing::OutputHandler* out, int id, u_int32_t framesize,
+ MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
+ ~Channel();
+ inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
+ inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
+ inline u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; }
+ inline u_int16_t setPrefetchCount(u_int16_t count){ return prefetchCount = count; }
+ bool exists(const string& consumerTag);
+ void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive,
+ ConnectionToken* const connection = 0, const qpid::framing::FieldTable* = 0);
+ void cancel(const string& tag);
+ bool get(Queue::shared_ptr queue, bool ackExpected);
+ void begin();
+ void close();
+ void commit();
+ void rollback();
+ void ack(u_int64_t deliveryTag, bool multiple);
+ void recover(bool requeue);
+ void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);
+ void handlePublish(Message* msg, Exchange::shared_ptr exchange);
+ void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void handleContent(qpid::framing::AMQContentBody::shared_ptr content);
+ void flow(bool active);
+ };
+
+ struct InvalidAckException{};
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/BrokerExchange.h b/Final/cpp/lib/broker/BrokerExchange.h
new file mode 100644
index 0000000000..f5e4d9cb28
--- /dev/null
+++ b/Final/cpp/lib/broker/BrokerExchange.h
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Exchange_
+#define _Exchange_
+
+#include <boost/shared_ptr.hpp>
+#include <Deliverable.h>
+#include <BrokerQueue.h>
+#include <FieldTable.h>
+
+namespace qpid {
+ namespace broker {
+ using std::string;
+
+ class Exchange{
+ const string name;
+ public:
+ typedef boost::shared_ptr<Exchange> shared_ptr;
+
+ explicit Exchange(const string& _name) : name(_name){}
+ virtual ~Exchange(){}
+ string getName() { return name; }
+ virtual string getType() = 0;
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/BrokerMessage.cpp b/Final/cpp/lib/broker/BrokerMessage.cpp
new file mode 100644
index 0000000000..6ba2131a74
--- /dev/null
+++ b/Final/cpp/lib/broker/BrokerMessage.cpp
@@ -0,0 +1,223 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <BrokerMessage.h>
+#include <iostream>
+
+#include <InMemoryContent.h>
+#include <LazyLoadedContent.h>
+#include <MessageStore.h>
+#include <BasicDeliverBody.h>
+#include <BasicGetOkBody.h>
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+Message::Message(const ConnectionToken* const _publisher,
+ const string& _exchange, const string& _routingKey,
+ bool _mandatory, bool _immediate) : publisher(_publisher),
+ exchange(_exchange),
+ routingKey(_routingKey),
+ mandatory(_mandatory),
+ immediate(_immediate),
+ redelivered(false),
+ size(0),
+ persistenceId(0) {}
+
+Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
+ publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
+
+ decode(buffer, headersOnly, contentChunkSize);
+}
+
+Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
+
+Message::~Message(){
+ if (content.get()) content->destroy();
+}
+
+void Message::setHeader(AMQHeaderBody::shared_ptr _header){
+ this->header = _header;
+}
+
+void Message::addContent(AMQContentBody::shared_ptr data){
+ if (!content.get()) {
+ content = std::auto_ptr<Content>(new InMemoryContent());
+ }
+ content->add(data);
+ size += data->size();
+}
+
+bool Message::isComplete(){
+ return header.get() && (header->getContentSize() == contentSize());
+}
+
+void Message::redeliver(){
+ redelivered = true;
+}
+
+void Message::deliver(OutputHandler* out, int channel,
+ const string& consumerTag, u_int64_t deliveryTag,
+ u_int32_t framesize,
+ ProtocolVersion* version){
+ // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
+ out->send(new AMQFrame(*version, channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+ sendContent(out, channel, framesize, version);
+}
+
+void Message::sendGetOk(OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ ProtocolVersion* version){
+ // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
+ out->send(new AMQFrame(*version, channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ sendContent(out, channel, framesize, version);
+}
+
+void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
+ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
+ out->send(new AMQFrame(*version, channel, headerBody));
+
+ Mutex::ScopedLock locker(contentLock);
+ if (content.get()) content->send(*version, out, channel, framesize);
+}
+
+BasicHeaderProperties* Message::getHeaderProperties(){
+ return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+}
+
+const ConnectionToken* const Message::getPublisher(){
+ return publisher;
+}
+
+bool Message::isPersistent()
+{
+ if(!header) return false;
+ BasicHeaderProperties* props = getHeaderProperties();
+ return props && props->getDeliveryMode() == PERSISTENT;
+}
+
+void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize)
+{
+ decodeHeader(buffer);
+ if (!headersOnly) decodeContent(buffer, contentChunkSize);
+}
+
+void Message::decodeHeader(Buffer& buffer)
+{
+ buffer.getShortString(exchange);
+ buffer.getShortString(routingKey);
+
+ u_int32_t headerSize = buffer.getLong();
+ AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
+ headerBody->decode(buffer, headerSize);
+ setHeader(headerBody);
+}
+
+void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
+{
+ u_int64_t expected = expectedContentSize();
+ if (expected != buffer.available()) {
+ std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl;
+ throw Exception("Cannot decode content, buffer not large enough.");
+ }
+
+ if (!chunkSize || chunkSize > expected) {
+ chunkSize = expected;
+ }
+
+ u_int64_t total = 0;
+ while (total < expectedContentSize()) {
+ u_int64_t remaining = expected - total;
+ AMQContentBody::shared_ptr contentBody(new AMQContentBody());
+ contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize);
+ addContent(contentBody);
+ total += chunkSize;
+ }
+}
+
+void Message::encode(Buffer& buffer)
+{
+ encodeHeader(buffer);
+ encodeContent(buffer);
+}
+
+void Message::encodeHeader(Buffer& buffer)
+{
+ buffer.putShortString(exchange);
+ buffer.putShortString(routingKey);
+ buffer.putLong(header->size());
+ header->encode(buffer);
+}
+
+void Message::encodeContent(Buffer& buffer)
+{
+ Mutex::ScopedLock locker(contentLock);
+ if (content.get()) content->encode(buffer);
+}
+
+u_int32_t Message::encodedSize()
+{
+ return encodedHeaderSize() + encodedContentSize();
+}
+
+u_int32_t Message::encodedContentSize()
+{
+ Mutex::ScopedLock locker(contentLock);
+ return content.get() ? content->size() : 0;
+}
+
+u_int32_t Message::encodedHeaderSize()
+{
+ return exchange.size() + 1
+ + routingKey.size() + 1
+ + header->size() + 4;//4 extra bytes for size
+}
+
+u_int64_t Message::expectedContentSize()
+{
+ return header.get() ? header->getContentSize() : 0;
+}
+
+void Message::releaseContent(MessageStore* store)
+{
+ Mutex::ScopedLock locker(contentLock);
+ if (!isPersistent() && persistenceId == 0) {
+ store->stage(this);
+ }
+ if (!content.get() || content->size() > 0) {
+ //set content to lazy loading mode (but only if there is stored content):
+
+ //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is
+ // then set as a member of that message so its lifetime is guaranteed to be no longer than
+ // that of the message itself
+ content = std::auto_ptr<Content>(new LazyLoadedContent(store, this, expectedContentSize()));
+ }
+}
+
+void Message::setContent(std::auto_ptr<Content>& _content)
+{
+ Mutex::ScopedLock locker(contentLock);
+ content = _content;
+}
diff --git a/Final/cpp/lib/broker/BrokerMessage.h b/Final/cpp/lib/broker/BrokerMessage.h
new file mode 100644
index 0000000000..1f68e1004a
--- /dev/null
+++ b/Final/cpp/lib/broker/BrokerMessage.h
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Message_
+#define _Message_
+
+#include <memory>
+#include <boost/shared_ptr.hpp>
+#include <AMQContentBody.h>
+#include <AMQHeaderBody.h>
+#include <ProtocolVersion.h>
+#include <BasicHeaderProperties.h>
+#include <ConnectionToken.h>
+#include <Content.h>
+#include <OutputHandler.h>
+#include <Mutex.h>
+#include <TxBuffer.h>
+
+namespace qpid {
+ namespace broker {
+
+ class MessageStore;
+ using qpid::framing::string;
+
+ /**
+ * Represents an AMQP message, i.e. a header body, a list of
+ * content bodies and some details about the publication
+ * request.
+ */
+ class Message{
+ const ConnectionToken* const publisher;
+ string exchange;
+ string routingKey;
+ const bool mandatory;
+ const bool immediate;
+ bool redelivered;
+ qpid::framing::AMQHeaderBody::shared_ptr header;
+ std::auto_ptr<Content> content;
+ u_int64_t size;
+ u_int64_t persistenceId;
+ qpid::sys::Mutex contentLock;
+
+ void sendContent(qpid::framing::OutputHandler* out,
+ int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version);
+
+ public:
+ typedef boost::shared_ptr<Message> shared_ptr;
+
+ Message(const ConnectionToken* const publisher,
+ const string& exchange, const string& routingKey,
+ bool mandatory, bool immediate);
+ Message(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
+ Message();
+ ~Message();
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr data);
+ bool isComplete();
+ const ConnectionToken* const getPublisher();
+
+ void deliver(qpid::framing::OutputHandler* out,
+ int channel,
+ const string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
+ void sendGetOk(qpid::framing::OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
+ void redeliver();
+
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ bool isPersistent();
+ const string& getRoutingKey() const { return routingKey; }
+ const string& getExchange() const { return exchange; }
+ u_int64_t contentSize() const { return size; }
+ u_int64_t getPersistenceId() const { return persistenceId; }
+ void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; }
+
+ void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
+ void decodeHeader(qpid::framing::Buffer& buffer);
+ void decodeContent(qpid::framing::Buffer& buffer, u_int32_t contentChunkSize = 0);
+
+ void encode(qpid::framing::Buffer& buffer);
+ void encodeHeader(qpid::framing::Buffer& buffer);
+ void encodeContent(qpid::framing::Buffer& buffer);
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ */
+ u_int32_t encodedSize();
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g.routing key and exchange)
+ */
+ u_int32_t encodedHeaderSize();
+ /**
+ * @returns the size of the buffer needed to encode the
+ * (possibly partial) content held by this message
+ */
+ u_int32_t encodedContentSize();
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ void releaseContent(MessageStore* store);
+ /**
+ * If headers have been received, returns the expected
+ * content size else returns 0.
+ */
+ u_int64_t expectedContentSize();
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ void setContent(std::auto_ptr<Content>& content);
+ };
+
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/BrokerQueue.cpp b/Final/cpp/lib/broker/BrokerQueue.cpp
new file mode 100644
index 0000000000..0e48d3b13d
--- /dev/null
+++ b/Final/cpp/lib/broker/BrokerQueue.cpp
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <BrokerQueue.h>
+#include <MessageStore.h>
+#include <sys/Monitor.h>
+#include <sys/Time.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+Queue::Queue(const string& _name, u_int32_t _autodelete,
+ MessageStore* const _store,
+ const ConnectionToken* const _owner) :
+
+ name(_name),
+ autodelete(_autodelete),
+ store(_store),
+ owner(_owner),
+ queueing(false),
+ dispatching(false),
+ next(0),
+ lastUsed(0),
+ exclusive(0),
+ persistenceId(0)
+{
+ if(autodelete) lastUsed = now()/TIME_MSEC;
+}
+
+Queue::~Queue(){
+ for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){
+ b->cancel();
+ bindings.pop();
+ }
+}
+
+void Queue::bound(Binding* b){
+ bindings.push(b);
+}
+
+void Queue::deliver(Message::shared_ptr& msg){
+ enqueue(0, msg, 0);
+ process(msg);
+}
+
+void Queue::recover(Message::shared_ptr& msg){
+ push(msg);
+ if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
+ //content has not been loaded, need to ensure that lazy loading mode is set:
+ //TODO: find a nicer way to do this
+ msg->releaseContent(store);
+ }
+}
+
+void Queue::process(Message::shared_ptr& msg){
+ Mutex::ScopedLock locker(lock);
+ if(queueing || !dispatch(msg)){
+ push(msg);
+ }
+}
+
+bool Queue::dispatch(Message::shared_ptr& msg){
+ if(consumers.empty()){
+ return false;
+ }else if(exclusive){
+ if(!exclusive->deliver(msg)){
+ std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
+ }
+ return true;
+ }else{
+ //deliver to next consumer
+ next = next % consumers.size();
+ Consumer* c = consumers[next];
+ int start = next;
+ while(c){
+ next++;
+ if(c->deliver(msg)) return true;
+
+ next = next % consumers.size();
+ c = next == start ? 0 : consumers[next];
+ }
+ return false;
+ }
+}
+
+bool Queue::startDispatching(){
+ Mutex::ScopedLock locker(lock);
+ if(queueing && !dispatching){
+ dispatching = true;
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Queue::dispatch(){
+ bool proceed = startDispatching();
+ while(proceed){
+ Mutex::ScopedLock locker(lock);
+ if(!messages.empty() && dispatch(messages.front())){
+ pop();
+ }else{
+ dispatching = false;
+ proceed = false;
+ queueing = !messages.empty();
+ }
+ }
+}
+
+void Queue::consume(Consumer* c, bool requestExclusive){
+ Mutex::ScopedLock locker(lock);
+ if(exclusive) throw ExclusiveAccessException();
+ if(requestExclusive){
+ if(!consumers.empty()) throw ExclusiveAccessException();
+ exclusive = c;
+ }
+
+ if(autodelete && consumers.empty()) lastUsed = 0;
+ consumers.push_back(c);
+}
+
+void Queue::cancel(Consumer* c){
+ Mutex::ScopedLock locker(lock);
+ consumers.erase(find(consumers.begin(), consumers.end(), c));
+ if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC;
+ if(exclusive == c) exclusive = 0;
+}
+
+Message::shared_ptr Queue::dequeue(){
+ Mutex::ScopedLock locker(lock);
+ Message::shared_ptr msg;
+ if(!messages.empty()){
+ msg = messages.front();
+ pop();
+ }
+ return msg;
+}
+
+u_int32_t Queue::purge(){
+ Mutex::ScopedLock locker(lock);
+ int count = messages.size();
+ while(!messages.empty()) pop();
+ return count;
+}
+
+void Queue::pop(){
+ if (policy.get()) policy->dequeued(messages.front()->contentSize());
+ messages.pop();
+}
+
+void Queue::push(Message::shared_ptr& msg){
+ queueing = true;
+ messages.push(msg);
+ if (policy.get()) {
+ policy->enqueued(msg->contentSize());
+ if (policy->limitExceeded()) {
+ msg->releaseContent(store);
+ }
+ }
+}
+
+u_int32_t Queue::getMessageCount() const{
+ Mutex::ScopedLock locker(lock);
+ return messages.size();
+}
+
+u_int32_t Queue::getConsumerCount() const{
+ Mutex::ScopedLock locker(lock);
+ return consumers.size();
+}
+
+bool Queue::canAutoDelete() const{
+ Mutex::ScopedLock locker(lock);
+ return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete);
+}
+
+void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
+{
+ if (msg->isPersistent() && store) {
+ store->enqueue(ctxt, msg.get(), *this, xid);
+ }
+}
+
+void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
+{
+ if (msg->isPersistent() && store) {
+ store->dequeue(ctxt, msg.get(), *this, xid);
+ }
+}
+
+namespace
+{
+ const std::string qpidMaxSize("qpid.max_size");
+ const std::string qpidMaxCount("qpid.max_count");
+}
+
+void Queue::create(const FieldTable& settings)
+{
+ if (store) {
+ store->create(*this, settings);
+ }
+ configure(settings);
+}
+
+void Queue::configure(const FieldTable& settings)
+{
+ QueuePolicy* _policy = new QueuePolicy(settings);
+ if (_policy->getMaxCount() || _policy->getMaxSize()) {
+ setPolicy(std::auto_ptr<QueuePolicy>(_policy));
+ }
+}
+
+void Queue::destroy()
+{
+ if (store) {
+ store->destroy(*this);
+ }
+}
+
+void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
+{
+ policy = _policy;
+}
+
+const QueuePolicy* const Queue::getPolicy()
+{
+ return policy.get();
+}
diff --git a/Final/cpp/lib/broker/BrokerQueue.h b/Final/cpp/lib/broker/BrokerQueue.h
new file mode 100644
index 0000000000..41611bebe9
--- /dev/null
+++ b/Final/cpp/lib/broker/BrokerQueue.h
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Queue_
+#define _Queue_
+
+#include <vector>
+#include <memory>
+#include <queue>
+#include <boost/shared_ptr.hpp>
+#include <amqp_types.h>
+#include <Binding.h>
+#include <ConnectionToken.h>
+#include <Consumer.h>
+#include <BrokerMessage.h>
+#include <FieldTable.h>
+#include <sys/Monitor.h>
+#include <QueuePolicy.h>
+
+namespace qpid {
+ namespace broker {
+ class MessageStore;
+
+ /**
+ * Thrown when exclusive access would be violated.
+ */
+ struct ExclusiveAccessException{};
+
+ using std::string;
+
+ /**
+ * The brokers representation of an amqp queue. Messages are
+ * delivered to a queue from where they can be dispatched to
+ * registered consumers or be stored until dequeued or until one
+ * or more consumers registers.
+ */
+ class Queue{
+ const string name;
+ const u_int32_t autodelete;
+ MessageStore* const store;
+ const ConnectionToken* const owner;
+ std::vector<Consumer*> consumers;
+ std::queue<Binding*> bindings;
+ std::queue<Message::shared_ptr> messages;
+ bool queueing;
+ bool dispatching;
+ int next;
+ mutable qpid::sys::Mutex lock;
+ int64_t lastUsed;
+ Consumer* exclusive;
+ mutable u_int64_t persistenceId;
+ std::auto_ptr<QueuePolicy> policy;
+
+ void pop();
+ void push(Message::shared_ptr& msg);
+ bool startDispatching();
+ bool dispatch(Message::shared_ptr& msg);
+ void setPolicy(std::auto_ptr<QueuePolicy> policy);
+
+ public:
+
+ typedef boost::shared_ptr<Queue> shared_ptr;
+
+ typedef std::vector<shared_ptr> vector;
+
+ Queue(const string& name, u_int32_t autodelete = 0,
+ MessageStore* const store = 0,
+ const ConnectionToken* const owner = 0);
+ ~Queue();
+
+ void create(const qpid::framing::FieldTable& settings);
+ void configure(const qpid::framing::FieldTable& settings);
+ void destroy();
+ /**
+ * Informs the queue of a binding that should be cancelled on
+ * destruction of the queue.
+ */
+ void bound(Binding* b);
+ /**
+ * Delivers a message to the queue. Will record it as
+ * enqueued if persistent then process it.
+ */
+ void deliver(Message::shared_ptr& msg);
+ /**
+ * Dispatches the messages immediately to a consumer if
+ * one is available or stores it for later if not.
+ */
+ void process(Message::shared_ptr& msg);
+ /**
+ * Used during recovery to add stored messages back to the queue
+ */
+ void recover(Message::shared_ptr& msg);
+ /**
+ * Dispatch any queued messages providing there are
+ * consumers for them. Only one thread can be dispatching
+ * at any time, but this method (rather than the caller)
+ * is responsible for ensuring that.
+ */
+ void dispatch();
+ void consume(Consumer* c, bool exclusive = false);
+ void cancel(Consumer* c);
+ u_int32_t purge();
+ u_int32_t getMessageCount() const;
+ u_int32_t getConsumerCount() const;
+ inline const string& getName() const { return name; }
+ inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
+ inline bool hasExclusiveConsumer() const { return exclusive; }
+ inline u_int64_t getPersistenceId() const { return persistenceId; }
+ inline void setPersistenceId(u_int64_t _persistenceId) const { persistenceId = _persistenceId; }
+
+ bool canAutoDelete() const;
+
+ void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid);
+ /**
+ * dequeue from store (only done once messages is acknowledged)
+ */
+ void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid);
+ /**
+ * dequeues from memory only
+ */
+ Message::shared_ptr dequeue();
+
+ const QueuePolicy* const getPolicy();
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/ConnectionToken.h b/Final/cpp/lib/broker/ConnectionToken.h
new file mode 100644
index 0000000000..7e7f813d0e
--- /dev/null
+++ b/Final/cpp/lib/broker/ConnectionToken.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionToken_
+#define _ConnectionToken_
+
+namespace qpid {
+ namespace broker {
+ /**
+ * An empty interface allowing opaque implementations of some
+ * form of token to identify a connection.
+ */
+ class ConnectionToken{
+ public:
+ virtual ~ConnectionToken(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/Consumer.h b/Final/cpp/lib/broker/Consumer.h
new file mode 100644
index 0000000000..26deef4a26
--- /dev/null
+++ b/Final/cpp/lib/broker/Consumer.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Consumer_
+#define _Consumer_
+
+#include <BrokerMessage.h>
+
+namespace qpid {
+ namespace broker {
+ class Consumer{
+ public:
+ virtual bool deliver(Message::shared_ptr& msg) = 0;
+ virtual ~Consumer(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/Content.h b/Final/cpp/lib/broker/Content.h
new file mode 100644
index 0000000000..8aacf02959
--- /dev/null
+++ b/Final/cpp/lib/broker/Content.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Content_
+#define _Content_
+
+#include <AMQContentBody.h>
+#include <Buffer.h>
+#include <OutputHandler.h>
+#include <ProtocolVersion.h>
+
+namespace qpid {
+ namespace broker {
+ class Content{
+ public:
+ virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0;
+ virtual u_int32_t size() = 0;
+ virtual void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
+ virtual void encode(qpid::framing::Buffer& buffer) = 0;
+ virtual void destroy() = 0;
+ virtual ~Content(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/Daemon.cpp b/Final/cpp/lib/broker/Daemon.cpp
new file mode 100644
index 0000000000..3a0e687bf2
--- /dev/null
+++ b/Final/cpp/lib/broker/Daemon.cpp
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Daemon.h"
+#include "Exception.h"
+
+#include <boost/iostreams/stream.hpp>
+#include <boost/iostreams/device/file_descriptor.hpp>
+
+#include <sstream>
+
+#include <errno.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace qpid {
+namespace broker {
+
+using namespace std;
+typedef boost::iostreams::stream<boost::iostreams::file_descriptor> fdstream;
+
+namespace {
+/** Throw an exception containing msg and strerror if throwIf is true.
+ * Name is supposed to be reminiscent of perror().
+ */
+void terror(bool throwIf, const string& msg, int errNo=errno) {
+ if (throwIf)
+ throw Exception(msg + (errNo? ": "+strError(errNo) : string(".")));
+}
+
+
+struct LockFile : public fdstream {
+
+ LockFile(const std::string& path_, bool create)
+ : path(path_), fd(-1), created(create)
+ {
+ errno = 0;
+ int flags=create ? O_WRONLY|O_CREAT|O_NOFOLLOW : O_RDWR;
+ fd = ::open(path.c_str(), flags, 0644);
+ terror(fd < 0,"Cannot open "+path);
+ terror(::lockf(fd, F_TLOCK, 0) < 0, "Cannot lock "+path);
+ open(boost::iostreams::file_descriptor(fd));
+ }
+
+ ~LockFile() {
+ if (fd >= 0) {
+ ::lockf(fd, F_ULOCK, 0);
+ close();
+ }
+ }
+
+ std::string path;
+ int fd;
+ bool created;
+};
+
+} // namespace
+
+Daemon::Daemon() {
+ pid = -1;
+ pipeFds[0] = pipeFds[1] = -1;
+}
+
+string Daemon::dir() {
+ return (getuid() == 0 ? "/var/run" : "/tmp");
+}
+
+string Daemon::pidFile(uint16_t port) {
+ ostringstream path;
+ path << dir() << "/qpidd." << port << ".pid";
+ return path.str();
+}
+
+void Daemon::fork()
+{
+ terror(pipe(pipeFds) < 0, "Can't create pipe");
+ terror((pid = ::fork()) < 0, "Daemon fork failed");
+ if (pid == 0) { // Child
+ try {
+ // File descriptors
+ terror(::close(pipeFds[0])<0, "Cannot close read pipe");
+ terror(::close(0)<0, "Cannot close stdin");
+ terror(::close(1)<0, "Cannot close stdout");
+ terror(::close(2)<0, "Cannot close stderr");
+ int fd=::open("/dev/null",O_RDWR); // stdin
+ terror(fd != 0, "Cannot re-open stdin");
+ terror(::dup(fd)<0, "Cannot re-open stdout");
+ terror(::dup(fd)<0, "Cannot re-open stderror");
+
+ // Misc
+ terror(setsid()<0, "Cannot set session ID");
+ terror(chdir(dir().c_str()) < 0, "Cannot change directory to "+dir());
+ umask(027);
+
+ // Child behavior
+ child();
+ }
+ catch (const exception& e) {
+ fdstream pipe(pipeFds[1]);
+ assert(pipe.is_open());
+ pipe << "0 " << e.what() << endl;
+ }
+ }
+ else { // Parent
+ close(pipeFds[1]); // Write side.
+ parent();
+ }
+}
+
+Daemon::~Daemon() {
+ if (!lockFile.empty())
+ unlink(lockFile.c_str());
+}
+
+uint16_t Daemon::wait(int timeout) { // parent waits for child.
+ errno = 0;
+ struct timeval tv;
+ tv.tv_sec = timeout;
+ tv.tv_usec = 0;
+
+ fd_set fds;
+ FD_ZERO(&fds);
+ FD_SET(pipeFds[0], &fds);
+ terror(1 != select(FD_SETSIZE, &fds, 0, 0, &tv), "No response from daemon process");
+
+ fdstream pipe(pipeFds[0]);
+ uint16_t value = 0;
+ pipe >> value >> skipws;
+ if (value == 0) {
+ string errmsg;
+ getline(pipe, errmsg);
+ throw Exception("Daemon startup failed"+ (errmsg.empty() ? string(".") : ": " + errmsg));
+ }
+ return value;
+}
+
+void Daemon::ready(uint16_t port) { // child
+ lockFile = pidFile(port);
+ LockFile lf(lockFile, true);
+ lf << getpid() << endl;
+ if (lf.fail())
+ throw Exception("Cannot write lock file "+lockFile);
+ fdstream pipe(pipeFds[1]);
+ pipe << port << endl;;
+}
+
+pid_t Daemon::getPid(uint16_t port) {
+ string name = pidFile(port);
+ LockFile lockFile(name, false);
+ pid_t pid;
+ lockFile >> pid;
+ if (lockFile.fail())
+ throw Exception("Cannot read lock file "+name);
+ if (kill(pid, 0) < 0 && errno != EPERM) {
+ unlink(name.c_str());
+ throw Exception("Removing stale lock file "+name);
+ }
+ return pid;
+}
+
+
+}} // namespace qpid::broker
diff --git a/Final/cpp/lib/broker/Daemon.h b/Final/cpp/lib/broker/Daemon.h
new file mode 100644
index 0000000000..a9755bc8e7
--- /dev/null
+++ b/Final/cpp/lib/broker/Daemon.h
@@ -0,0 +1,84 @@
+#ifndef _broker_Daemon_h
+#define _broker_Daemon_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <boost/scoped_ptr.hpp>
+#include <boost/function.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Tools for forking and managing a daemon process.
+ * NB: Only one Daemon instance is allowed in a process.
+ */
+class Daemon : private boost::noncopyable
+{
+ public:
+ /** Check daemon is running on port, throw exception if not */
+ static pid_t getPid(uint16_t port);
+
+ Daemon();
+
+ virtual ~Daemon();
+
+ /**
+ * Fork a daemon process.
+ * Call parent() in the parent process, child() in the child.
+ */
+ void fork();
+
+ protected:
+
+ /** Called in parent process */
+ virtual void parent() = 0;
+
+ /** Called in child process */
+ virtual void child() = 0;
+
+ /** Call from parent(): wait for child to indicate it is ready.
+ * @timeout in seconds to wait for response.
+ * @return port passed by child to ready().
+ */
+ uint16_t wait(int timeout);
+
+ /** Call from child(): Notify the parent we are ready and write the
+ * PID file.
+ *@param port returned by parent call to wait().
+ */
+ void ready(uint16_t port);
+
+ private:
+ static std::string dir();
+ static std::string pidFile(uint16_t port);
+
+ pid_t pid;
+ int pipeFds[2];
+ std::string lockFile;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!_broker_Daemon_h*/
diff --git a/Final/cpp/lib/broker/DeletingTxOp.cpp b/Final/cpp/lib/broker/DeletingTxOp.cpp
new file mode 100644
index 0000000000..25fe9c98db
--- /dev/null
+++ b/Final/cpp/lib/broker/DeletingTxOp.cpp
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <DeletingTxOp.h>
+
+using namespace qpid::broker;
+
+DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){}
+
+bool DeletingTxOp::prepare(TransactionContext* ctxt) throw(){
+ return delegate && delegate->prepare(ctxt);
+}
+
+void DeletingTxOp::commit() throw(){
+ if(delegate){
+ delegate->commit();
+ delete delegate;
+ delegate = 0;
+ }
+}
+
+void DeletingTxOp::rollback() throw(){
+ if(delegate){
+ delegate->rollback();
+ delete delegate;
+ delegate = 0;
+ }
+}
diff --git a/Final/cpp/lib/broker/DeletingTxOp.h b/Final/cpp/lib/broker/DeletingTxOp.h
new file mode 100644
index 0000000000..3e026cd4ca
--- /dev/null
+++ b/Final/cpp/lib/broker/DeletingTxOp.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DeletingTxOp_
+#define _DeletingTxOp_
+
+#include <TxOp.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * TxOp wrapper that will delegate calls & delete the object
+ * to which it delegates after completion of the transaction.
+ */
+ class DeletingTxOp : public virtual TxOp{
+ TxOp* delegate;
+ public:
+ DeletingTxOp(TxOp* const delegate);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~DeletingTxOp(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/Deliverable.h b/Final/cpp/lib/broker/Deliverable.h
new file mode 100644
index 0000000000..e33443555d
--- /dev/null
+++ b/Final/cpp/lib/broker/Deliverable.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Deliverable_
+#define _Deliverable_
+
+#include <BrokerQueue.h>
+
+namespace qpid {
+ namespace broker {
+ class Deliverable{
+ public:
+ virtual void deliverTo(Queue::shared_ptr& queue) = 0;
+ virtual ~Deliverable(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/DeliverableMessage.cpp b/Final/cpp/lib/broker/DeliverableMessage.cpp
new file mode 100644
index 0000000000..b9c89da690
--- /dev/null
+++ b/Final/cpp/lib/broker/DeliverableMessage.cpp
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <DeliverableMessage.h>
+
+using namespace qpid::broker;
+
+DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg)
+{
+}
+
+void DeliverableMessage::deliverTo(Queue::shared_ptr& queue)
+{
+ queue->deliver(msg);
+}
+
diff --git a/Final/cpp/lib/broker/DeliverableMessage.h b/Final/cpp/lib/broker/DeliverableMessage.h
new file mode 100644
index 0000000000..962f0da640
--- /dev/null
+++ b/Final/cpp/lib/broker/DeliverableMessage.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DeliverableMessage_
+#define _DeliverableMessage_
+
+#include <Deliverable.h>
+#include <BrokerMessage.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+ namespace broker {
+ class DeliverableMessage : public Deliverable{
+ Message::shared_ptr msg;
+ public:
+ DeliverableMessage(Message::shared_ptr& msg);
+ virtual void deliverTo(Queue::shared_ptr& queue);
+ virtual ~DeliverableMessage(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/DeliveryRecord.cpp b/Final/cpp/lib/broker/DeliveryRecord.cpp
new file mode 100644
index 0000000000..19b01cc312
--- /dev/null
+++ b/Final/cpp/lib/broker/DeliveryRecord.cpp
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <DeliveryRecord.h>
+#include <BrokerChannel.h>
+
+using namespace qpid::broker;
+using std::string;
+
+DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
+ Queue::shared_ptr _queue,
+ const string _consumerTag,
+ const u_int64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(_consumerTag),
+ deliveryTag(_deliveryTag),
+ pull(false){}
+
+DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
+ Queue::shared_ptr _queue,
+ const u_int64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(""),
+ deliveryTag(_deliveryTag),
+ pull(true){}
+
+
+void DeliveryRecord::discard(TransactionContext* ctxt, const std::string* const xid) const{
+ queue->dequeue(ctxt, msg, xid);
+}
+
+void DeliveryRecord::discard() const{
+ discard(0, 0);
+}
+
+bool DeliveryRecord::matches(u_int64_t tag) const{
+ return deliveryTag == tag;
+}
+
+bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
+ return range->covers(deliveryTag);
+}
+
+void DeliveryRecord::redeliver(Channel* const channel) const{
+ if(pull){
+ //if message was originally sent as response to get, we must requeue it
+ requeue();
+ }else{
+ channel->deliver(msg, consumerTag, deliveryTag);
+ }
+}
+
+void DeliveryRecord::requeue() const{
+ msg->redeliver();
+ queue->process(msg);
+}
+
+void DeliveryRecord::addTo(Prefetch* const prefetch) const{
+ if(!pull){
+ //ignore 'pulled' messages (i.e. those that were sent in
+ //response to get) when calculating prefetch
+ prefetch->size += msg->contentSize();
+ prefetch->count++;
+ }
+}
+
+void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{
+ if(!pull){
+ //ignore 'pulled' messages (i.e. those that were sent in
+ //response to get) when calculating prefetch
+ prefetch->size -= msg->contentSize();
+ prefetch->count--;
+ }
+}
diff --git a/Final/cpp/lib/broker/DeliveryRecord.h b/Final/cpp/lib/broker/DeliveryRecord.h
new file mode 100644
index 0000000000..01a4024b28
--- /dev/null
+++ b/Final/cpp/lib/broker/DeliveryRecord.h
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DeliveryRecord_
+#define _DeliveryRecord_
+
+#include <algorithm>
+#include <list>
+#include <AccumulatedAck.h>
+#include <BrokerMessage.h>
+#include <Prefetch.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+ namespace broker {
+ class Channel;
+
+ /**
+ * Record of a delivery for which an ack is outstanding.
+ */
+ class DeliveryRecord{
+ mutable Message::shared_ptr msg;
+ mutable Queue::shared_ptr queue;
+ std::string consumerTag;
+ u_int64_t deliveryTag;
+ bool pull;
+
+ public:
+ DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const u_int64_t deliveryTag);
+ DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const u_int64_t deliveryTag);
+
+ void discard() const;
+ void discard(TransactionContext* ctxt, const std::string* const xid) const;
+ bool matches(u_int64_t tag) const;
+ bool coveredBy(const AccumulatedAck* const range) const;
+ void requeue() const;
+ void redeliver(Channel* const) const;
+ void addTo(Prefetch* const prefetch) const;
+ void subtractFrom(Prefetch* const prefetch) const;
+ };
+
+ typedef std::list<DeliveryRecord>::iterator ack_iterator;
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/DirectExchange.cpp b/Final/cpp/lib/broker/DirectExchange.cpp
new file mode 100644
index 0000000000..c898ae8d7e
--- /dev/null
+++ b/Final/cpp/lib/broker/DirectExchange.cpp
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <DirectExchange.h>
+#include <ExchangeBinding.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {
+
+}
+
+void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+ Mutex::ScopedLock l(lock);
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ if(i == queues.end()){
+ bindings[routingKey].push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ }
+}
+
+void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+ Mutex::ScopedLock l(lock);
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+
+ std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ if(i < queues.end()){
+ queues.erase(i);
+ if(queues.empty()){
+ bindings.erase(routingKey);
+ }
+ }
+}
+
+void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+ Mutex::ScopedLock l(lock);
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ int count(0);
+ for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
+ msg.deliverTo(*i);
+ }
+ if(!count){
+ std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl;
+ }
+}
+
+DirectExchange::~DirectExchange(){
+
+}
+
+
+const std::string DirectExchange::typeName("direct");
diff --git a/Final/cpp/lib/broker/DirectExchange.h b/Final/cpp/lib/broker/DirectExchange.h
new file mode 100644
index 0000000000..a7ef5aca9e
--- /dev/null
+++ b/Final/cpp/lib/broker/DirectExchange.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DirectExchange_
+#define _DirectExchange_
+
+#include <map>
+#include <vector>
+#include <BrokerExchange.h>
+#include <FieldTable.h>
+#include <BrokerMessage.h>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+ class DirectExchange : public virtual Exchange{
+ std::map<string, std::vector<Queue::shared_ptr> > bindings;
+ qpid::sys::Mutex lock;
+
+ public:
+ static const std::string typeName;
+
+ DirectExchange(const std::string& name);
+
+ virtual std::string getType(){ return typeName; }
+
+ virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual ~DirectExchange();
+ };
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/ExchangeBinding.cpp b/Final/cpp/lib/broker/ExchangeBinding.cpp
new file mode 100644
index 0000000000..bf2102414d
--- /dev/null
+++ b/Final/cpp/lib/broker/ExchangeBinding.cpp
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ExchangeBinding.h>
+#include <BrokerExchange.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){}
+
+void ExchangeBinding::cancel(){
+ e->unbind(q, key, args);
+ delete this;
+}
+
+ExchangeBinding::~ExchangeBinding(){
+}
diff --git a/Final/cpp/lib/broker/ExchangeBinding.h b/Final/cpp/lib/broker/ExchangeBinding.h
new file mode 100644
index 0000000000..2afaa89552
--- /dev/null
+++ b/Final/cpp/lib/broker/ExchangeBinding.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ExchangeBinding_
+#define _ExchangeBinding_
+
+#include <Binding.h>
+#include <FieldTable.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+ namespace broker {
+ class Exchange;
+ class Queue;
+
+ class ExchangeBinding : public virtual Binding{
+ Exchange* e;
+ Queue::shared_ptr q;
+ const string key;
+ const qpid::framing::FieldTable* args;
+ public:
+ ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const qpid::framing::FieldTable* _args);
+ virtual void cancel();
+ virtual ~ExchangeBinding();
+ };
+ }
+}
+
+
+#endif
+
diff --git a/Final/cpp/lib/broker/ExchangeRegistry.cpp b/Final/cpp/lib/broker/ExchangeRegistry.cpp
new file mode 100644
index 0000000000..7bf96c4544
--- /dev/null
+++ b/Final/cpp/lib/broker/ExchangeRegistry.cpp
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ExchangeRegistry.h>
+#include <DirectExchange.h>
+#include <FanOutExchange.h>
+#include <HeadersExchange.h>
+#include <TopicExchange.h>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using std::pair;
+
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){
+ Mutex::ScopedLock locker(lock);
+ ExchangeMap::iterator i = exchanges.find(name);
+ if (i == exchanges.end()) {
+ Exchange::shared_ptr exchange;
+
+ if(type == TopicExchange::typeName){
+ exchange = Exchange::shared_ptr(new TopicExchange(name));
+ }else if(type == DirectExchange::typeName){
+ exchange = Exchange::shared_ptr(new DirectExchange(name));
+ }else if(type == FanOutExchange::typeName){
+ exchange = Exchange::shared_ptr(new FanOutExchange(name));
+ }else if (type == HeadersExchange::typeName) {
+ exchange = Exchange::shared_ptr(new HeadersExchange(name));
+ }else{
+ throw UnknownExchangeTypeException();
+ }
+ exchanges[name] = exchange;
+ return std::pair<Exchange::shared_ptr, bool>(exchange, true);
+ } else {
+ return std::pair<Exchange::shared_ptr, bool>(i->second, false);
+ }
+}
+
+void ExchangeRegistry::destroy(const string& name){
+ Mutex::ScopedLock locker(lock);
+ exchanges.erase(name);
+}
+
+Exchange::shared_ptr ExchangeRegistry::get(const string& name){
+ Mutex::ScopedLock locker(lock);
+ return exchanges[name];
+}
+
+namespace
+{
+const std::string empty;
+}
+
+Exchange::shared_ptr ExchangeRegistry::getDefault()
+{
+ return get(empty);
+}
diff --git a/Final/cpp/lib/broker/ExchangeRegistry.h b/Final/cpp/lib/broker/ExchangeRegistry.h
new file mode 100644
index 0000000000..8dcd0d3623
--- /dev/null
+++ b/Final/cpp/lib/broker/ExchangeRegistry.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ExchangeRegistry_
+#define _ExchangeRegistry_
+
+#include <map>
+#include <BrokerExchange.h>
+#include <sys/Monitor.h>
+
+namespace qpid {
+namespace broker {
+ struct UnknownExchangeTypeException{};
+
+ class ExchangeRegistry{
+ typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
+ ExchangeMap exchanges;
+ qpid::sys::Mutex lock;
+ public:
+ std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException);
+ void destroy(const std::string& name);
+ Exchange::shared_ptr get(const std::string& name);
+ Exchange::shared_ptr getDefault();
+ };
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/FanOutExchange.cpp b/Final/cpp/lib/broker/FanOutExchange.cpp
new file mode 100644
index 0000000000..48afcc20d5
--- /dev/null
+++ b/Final/cpp/lib/broker/FanOutExchange.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <FanOutExchange.h>
+#include <ExchangeBinding.h>
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
+
+void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+ Mutex::ScopedLock locker(lock);
+ // Add if not already present.
+ Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ if (i == bindings.end()) {
+ bindings.push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ }
+}
+
+void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
+ Mutex::ScopedLock locker(lock);
+ Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ if (i != bindings.end()) {
+ bindings.erase(i);
+ // TODO aconway 2006-09-14: What about the ExchangeBinding object?
+ // Don't we have to verify routingKey/args match?
+ }
+}
+
+void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
+ Mutex::ScopedLock locker(lock);
+ for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
+ msg.deliverTo(*i);
+ }
+}
+
+FanOutExchange::~FanOutExchange() {}
+
+const std::string FanOutExchange::typeName("fanout");
diff --git a/Final/cpp/lib/broker/FanOutExchange.h b/Final/cpp/lib/broker/FanOutExchange.h
new file mode 100644
index 0000000000..6dc70e69bb
--- /dev/null
+++ b/Final/cpp/lib/broker/FanOutExchange.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _FanOutExchange_
+#define _FanOutExchange_
+
+#include <map>
+#include <vector>
+#include <BrokerExchange.h>
+#include <FieldTable.h>
+#include <BrokerMessage.h>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+
+class FanOutExchange : public virtual Exchange {
+ std::vector<Queue::shared_ptr> bindings;
+ qpid::sys::Mutex lock;
+
+ public:
+ static const std::string typeName;
+
+ FanOutExchange(const std::string& name);
+
+ virtual std::string getType(){ return typeName; }
+
+ virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual ~FanOutExchange();
+};
+
+}
+}
+
+
+
+#endif
diff --git a/Final/cpp/lib/broker/HeadersExchange.cpp b/Final/cpp/lib/broker/HeadersExchange.cpp
new file mode 100644
index 0000000000..acd344725a
--- /dev/null
+++ b/Final/cpp/lib/broker/HeadersExchange.cpp
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <HeadersExchange.h>
+#include <ExchangeBinding.h>
+#include <Value.h>
+#include <QpidError.h>
+#include <algorithm>
+
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// The current search algorithm really sucks.
+// Fieldtables are heavy, maybe use shared_ptr to do handle-body.
+
+using namespace qpid::broker;
+
+namespace {
+ const std::string all("all");
+ const std::string any("any");
+ const std::string x_match("x-match");
+}
+
+HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
+
+void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+ Mutex::ScopedLock locker(lock);
+ std::string what = args->getString("x-match");
+ if (what != all && what != any) {
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
+ }
+ bindings.push_back(Binding(*args, queue));
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+}
+
+void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
+ Mutex::ScopedLock locker(lock);
+ Bindings::iterator i =
+ std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
+ if (i != bindings.end()) bindings.erase(i);
+}
+
+
+void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
+ Mutex::ScopedLock locker(lock);;
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (match(i->first, *args)) msg.deliverTo(i->second);
+ }
+}
+
+HeadersExchange::~HeadersExchange() {}
+
+const std::string HeadersExchange::typeName("headers");
+
+namespace
+{
+
+ bool match_values(const Value& bind, const Value& msg) {
+ return dynamic_cast<const EmptyValue*>(&bind) || bind == msg;
+ }
+
+}
+
+
+bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
+ typedef FieldTable::ValueMap Map;
+ std::string what = bind.getString(x_match);
+ if (what == all) {
+ for (Map::const_iterator i = bind.getMap().begin();
+ i != bind.getMap().end();
+ ++i)
+ {
+ if (i->first != x_match)
+ {
+ Map::const_iterator j = msg.getMap().find(i->first);
+ if (j == msg.getMap().end()) return false;
+ if (!match_values(*(i->second), *(j->second))) return false;
+ }
+ }
+ return true;
+ } else if (what == any) {
+ for (Map::const_iterator i = bind.getMap().begin();
+ i != bind.getMap().end();
+ ++i)
+ {
+ if (i->first != x_match)
+ {
+ Map::const_iterator j = msg.getMap().find(i->first);
+ if (j != msg.getMap().end()) {
+ if (match_values(*(i->second), *(j->second))) return true;
+ }
+ }
+ }
+ return false;
+ } else {
+ return false;
+ }
+}
+
+
+
diff --git a/Final/cpp/lib/broker/HeadersExchange.h b/Final/cpp/lib/broker/HeadersExchange.h
new file mode 100644
index 0000000000..5e8da5ad85
--- /dev/null
+++ b/Final/cpp/lib/broker/HeadersExchange.h
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _HeadersExchange_
+#define _HeadersExchange_
+
+#include <vector>
+#include <BrokerExchange.h>
+#include <FieldTable.h>
+#include <BrokerMessage.h>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+
+
+class HeadersExchange : public virtual Exchange {
+ typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding;
+ typedef std::vector<Binding> Bindings;
+
+ Bindings bindings;
+ qpid::sys::Mutex lock;
+
+ public:
+ static const std::string typeName;
+
+ HeadersExchange(const string& name);
+
+ virtual std::string getType(){ return typeName; }
+
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual ~HeadersExchange();
+
+ static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
+};
+
+
+
+}
+}
+
+#endif
diff --git a/Final/cpp/lib/broker/InMemoryContent.cpp b/Final/cpp/lib/broker/InMemoryContent.cpp
new file mode 100644
index 0000000000..07af8633e5
--- /dev/null
+++ b/Final/cpp/lib/broker/InMemoryContent.cpp
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <InMemoryContent.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using boost::static_pointer_cast;
+
+void InMemoryContent::add(AMQContentBody::shared_ptr data)
+{
+ content.push_back(data);
+}
+
+u_int32_t InMemoryContent::size()
+{
+ int sum(0);
+ for (content_iterator i = content.begin(); i != content.end(); i++) {
+ sum += (*i)->size();
+ }
+ return sum;
+}
+
+void InMemoryContent::send(qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
+{
+ for (content_iterator i = content.begin(); i != content.end(); i++) {
+ if ((*i)->size() > framesize) {
+ u_int32_t offset = 0;
+ for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
+ string data = (*i)->getData().substr(offset, framesize);
+ out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
+ offset += framesize;
+ }
+ u_int32_t remainder = (*i)->size() % framesize;
+ if (remainder) {
+ string data = (*i)->getData().substr(offset, remainder);
+ out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
+ }
+ } else {
+ AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
+ out->send(new AMQFrame(version, channel, contentBody));
+ }
+ }
+}
+
+void InMemoryContent::encode(Buffer& buffer)
+{
+ for (content_iterator i = content.begin(); i != content.end(); i++) {
+ (*i)->encode(buffer);
+ }
+}
+
+void InMemoryContent::destroy()
+{
+}
diff --git a/Final/cpp/lib/broker/InMemoryContent.h b/Final/cpp/lib/broker/InMemoryContent.h
new file mode 100644
index 0000000000..1db1acd7e1
--- /dev/null
+++ b/Final/cpp/lib/broker/InMemoryContent.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _InMemoryContent_
+#define _InMemoryContent_
+
+#include <Content.h>
+#include <vector>
+
+
+namespace qpid {
+ namespace broker {
+ class InMemoryContent : public Content{
+ typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
+ typedef content_list::iterator content_iterator;
+
+ content_list content;
+ public:
+ void add(qpid::framing::AMQContentBody::shared_ptr data);
+ u_int32_t size();
+ void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+ void encode(qpid::framing::Buffer& buffer);
+ void destroy();
+ ~InMemoryContent(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/LazyLoadedContent.cpp b/Final/cpp/lib/broker/LazyLoadedContent.cpp
new file mode 100644
index 0000000000..ec1ca3e195
--- /dev/null
+++ b/Final/cpp/lib/broker/LazyLoadedContent.cpp
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <LazyLoadedContent.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, u_int64_t _expectedSize) :
+ store(_store), msg(_msg), expectedSize(_expectedSize) {}
+
+void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
+{
+ store->appendContent(msg, data->getData());
+}
+
+u_int32_t LazyLoadedContent::size()
+{
+ return 0;//all content is written as soon as it is added
+}
+
+void LazyLoadedContent::send(qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
+{
+ if (expectedSize > framesize) {
+ for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) {
+ u_int64_t remaining = expectedSize - offset;
+ string data;
+ store->loadContent(msg, data, offset, remaining > framesize ? framesize : remaining);
+ out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
+ }
+ } else {
+ string data;
+ store->loadContent(msg, data, 0, expectedSize);
+ out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
+ }
+}
+
+void LazyLoadedContent::encode(Buffer&)
+{
+ //do nothing as all content is written as soon as it is added
+}
+
+void LazyLoadedContent::destroy()
+{
+ store->destroy(msg);
+}
diff --git a/Final/cpp/lib/broker/LazyLoadedContent.h b/Final/cpp/lib/broker/LazyLoadedContent.h
new file mode 100644
index 0000000000..80f8cce4eb
--- /dev/null
+++ b/Final/cpp/lib/broker/LazyLoadedContent.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _LazyLoadedContent_
+#define _LazyLoadedContent_
+
+#include <Content.h>
+#include <MessageStore.h>
+
+namespace qpid {
+ namespace broker {
+ class LazyLoadedContent : public Content{
+ MessageStore* const store;
+ Message* const msg;
+ const u_int64_t expectedSize;
+ public:
+ LazyLoadedContent(MessageStore* const store, Message* const msg, u_int64_t expectedSize);
+ void add(qpid::framing::AMQContentBody::shared_ptr data);
+ u_int32_t size();
+ void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+ void encode(qpid::framing::Buffer& buffer);
+ void destroy();
+ ~LazyLoadedContent(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/Makefile.am b/Final/cpp/lib/broker/Makefile.am
new file mode 100644
index 0000000000..1a3fc79dea
--- /dev/null
+++ b/Final/cpp/lib/broker/Makefile.am
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+AM_CXXFLAGS = $(WARNING_CFLAGS)
+INCLUDES = \
+ -I$(top_srcdir)/gen \
+ -I$(top_srcdir)/lib/common \
+ -I$(top_srcdir)/lib/common/sys \
+ -I$(top_srcdir)/lib/common/framing \
+ $(APR_CXXFLAGS)
+
+lib_LTLIBRARIES = libqpidbroker.la
+libqpidbroker_la_LIBADD = \
+ ../common/libqpidcommon.la \
+ -lboost_iostreams
+
+libqpidbroker_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
+libqpidbroker_la_SOURCES = \
+ AccumulatedAck.cpp \
+ AccumulatedAck.h \
+ AutoDelete.cpp \
+ AutoDelete.h \
+ Binding.h \
+ Broker.cpp \
+ Broker.h \
+ BrokerChannel.cpp \
+ BrokerChannel.h \
+ BrokerExchange.h \
+ BrokerMessage.cpp \
+ BrokerMessage.h \
+ BrokerQueue.cpp \
+ BrokerQueue.h \
+ ConnectionToken.h \
+ Consumer.h \
+ Content.h \
+ Daemon.cpp \
+ Daemon.h \
+ DeletingTxOp.cpp \
+ DeletingTxOp.h \
+ Deliverable.h \
+ DeliverableMessage.cpp \
+ DeliverableMessage.h \
+ DeliveryRecord.cpp \
+ DeliveryRecord.h \
+ DirectExchange.cpp \
+ DirectExchange.h \
+ ExchangeBinding.cpp \
+ ExchangeBinding.h \
+ ExchangeRegistry.cpp \
+ ExchangeRegistry.h \
+ FanOutExchange.cpp \
+ FanOutExchange.h \
+ HeadersExchange.cpp \
+ HeadersExchange.h \
+ InMemoryContent.cpp \
+ InMemoryContent.h \
+ LazyLoadedContent.cpp \
+ LazyLoadedContent.h \
+ MessageBuilder.cpp \
+ MessageBuilder.h \
+ MessageStore.h \
+ MessageStoreModule.cpp \
+ MessageStoreModule.h \
+ NameGenerator.cpp \
+ NameGenerator.h \
+ NullMessageStore.cpp \
+ NullMessageStore.h \
+ Prefetch.h \
+ QueuePolicy.cpp \
+ QueuePolicy.h \
+ QueueRegistry.cpp \
+ QueueRegistry.h \
+ RecoveryManager.cpp \
+ RecoveryManager.h \
+ SessionHandlerFactoryImpl.cpp \
+ SessionHandlerFactoryImpl.h \
+ SessionHandlerImpl.cpp \
+ SessionHandlerImpl.h \
+ TopicExchange.cpp \
+ TopicExchange.h \
+ TransactionalStore.h \
+ TxAck.cpp \
+ TxAck.h \
+ TxBuffer.cpp \
+ TxBuffer.h \
+ TxOp.h \
+ TxPublish.cpp \
+ TxPublish.h
+
+
+# Force build during dist phase so help2man will work.
+dist-hook: $(lib_LTLIBRARIES)
diff --git a/Final/cpp/lib/broker/MessageBuilder.cpp b/Final/cpp/lib/broker/MessageBuilder.cpp
new file mode 100644
index 0000000000..41bf812d2d
--- /dev/null
+++ b/Final/cpp/lib/broker/MessageBuilder.cpp
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <MessageBuilder.h>
+
+#include <InMemoryContent.h>
+#include <LazyLoadedContent.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using std::auto_ptr;
+
+MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) :
+ handler(_handler),
+ store(_store),
+ stagingThreshold(_stagingThreshold)
+{}
+
+void MessageBuilder::route(){
+ if (message->isComplete()) {
+ if (handler) handler->complete(message);
+ message.reset();
+ }
+}
+
+void MessageBuilder::initialise(Message::shared_ptr& msg){
+ if(message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+ }
+ message = msg;
+}
+
+void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
+ if(!message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
+ }
+ message->setHeader(header);
+ if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
+ store->stage(message.get());
+ message->releaseContent(store);
+ } else {
+ auto_ptr<Content> content(new InMemoryContent());
+ message->setContent(content);
+ }
+ route();
+}
+
+void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){
+ if(!message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
+ }
+ message->addContent(content);
+ route();
+}
diff --git a/Final/cpp/lib/broker/MessageBuilder.h b/Final/cpp/lib/broker/MessageBuilder.h
new file mode 100644
index 0000000000..4e51f223f0
--- /dev/null
+++ b/Final/cpp/lib/broker/MessageBuilder.h
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MessageBuilder_
+#define _MessageBuilder_
+
+#include <memory>
+#include <QpidError.h>
+#include <BrokerExchange.h>
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <AMQContentBody.h>
+#include <AMQHeaderBody.h>
+#include <BasicPublishBody.h>
+
+namespace qpid {
+ namespace broker {
+ class MessageBuilder{
+ public:
+ class CompletionHandler{
+ public:
+ virtual void complete(Message::shared_ptr&) = 0;
+ virtual ~CompletionHandler(){}
+ };
+ MessageBuilder(CompletionHandler* _handler, MessageStore* const store = 0, u_int64_t stagingThreshold = 0);
+ void initialise(Message::shared_ptr& msg);
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr& content);
+ private:
+ Message::shared_ptr message;
+ CompletionHandler* handler;
+ MessageStore* const store;
+ const u_int64_t stagingThreshold;
+
+ void route();
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/MessageStore.h b/Final/cpp/lib/broker/MessageStore.h
new file mode 100644
index 0000000000..938f807a67
--- /dev/null
+++ b/Final/cpp/lib/broker/MessageStore.h
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MessageStore_
+#define _MessageStore_
+
+#include <BrokerMessage.h>
+#include <FieldTable.h>
+#include <RecoveryManager.h>
+#include <TransactionalStore.h>
+
+namespace qpid {
+ namespace broker {
+ struct MessageStoreSettings
+ {
+ /**
+ * Messages whose content length is larger than this value
+ * will be staged (i.e. will have thier data written to
+ * disk as it arrives) and will load their data lazily. On
+ * recovery therefore, only the headers should be loaded.
+ */
+ u_int64_t stagingThreshold;
+ };
+ /**
+ * An abstraction of the persistent storage for messages. (In
+ * all methods, any pointers/references to queues or messages
+ * are valid only for the duration of the call).
+ */
+ class MessageStore : public TransactionalStore{
+ public:
+ /**
+ * Record the existance of a durable queue
+ */
+ virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings) = 0;
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(const Queue& queue) = 0;
+
+ /**
+ * Request recovery of queue and message state from store
+ */
+ virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0;
+
+ /**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(Message* const msg) = 0;
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(Message* const msg) = 0;
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(Message* const msg, const std::string& data) = 0;
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length) = 0;
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0;
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of th queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0;
+
+ /**
+ * Treat all enqueue/dequeues where this xid was specified as being prepared.
+ */
+ virtual void prepared(const std::string * const xid) = 0;
+ /**
+ * Treat all enqueue/dequeues where this xid was specified as being committed.
+ */
+ virtual void committed(const std::string * const xid) = 0;
+ /**
+ * Treat all enqueue/dequeues where this xid was specified as being aborted.
+ */
+ virtual void aborted(const std::string * const xid) = 0;
+
+ virtual ~MessageStore(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/MessageStoreModule.cpp b/Final/cpp/lib/broker/MessageStoreModule.cpp
new file mode 100644
index 0000000000..ccc5501379
--- /dev/null
+++ b/Final/cpp/lib/broker/MessageStoreModule.cpp
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <MessageStoreModule.h>
+#include <iostream>
+
+using namespace qpid::broker;
+
+MessageStoreModule::MessageStoreModule(const std::string& name) : store(name)
+{
+}
+
+void MessageStoreModule::create(const Queue& queue, const qpid::framing::FieldTable& settings)
+{
+ store->create(queue, settings);
+}
+
+void MessageStoreModule::destroy(const Queue& queue)
+{
+ store->destroy(queue);
+}
+
+void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSettings* const settings)
+{
+ store->recover(registry, settings);
+}
+
+void MessageStoreModule::stage(Message* const msg)
+{
+ store->stage(msg);
+}
+
+void MessageStoreModule::destroy(Message* const msg)
+{
+ store->destroy(msg);
+}
+
+void MessageStoreModule::appendContent(Message* const msg, const std::string& data)
+{
+ store->appendContent(msg, data);
+}
+
+void MessageStoreModule::loadContent(Message* const msg, string& data, u_int64_t offset, u_int32_t length)
+{
+ store->loadContent(msg, data, offset, length);
+}
+
+void MessageStoreModule::enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid)
+{
+ store->enqueue(ctxt, msg, queue, xid);
+}
+
+void MessageStoreModule::dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid)
+{
+ store->dequeue(ctxt, msg, queue, xid);
+}
+
+void MessageStoreModule::prepared(const string * const xid)
+{
+ store->prepared(xid);
+}
+
+void MessageStoreModule::committed(const string * const xid)
+{
+ store->committed(xid);
+}
+
+void MessageStoreModule::aborted(const string * const xid)
+{
+ store->aborted(xid);
+}
+
+std::auto_ptr<TransactionContext> MessageStoreModule::begin()
+{
+ return store->begin();
+}
+
+void MessageStoreModule::commit(TransactionContext* ctxt)
+{
+ store->commit(ctxt);
+}
+
+void MessageStoreModule::abort(TransactionContext* ctxt)
+{
+ store->abort(ctxt);
+}
diff --git a/Final/cpp/lib/broker/MessageStoreModule.h b/Final/cpp/lib/broker/MessageStoreModule.h
new file mode 100644
index 0000000000..c49e06efa1
--- /dev/null
+++ b/Final/cpp/lib/broker/MessageStoreModule.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MessageStoreModule_
+#define _MessageStoreModule_
+
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <BrokerQueue.h>
+#include <RecoveryManager.h>
+#include <sys/Module.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * A null implementation of the MessageStore interface
+ */
+ class MessageStoreModule : public MessageStore{
+ qpid::sys::Module<MessageStore> store;
+ public:
+ MessageStoreModule(const std::string& name);
+ void create(const Queue& queue, const qpid::framing::FieldTable& settings);
+ void destroy(const Queue& queue);
+ void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
+ void stage(Message* const msg);
+ void destroy(Message* const msg);
+ void appendContent(Message* const msg, const std::string& data);
+ void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length);
+ void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
+ void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
+ void prepared(const std::string * const xid);
+ void committed(const std::string * const xid);
+ void aborted(const std::string * const xid);
+ std::auto_ptr<TransactionContext> begin();
+ void commit(TransactionContext* ctxt);
+ void abort(TransactionContext* ctxt);
+ ~MessageStoreModule(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/NameGenerator.cpp b/Final/cpp/lib/broker/NameGenerator.cpp
new file mode 100644
index 0000000000..3f281859fa
--- /dev/null
+++ b/Final/cpp/lib/broker/NameGenerator.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <NameGenerator.h>
+#include <sstream>
+
+using namespace qpid::broker;
+
+NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {}
+
+std::string NameGenerator::generate(){
+ std::stringstream ss;
+ ss << base << counter++;
+ return ss.str();
+}
diff --git a/Final/cpp/lib/broker/NameGenerator.h b/Final/cpp/lib/broker/NameGenerator.h
new file mode 100644
index 0000000000..b2dbbdfb69
--- /dev/null
+++ b/Final/cpp/lib/broker/NameGenerator.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _NameGenerator_
+#define _NameGenerator_
+
+#include <BrokerMessage.h>
+
+namespace qpid {
+ namespace broker {
+ class NameGenerator{
+ const std::string base;
+ unsigned int counter;
+ public:
+ NameGenerator(const std::string& base);
+ std::string generate();
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/NullMessageStore.cpp b/Final/cpp/lib/broker/NullMessageStore.cpp
new file mode 100644
index 0000000000..dd58539925
--- /dev/null
+++ b/Final/cpp/lib/broker/NullMessageStore.cpp
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <NullMessageStore.h>
+
+#include <BrokerQueue.h>
+#include <RecoveryManager.h>
+
+#include <iostream>
+
+using namespace qpid::broker;
+
+NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
+
+void NullMessageStore::create(const Queue& queue, const qpid::framing::FieldTable&)
+{
+ if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::destroy(const Queue& queue)
+{
+ if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* const)
+{
+ if (warn) std::cout << "Persistence not enabled, no recovery attempted." << std::endl;
+}
+
+void NullMessageStore::stage(Message* const)
+{
+ if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::destroy(Message* const)
+{
+ if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::appendContent(Message* const, const string&)
+{
+ if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::loadContent(Message* const, string&, u_int64_t, u_int32_t)
+{
+ if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::enqueue(TransactionContext*, Message* const, const Queue& queue, const string * const)
+{
+ if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::dequeue(TransactionContext*, Message* const, const Queue& queue, const string * const)
+{
+ if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::prepared(const string * const)
+{
+ if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::committed(const string * const)
+{
+ if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::aborted(const string * const)
+{
+ if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+
+std::auto_ptr<TransactionContext> NullMessageStore::begin()
+{
+ return std::auto_ptr<TransactionContext>();
+}
+
+void NullMessageStore::commit(TransactionContext*)
+{
+}
+
+void NullMessageStore::abort(TransactionContext*)
+{
+}
diff --git a/Final/cpp/lib/broker/NullMessageStore.h b/Final/cpp/lib/broker/NullMessageStore.h
new file mode 100644
index 0000000000..ef2bea8fd6
--- /dev/null
+++ b/Final/cpp/lib/broker/NullMessageStore.h
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _NullMessageStore_
+#define _NullMessageStore_
+
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+ namespace broker {
+
+ /**
+ * A null implementation of the MessageStore interface
+ */
+ class NullMessageStore : public MessageStore{
+ const bool warn;
+ public:
+ NullMessageStore(bool warn = true);
+ virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings);
+ virtual void destroy(const Queue& queue);
+ virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
+ virtual void stage(Message* const msg);
+ virtual void destroy(Message* const msg);
+ virtual void appendContent(Message* const msg, const std::string& data);
+ virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length);
+ virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
+ virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
+ virtual void prepared(const std::string * const xid);
+ virtual void committed(const std::string * const xid);
+ virtual void aborted(const std::string * const xid);
+ virtual std::auto_ptr<TransactionContext> begin();
+ virtual void commit(TransactionContext* ctxt);
+ virtual void abort(TransactionContext* ctxt);
+ ~NullMessageStore(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/Prefetch.h b/Final/cpp/lib/broker/Prefetch.h
new file mode 100644
index 0000000000..a1adccaee7
--- /dev/null
+++ b/Final/cpp/lib/broker/Prefetch.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Prefetch_
+#define _Prefetch_
+
+#include <amqp_types.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Count and total size of asynchronously delivered
+ * (i.e. pushed) messages that have acks outstanding.
+ */
+ struct Prefetch{
+ u_int32_t size;
+ u_int16_t count;
+
+ void reset() { size = 0; count = 0; }
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/QueuePolicy.cpp b/Final/cpp/lib/broker/QueuePolicy.cpp
new file mode 100644
index 0000000000..e13fd62fc6
--- /dev/null
+++ b/Final/cpp/lib/broker/QueuePolicy.cpp
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QueuePolicy.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) :
+ maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {}
+
+QueuePolicy::QueuePolicy(const FieldTable& settings) :
+ maxCount(getInt(settings, maxCountKey, 0)),
+ maxSize(getInt(settings, maxSizeKey, 0)), count(0), size(0) {}
+
+void QueuePolicy::enqueued(u_int64_t _size)
+{
+ if (maxCount) count++;
+ if (maxSize) size += _size;
+}
+
+void QueuePolicy::dequeued(u_int64_t _size)
+{
+ if (maxCount) count--;
+ if (maxSize) size -= _size;
+}
+
+bool QueuePolicy::limitExceeded()
+{
+ return (maxSize && size > maxSize) || (maxCount && count > maxCount);
+}
+
+void QueuePolicy::update(FieldTable& settings)
+{
+ if (maxCount) settings.setInt(maxCountKey, maxCount);
+ if (maxSize) settings.setInt(maxSizeKey, maxSize);
+}
+
+
+int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int defaultValue)
+{
+ //Note: currently field table only contain signed 32 bit ints, which
+ // restricts the values that can be set on the queue policy.
+ try {
+ return settings.getInt(key);
+ } catch (FieldNotFoundException& ignore) {
+ return defaultValue;
+ }
+}
+
+const std::string QueuePolicy::maxCountKey("qpid.max_count");
+const std::string QueuePolicy::maxSizeKey("qpid.max_size");
diff --git a/Final/cpp/lib/broker/QueuePolicy.h b/Final/cpp/lib/broker/QueuePolicy.h
new file mode 100644
index 0000000000..597cfe7ce8
--- /dev/null
+++ b/Final/cpp/lib/broker/QueuePolicy.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueuePolicy_
+#define _QueuePolicy_
+
+#include <FieldTable.h>
+
+namespace qpid {
+ namespace broker {
+ class QueuePolicy
+ {
+ static const std::string maxCountKey;
+ static const std::string maxSizeKey;
+
+ const u_int32_t maxCount;
+ const u_int64_t maxSize;
+ u_int32_t count;
+ u_int64_t size;
+
+ static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
+
+ public:
+ QueuePolicy(u_int32_t maxCount, u_int64_t maxSize);
+ QueuePolicy(const qpid::framing::FieldTable& settings);
+ void enqueued(u_int64_t size);
+ void dequeued(u_int64_t size);
+ void update(qpid::framing::FieldTable& settings);
+ bool limitExceeded();
+ u_int32_t getMaxCount() const { return maxCount; }
+ u_int64_t getMaxSize() const { return maxSize; }
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/QueueRegistry.cpp b/Final/cpp/lib/broker/QueueRegistry.cpp
new file mode 100644
index 0000000000..c69d553b06
--- /dev/null
+++ b/Final/cpp/lib/broker/QueueRegistry.cpp
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QueueRegistry.h>
+#include <SessionHandlerImpl.h>
+#include <sstream>
+#include <assert.h>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){}
+
+QueueRegistry::~QueueRegistry()
+{
+}
+
+std::pair<Queue::shared_ptr, bool>
+QueueRegistry::declare(const string& declareName, bool durable,
+ u_int32_t autoDelete, const ConnectionToken* owner)
+{
+ Mutex::ScopedLock locker(lock);
+ string name = declareName.empty() ? generateName() : declareName;
+ assert(!name.empty());
+ QueueMap::iterator i = queues.find(name);
+ if (i == queues.end()) {
+ Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner));
+ queues[name] = queue;
+ return std::pair<Queue::shared_ptr, bool>(queue, true);
+ } else {
+ return std::pair<Queue::shared_ptr, bool>(i->second, false);
+ }
+}
+
+void QueueRegistry::destroy(const string& name){
+ Mutex::ScopedLock locker(lock);
+ queues.erase(name);
+}
+
+Queue::shared_ptr QueueRegistry::find(const string& name){
+ Mutex::ScopedLock locker(lock);
+ QueueMap::iterator i = queues.find(name);
+ if (i == queues.end()) {
+ return Queue::shared_ptr();
+ } else {
+ return i->second;
+ }
+}
+
+string QueueRegistry::generateName(){
+ string name;
+ do {
+ std::stringstream ss;
+ ss << "tmp_" << counter++;
+ name = ss.str();
+ // Thread safety: Private function, only called with lock held
+ // so this is OK.
+ } while(queues.find(name) != queues.end());
+ return name;
+}
+
+MessageStore* const QueueRegistry::getStore() const {
+ return store;
+}
diff --git a/Final/cpp/lib/broker/QueueRegistry.h b/Final/cpp/lib/broker/QueueRegistry.h
new file mode 100644
index 0000000000..7232024675
--- /dev/null
+++ b/Final/cpp/lib/broker/QueueRegistry.h
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueueRegistry_
+#define _QueueRegistry_
+
+#include <map>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A registry of queues indexed by queue name.
+ *
+ * Queues are reference counted using shared_ptr to ensure that they
+ * are deleted when and only when they are no longer in use.
+ *
+ */
+class QueueRegistry{
+
+ public:
+ QueueRegistry(MessageStore* const store = 0);
+ ~QueueRegistry();
+
+ /**
+ * Declare a queue.
+ *
+ * @return The queue and a boolean flag which is true if the queue
+ * was created by this declare call false if it already existed.
+ */
+ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0,
+ const ConnectionToken* const owner = 0);
+
+ /**
+ * Destroy the named queue.
+ *
+ * Note: if the queue is in use it is not actually destroyed until
+ * all shared_ptrs to it are destroyed. During that time it is
+ * possible that a new queue with the same name may be
+ * created. This should not create any problems as the new and
+ * old queues exist independently. The registry has
+ * forgotten the old queue so there can be no confusion for
+ * subsequent calls to find or declare with the same name.
+ *
+ */
+ void destroy(const string& name);
+
+ /**
+ * Find the named queue. Return 0 if not found.
+ */
+ Queue::shared_ptr find(const string& name);
+
+ /**
+ * Generate unique queue name.
+ */
+ string generateName();
+
+ /**
+ * Return the message store used.
+ */
+ MessageStore* const getStore() const;
+
+
+ private:
+ typedef std::map<string, Queue::shared_ptr> QueueMap;
+ QueueMap queues;
+ qpid::sys::Mutex lock;
+ int counter;
+ MessageStore* const store;
+};
+
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/RecoveryManager.cpp b/Final/cpp/lib/broker/RecoveryManager.cpp
new file mode 100644
index 0000000000..6ea4c00c65
--- /dev/null
+++ b/Final/cpp/lib/broker/RecoveryManager.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <RecoveryManager.h>
+
+using namespace qpid::broker;
+
+RecoveryManager::RecoveryManager(QueueRegistry& _queues, ExchangeRegistry& _exchanges) : queues(_queues), exchanges(_exchanges) {}
+
+RecoveryManager::~RecoveryManager() {}
+
+Queue::shared_ptr RecoveryManager::recoverQueue(const string& name)
+{
+ std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
+ Exchange::shared_ptr exchange = exchanges.getDefault();
+ if (exchange) {
+ exchange->bind(result.first, result.first->getName(), 0);
+ }
+ return result.first;
+}
+
+Exchange::shared_ptr RecoveryManager::recoverExchange(const string& name, const string& type)
+{
+ return exchanges.declare(name, type).first;
+}
diff --git a/Final/cpp/lib/broker/RecoveryManager.h b/Final/cpp/lib/broker/RecoveryManager.h
new file mode 100644
index 0000000000..d4e4cff3fd
--- /dev/null
+++ b/Final/cpp/lib/broker/RecoveryManager.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _RecoveryManager_
+#define _RecoveryManager_
+
+#include <ExchangeRegistry.h>
+#include <QueueRegistry.h>
+
+namespace qpid {
+namespace broker {
+
+ class RecoveryManager{
+ QueueRegistry& queues;
+ ExchangeRegistry& exchanges;
+ public:
+ RecoveryManager(QueueRegistry& queues, ExchangeRegistry& exchanges);
+ ~RecoveryManager();
+ Queue::shared_ptr recoverQueue(const std::string& name);
+ Exchange::shared_ptr recoverExchange(const std::string& name, const std::string& type);
+ };
+
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/SessionHandlerFactoryImpl.cpp b/Final/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
new file mode 100644
index 0000000000..1b5441e3cf
--- /dev/null
+++ b/Final/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <SessionHandlerFactoryImpl.h>
+
+#include <DirectExchange.h>
+#include <FanOutExchange.h>
+#include <HeadersExchange.h>
+#include <MessageStoreModule.h>
+#include <NullMessageStore.h>
+#include <SessionHandlerImpl.h>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+namespace
+{
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
+}
+
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int64_t _stagingThreshold, u_int32_t _timeout) :
+ store(_store.empty() ? (MessageStore*) new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)),
+ queues(store.get()), settings(_timeout, _stagingThreshold), cleaner(&queues, _timeout/10)
+{
+ exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+ exchanges.declare(amq_direct, DirectExchange::typeName);
+ exchanges.declare(amq_topic, TopicExchange::typeName);
+ exchanges.declare(amq_fanout, FanOutExchange::typeName);
+ exchanges.declare(amq_match, HeadersExchange::typeName);
+
+ if(store.get()) {
+ RecoveryManager recoverer(queues, exchanges);
+ MessageStoreSettings storeSettings = { settings.stagingThreshold };
+ store->recover(recoverer, &storeSettings);
+ }
+
+ cleaner.start();
+}
+
+SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt)
+{
+ return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, settings);
+}
+
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
+{
+ cleaner.stop();
+}
diff --git a/Final/cpp/lib/broker/SessionHandlerFactoryImpl.h b/Final/cpp/lib/broker/SessionHandlerFactoryImpl.h
new file mode 100644
index 0000000000..a69b67b08d
--- /dev/null
+++ b/Final/cpp/lib/broker/SessionHandlerFactoryImpl.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SessionHandlerFactoryImpl_
+#define _SessionHandlerFactoryImpl_
+
+#include <AutoDelete.h>
+#include <ExchangeRegistry.h>
+#include <MessageStore.h>
+#include <QueueRegistry.h>
+#include <AMQFrame.h>
+#include <ProtocolInitiation.h>
+#include <sys/SessionContext.h>
+#include <sys/SessionHandler.h>
+#include <sys/SessionHandlerFactory.h>
+#include <sys/TimeoutHandler.h>
+#include <SessionHandlerImpl.h>
+#include <memory>
+
+namespace qpid {
+ namespace broker {
+
+ class SessionHandlerFactoryImpl : public virtual qpid::sys::SessionHandlerFactory
+ {
+ std::auto_ptr<MessageStore> store;
+ QueueRegistry queues;
+ ExchangeRegistry exchanges;
+ const Settings settings;
+ AutoDelete cleaner;
+ public:
+ SessionHandlerFactoryImpl(const std::string& store = "", u_int64_t stagingThreshold = 0, u_int32_t timeout = 30000);
+ virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
+ virtual ~SessionHandlerFactoryImpl();
+ };
+
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/SessionHandlerImpl.cpp b/Final/cpp/lib/broker/SessionHandlerImpl.cpp
new file mode 100644
index 0000000000..b23432e29d
--- /dev/null
+++ b/Final/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -0,0 +1,468 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <sstream>
+#include <SessionHandlerImpl.h>
+#include <FanOutExchange.h>
+#include <HeadersExchange.h>
+#include <TopicExchange.h>
+#include "assert.h"
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
+ QueueRegistry* _queues,
+ ExchangeRegistry* _exchanges,
+ AutoDelete* _cleaner,
+ const Settings& _settings) :
+ context(_context),
+ queues(_queues),
+ exchanges(_exchanges),
+ cleaner(_cleaner),
+ settings(_settings),
+ basicHandler(new BasicHandlerImpl(this)),
+ channelHandler(new ChannelHandlerImpl(this)),
+ connectionHandler(new ConnectionHandlerImpl(this)),
+ exchangeHandler(new ExchangeHandlerImpl(this)),
+ queueHandler(new QueueHandlerImpl(this)),
+ txHandler(new TxHandlerImpl(this)),
+ framemax(65536),
+ heartbeat(0)
+ {
+
+ client =NULL;
+}
+
+SessionHandlerImpl::~SessionHandlerImpl(){
+
+ if (client != NULL)
+ delete client;
+
+}
+
+Channel* SessionHandlerImpl::getChannel(u_int16_t channel){
+ channel_iterator i = channels.find(channel);
+ if(i == channels.end()){
+ std::stringstream out;
+ out << "Unknown channel: " << channel;
+ throw ConnectionException(504, out.str());
+ }
+ return i->second;
+}
+
+Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){
+ Queue::shared_ptr queue;
+ if (name.empty()) {
+ queue = getChannel(channel)->getDefaultQueue();
+ if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
+ } else {
+ queue = queues->find(name);
+ if (queue == 0) {
+ throw ChannelException( 404, "Queue not found: " + name);
+ }
+ }
+ return queue;
+}
+
+
+Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
+ return exchanges->get(name);
+}
+
+void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
+ u_int16_t channel = frame->getChannel();
+ AMQBody::shared_ptr body = frame->getBody();
+ AMQMethodBody::shared_ptr method;
+
+ switch(body->type())
+ {
+ case METHOD_BODY:
+ method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
+ try{
+ method->invoke(*this, channel);
+ }catch(ChannelException& e){
+ channels[channel]->close();
+ channels.erase(channel);
+ client->getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ }catch(ConnectionException& e){
+ client->getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ context->close();
+ }catch(std::exception& e){
+ string error(e.what());
+ client->getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
+ context->close();
+ }
+ break;
+
+ case HEADER_BODY:
+ this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
+ break;
+
+ case CONTENT_BODY:
+ this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
+ break;
+
+ case HEARTBEAT_BODY:
+ //channel must be 0
+ this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
+ break;
+ }
+}
+
+void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
+
+ if (client == NULL)
+ {
+ client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor());
+
+
+ //send connection start
+ FieldTable properties;
+ string mechanisms("PLAIN");
+ string locales("en_US"); // channel, majour, minor
+ client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales);
+ }
+}
+
+
+void SessionHandlerImpl::idleOut(){
+
+}
+
+void SessionHandlerImpl::idleIn(){
+
+}
+
+void SessionHandlerImpl::closed(){
+ try {
+ for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
+ Channel* c = i->second;
+ channels.erase(i);
+ c->close();
+ delete c;
+ }
+ for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
+ string name = (*i)->getName();
+ queues->destroy(name);
+ exclusiveQueues.erase(i);
+ }
+ } catch(std::exception& e) {
+ std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl;
+ }
+}
+
+void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
+ getChannel(channel)->handleHeader(body);
+}
+
+void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
+ getChannel(channel)->handleContent(body);
+}
+
+void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
+ std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl;
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::startOk(
+ u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
+ const string& /*response*/, const string& /*locale*/){
+ parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
+ parent->framemax = framemax;
+ parent->heartbeat = heartbeat;
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+ string knownhosts;
+ parent->client->getConnection().openOk(0, knownhosts);
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::close(
+ u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/,
+ u_int16_t /*classId*/, u_int16_t /*methodId*/)
+{
+ parent->client->getConnection().closeOk(0);
+ parent->context->close();
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
+ parent->context->close();
+}
+
+
+
+void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
+
+
+ if (parent->channels[channel] == 0) {
+ parent->channels[channel] = new Channel(parent->client->getProtocolVersion() , parent->context, channel, parent->framemax,
+ parent->queues->getStore(), parent->settings.stagingThreshold);
+ parent->client->getChannel().openOk(channel);
+ } else {
+ std::stringstream out;
+ out << "Channel already open: " << channel;
+ throw ConnectionException(504, out.str());
+ }
+}
+
+void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){
+ parent->getChannel(channel)->flow(active);
+ parent->client->getChannel().flowOk(channel, active);
+}
+void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
+
+void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/,
+ u_int16_t /*classId*/, u_int16_t /*methodId*/){
+ Channel* c = parent->getChannel(channel);
+ if(c){
+ parent->channels.erase(channel);
+ c->close();
+ delete c;
+ parent->client->getChannel().closeOk(channel);
+ }
+}
+
+void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
+
+
+
+void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& /*arguments*/){
+
+ if(passive){
+ if(!parent->exchanges->get(exchange)){
+ throw ChannelException(404, "Exchange not found: " + exchange);
+ }
+ }else{
+ try{
+ std::pair<Exchange::shared_ptr, bool> response = parent->exchanges->declare(exchange, type);
+ if(!response.second && response.first->getType() != type){
+ throw ConnectionException(530, "Exchange already declared to be of type "
+ + response.first->getType() + ", requested " + type);
+ }
+ }catch(UnknownExchangeTypeException& e){
+ throw ConnectionException(503, "Exchange type not implemented: " + type);
+ }
+ }
+ if(!nowait){
+ parent->client->getExchange().declareOk(channel);
+ }
+}
+
+void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
+ const string& exchange, bool /*ifUnused*/, bool nowait){
+
+ //TODO: implement unused
+ parent->exchanges->destroy(exchange);
+ if(!nowait) parent->client->getExchange().deleteOk(channel);
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+ Queue::shared_ptr queue;
+ if (passive && !name.empty()) {
+ queue = parent->getQueue(name, channel);
+ } else {
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ parent->queues->declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
+ queue = queue_created.first;
+ assert(queue);
+ if (queue_created.second) { // This is a new queue
+
+ //apply settings & create persistent record if required
+ queue_created.first->create(arguments);
+
+ //add default binding:
+ parent->exchanges->getDefault()->bind(queue, name, 0);
+ if (exclusive) {
+ parent->exclusiveQueues.push_back(queue);
+ } else if(autoDelete){
+ parent->cleaner->add(queue);
+ }
+ }
+ }
+ if (exclusive && !queue->isExclusiveOwner(parent)) {
+ throw ChannelException(405, "Cannot grant exclusive access to queue");
+ }
+ parent->getChannel(channel)->setDefaultQueue(queue);
+ if (!nowait) {
+ string queueName = queue->getName();
+ parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
+ }
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+ Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
+ if(exchange){
+// kpvdr - cannot use this any longer as routingKey is now const
+// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
+// exchange->bind(queue, routingKey, &arguments);
+ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
+ exchange->bind(queue, exchangeRoutingKey, &arguments);
+ if(!nowait) parent->client->getQueue().bindOk(channel);
+ }else{
+ throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
+ }
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+ int count = queue->purge();
+ if(!nowait) parent->client->getQueue().purgeOk(channel, count);
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
+ ChannelException error(0, "");
+ int count(0);
+ Queue::shared_ptr q = parent->getQueue(queue, channel);
+ if(ifEmpty && q->getMessageCount() > 0){
+ throw ChannelException(406, "Queue not empty.");
+ }else if(ifUnused && q->getConsumerCount() > 0){
+ throw ChannelException(406, "Queue in use.");
+ }else{
+ //remove the queue from the list of exclusive queues if necessary
+ if(q->isExclusiveOwner(parent)){
+ queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q);
+ if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
+ }
+ count = q->getMessageCount();
+ q->destroy();
+ parent->queues->destroy(queue);
+ }
+
+ if(!nowait) parent->client->getQueue().deleteOk(channel, count);
+}
+
+
+
+
+void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+ //TODO: handle global
+ parent->getChannel(channel)->setPrefetchSize(prefetchSize);
+ parent->getChannel(channel)->setPrefetchCount(prefetchCount);
+ parent->client->getBasic().qosOk(channel);
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::consume(
+ u_int16_t channelId, u_int16_t /*ticket*/,
+ const string& queueName, const string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive,
+ bool nowait, const FieldTable& fields)
+{
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+ Channel* channel = parent->channels[channelId];
+ if(!consumerTag.empty() && channel->exists(consumerTag)){
+ throw ConnectionException(530, "Consumer tags must be unique");
+ }
+
+ try{
+ string newTag = consumerTag;
+ channel->consume(
+ newTag, queue, !noAck, exclusive, noLocal ? parent : 0, &fields);
+
+ if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag);
+
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
+ }catch(ExclusiveAccessException& e){
+ if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+ else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ }
+
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
+ parent->getChannel(channel)->cancel(consumerTag);
+
+ if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag);
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
+ const string& exchangeName, const string& routingKey,
+ bool mandatory, bool immediate){
+
+ Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+ if(exchange){
+ Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
+ parent->getChannel(channel)->handlePublish(msg, exchange);
+ }else{
+ throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ }
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+ Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+ if(!parent->getChannel(channelId)->get(queue, !noAck)){
+ string clusterId;//not used, part of an imatix hack
+
+ parent->client->getBasic().getEmpty(channelId, clusterId);
+ }
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
+ try{
+ parent->getChannel(channel)->ack(deliveryTag, multiple);
+ }catch(InvalidAckException& e){
+ throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+ }
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+
+void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
+ parent->getChannel(channel)->recover(requeue);
+ parent->client->getBasic().recoverOk(channel);
+}
+
+void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){
+ parent->getChannel(channel)->begin();
+ parent->client->getTx().selectOk(channel);
+}
+
+void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){
+ parent->getChannel(channel)->commit();
+ parent->client->getTx().commitOk(channel);
+}
+
+void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){
+
+ parent->getChannel(channel)->rollback();
+ parent->client->getTx().rollbackOk(channel);
+ parent->getChannel(channel)->recover(false);
+}
+
diff --git a/Final/cpp/lib/broker/SessionHandlerImpl.h b/Final/cpp/lib/broker/SessionHandlerImpl.h
new file mode 100644
index 0000000000..7e631b4505
--- /dev/null
+++ b/Final/cpp/lib/broker/SessionHandlerImpl.h
@@ -0,0 +1,270 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SessionHandlerImpl_
+#define _SessionHandlerImpl_
+
+#include <map>
+#include <sstream>
+#include <vector>
+#include <exception>
+#include <AMQFrame.h>
+#include <AMQP_ClientProxy.h>
+#include <AMQP_ServerOperations.h>
+#include <AutoDelete.h>
+#include <ExchangeRegistry.h>
+#include <BrokerChannel.h>
+#include <ConnectionToken.h>
+#include <DirectExchange.h>
+#include <OutputHandler.h>
+#include <ProtocolInitiation.h>
+#include <QueueRegistry.h>
+#include <sys/SessionContext.h>
+#include <sys/SessionHandler.h>
+#include <sys/TimeoutHandler.h>
+#include <TopicExchange.h>
+
+namespace qpid {
+namespace broker {
+
+struct ChannelException : public std::exception {
+ u_int16_t code;
+ string text;
+ ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {}
+ ~ChannelException() throw() {}
+ const char* what() const throw() { return text.c_str(); }
+};
+
+struct ConnectionException : public std::exception {
+ u_int16_t code;
+ string text;
+ ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {}
+ ~ConnectionException() throw() {}
+ const char* what() const throw() { return text.c_str(); }
+};
+
+class Settings {
+ public:
+ const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+ const u_int64_t stagingThreshold;
+
+ Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {}
+};
+
+class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
+ public virtual qpid::framing::AMQP_ServerOperations,
+ public virtual ConnectionToken
+{
+ typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
+ typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+ qpid::sys::SessionContext* context;
+ qpid::framing::AMQP_ClientProxy* client;
+ QueueRegistry* queues;
+ ExchangeRegistry* const exchanges;
+ AutoDelete* const cleaner;
+ const Settings settings;
+
+ std::auto_ptr<BasicHandler> basicHandler;
+ std::auto_ptr<ChannelHandler> channelHandler;
+ std::auto_ptr<ConnectionHandler> connectionHandler;
+ std::auto_ptr<ExchangeHandler> exchangeHandler;
+ std::auto_ptr<QueueHandler> queueHandler;
+ std::auto_ptr<TxHandler> txHandler;
+
+ std::map<u_int16_t, Channel*> channels;
+ std::vector<Queue::shared_ptr> exclusiveQueues;
+
+ u_int32_t framemax;
+ u_int16_t heartbeat;
+
+ void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body);
+ void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
+ void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+ Channel* getChannel(u_int16_t channel);
+ /**
+ * Get named queue, never returns 0.
+ * @return: named queue or default queue for channel if name=""
+ * @exception: ChannelException if no queue of that name is found.
+ * @exception: ConnectionException if no queue specified and channel has not declared one.
+ */
+ Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
+
+ Exchange::shared_ptr findExchange(const string& name);
+
+ public:
+ SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues,
+ ExchangeRegistry* exchanges, AutoDelete* cleaner, const Settings& settings);
+ virtual void received(qpid::framing::AMQFrame* frame);
+ virtual void initiated(qpid::framing::ProtocolInitiation* header);
+ virtual void idleOut();
+ virtual void idleIn();
+ virtual void closed();
+ virtual ~SessionHandlerImpl();
+
+ class ConnectionHandlerImpl : public virtual ConnectionHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism,
+ const string& response, const string& locale);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void secureOk(u_int16_t channel, const string& response);
+
+ virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId,
+ u_int16_t methodId);
+
+ virtual void closeOk(u_int16_t channel);
+
+ virtual ~ConnectionHandlerImpl(){}
+ };
+
+ class ChannelHandlerImpl : public virtual ChannelHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void open(u_int16_t channel, const string& outOfBand);
+
+ virtual void flow(u_int16_t channel, bool active);
+
+ virtual void flowOk(u_int16_t channel, bool active);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText,
+ u_int16_t classId, u_int16_t methodId);
+
+ virtual void closeOk(u_int16_t channel);
+
+ virtual ~ChannelHandlerImpl(){}
+ };
+
+ class ExchangeHandlerImpl : public virtual ExchangeHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type,
+ bool passive, bool durable, bool autoDelete, bool internal, bool nowait,
+ const qpid::framing::FieldTable& arguments);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait);
+
+ virtual ~ExchangeHandlerImpl(){}
+ };
+
+
+ class QueueHandlerImpl : public virtual QueueHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments);
+
+ virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue,
+ const string& exchange, const string& routingKey, bool nowait,
+ const qpid::framing::FieldTable& arguments);
+
+ virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue,
+ bool nowait);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty,
+ bool nowait);
+
+ virtual ~QueueHandlerImpl(){}
+ };
+
+ class BasicHandlerImpl : public virtual BasicHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global);
+
+ virtual void consume(
+ u_int16_t channel, u_int16_t ticket, const string& queue,
+ const string& consumerTag, bool noLocal, bool noAck,
+ bool exclusive, bool nowait,
+ const qpid::framing::FieldTable& fields);
+
+ virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait);
+
+ virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey,
+ bool mandatory, bool immediate);
+
+ virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck);
+
+ virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
+
+ virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
+
+ virtual void recover(u_int16_t channel, bool requeue);
+
+ virtual ~BasicHandlerImpl(){}
+ };
+
+ class TxHandlerImpl : public virtual TxHandler{
+ SessionHandlerImpl* parent;
+ public:
+ TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+ virtual ~TxHandlerImpl() {}
+ virtual void select(u_int16_t channel);
+ virtual void commit(u_int16_t channel);
+ virtual void rollback(u_int16_t channel);
+ };
+
+
+ inline virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); }
+ inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); }
+ inline virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); }
+ inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); }
+ inline virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); }
+ inline virtual TxHandler* getTxHandler(){ return txHandler.get(); }
+
+ inline virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); }
+ inline virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); }
+ inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); }
+ inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); }
+ inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); }
+
+ // Temporary add-in to resolve version conflicts: AMQP v8.0 still defines class Test;
+ // however v0.9 will not - kpvdr 2006-11-17
+ inline virtual TestHandler* getTestHandler(){ throw ConnectionException(540, "Test class not implemented"); }
+};
+
+}
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/TopicExchange.cpp b/Final/cpp/lib/broker/TopicExchange.cpp
new file mode 100644
index 0000000000..3ebb3c8c56
--- /dev/null
+++ b/Final/cpp/lib/broker/TopicExchange.cpp
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TopicExchange.h>
+#include <ExchangeBinding.h>
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// Areas for improvement:
+// - excessive string copying: should be 0 copy, match from original buffer.
+// - match/lookup: use descision tree or other more efficient structure.
+
+Tokens& Tokens::operator=(const std::string& s) {
+ clear();
+ if (s.empty()) return *this;
+ std::string::const_iterator i = s.begin();
+ while (true) {
+ // Invariant: i is at the beginning of the next untokenized word.
+ std::string::const_iterator j = find(i, s.end(), '.');
+ push_back(std::string(i, j));
+ if (j == s.end()) return *this;
+ i = j + 1;
+ }
+ return *this;
+}
+
+TopicPattern& TopicPattern::operator=(const Tokens& tokens) {
+ Tokens::operator=(tokens);
+ normalize();
+ return *this;
+}
+
+namespace {
+const std::string hashmark("#");
+const std::string star("*");
+}
+
+void TopicPattern::normalize() {
+ std::string word;
+ Tokens::iterator i = begin();
+ while (i != end()) {
+ if (*i == hashmark) {
+ ++i;
+ while (i != end()) {
+ // Invariant: *(i-1)==#, [begin()..i-1] is normalized.
+ if (*i == star) { // Move * before #.
+ std::swap(*i, *(i-1));
+ ++i;
+ } else if (*i == hashmark) {
+ erase(i); // Remove extra #
+ } else {
+ break;
+ }
+ }
+ } else {
+ i ++;
+ }
+ }
+}
+
+
+namespace {
+// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
+// Need StringRef class that operates on a string in place witout copy.
+// Should be applied everywhere strings are extracted from frames.
+//
+bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end)
+{
+ // Invariant: [pattern_begin..p) matches [target_begin..t)
+ Tokens::const_iterator p = pattern_begin;
+ Tokens::const_iterator t = target_begin;
+ while (p != pattern_end && t != target_end)
+ {
+ if (*p == star || *p == *t) {
+ ++p, ++t;
+ } else if (*p == hashmark) {
+ ++p;
+ if (do_match(p, pattern_end, t, target_end)) return true;
+ while (t != target_end) {
+ ++t;
+ if (do_match(p, pattern_end, t, target_end)) return true;
+ }
+ return false;
+ } else {
+ return false;
+ }
+ }
+ while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing #
+ return t == target_end && p == pattern_end;
+}
+}
+
+bool TopicPattern::match(const Tokens& target) const
+{
+ return do_match(begin(), end(), target.begin(), target.end());
+}
+
+TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
+
+void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+ Monitor::ScopedLock l(lock);
+ TopicPattern routingPattern(routingKey);
+ bindings[routingPattern].push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+}
+
+void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+ Monitor::ScopedLock l(lock);
+ BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
+ Queue::vector& qv(bi->second);
+ if (bi == bindings.end()) return;
+ Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
+ if(q == qv.end()) return;
+ qv.erase(q);
+ if(qv.empty()) bindings.erase(bi);
+}
+
+
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+ Monitor::ScopedLock l(lock);
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (i->first.match(routingKey)) {
+ Queue::vector& qv(i->second);
+ for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
+ msg.deliverTo(*j);
+ }
+ }
+ }
+}
+
+TopicExchange::~TopicExchange() {}
+
+const std::string TopicExchange::typeName("topic");
+
+
diff --git a/Final/cpp/lib/broker/TopicExchange.h b/Final/cpp/lib/broker/TopicExchange.h
new file mode 100644
index 0000000000..fa0c86863a
--- /dev/null
+++ b/Final/cpp/lib/broker/TopicExchange.h
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TopicExchange_
+#define _TopicExchange_
+
+#include <map>
+#include <vector>
+#include <BrokerExchange.h>
+#include <FieldTable.h>
+#include <BrokerMessage.h>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+
+/** A vector of string tokens */
+class Tokens : public std::vector<std::string> {
+ public:
+ Tokens() {};
+ // Default copy, assign, dtor are sufficient.
+
+ /** Tokenize s, provides automatic conversion of string to Tokens */
+ Tokens(const std::string& s) { operator=(s); }
+ /** Tokenizing assignment operator s */
+ Tokens & operator=(const std::string& s);
+
+ private:
+ size_t hash;
+};
+
+
+/**
+ * Tokens that have been normalized as a pattern and can be matched
+ * with topic Tokens. Normalized meands all sequences of mixed * and
+ * # are reduced to a series of * followed by at most one #.
+ */
+class TopicPattern : public Tokens
+{
+ public:
+ TopicPattern() {}
+ // Default copy, assign, dtor are sufficient.
+ TopicPattern(const Tokens& tokens) { operator=(tokens); }
+ TopicPattern(const std::string& str) { operator=(str); }
+ TopicPattern& operator=(const Tokens&);
+ TopicPattern& operator=(const std::string& str) { return operator=(Tokens(str)); }
+
+ /** Match a topic */
+ bool match(const std::string& topic) { return match(Tokens(topic)); }
+ bool match(const Tokens& topic) const;
+
+ private:
+ void normalize();
+};
+
+class TopicExchange : public virtual Exchange{
+ typedef std::map<TopicPattern, Queue::vector> BindingMap;
+ BindingMap bindings;
+ qpid::sys::Mutex lock;
+
+ public:
+ static const std::string typeName;
+
+ TopicExchange(const string& name);
+
+ virtual std::string getType(){ return typeName; }
+
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual ~TopicExchange();
+};
+
+
+
+}
+}
+
+#endif
diff --git a/Final/cpp/lib/broker/TransactionalStore.h b/Final/cpp/lib/broker/TransactionalStore.h
new file mode 100644
index 0000000000..17bca3878a
--- /dev/null
+++ b/Final/cpp/lib/broker/TransactionalStore.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TransactionalStore_
+#define _TransactionalStore_
+
+#include <memory>
+
+namespace qpid {
+ namespace broker {
+ struct InvalidTransactionContextException : public std::exception {};
+
+ class TransactionContext{
+ public:
+ virtual ~TransactionContext(){}
+ };
+
+ class TransactionalStore{
+ public:
+ virtual std::auto_ptr<TransactionContext> begin() = 0;
+ virtual void commit(TransactionContext*) = 0;
+ virtual void abort(TransactionContext*) = 0;
+
+ virtual ~TransactionalStore(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/TxAck.cpp b/Final/cpp/lib/broker/TxAck.cpp
new file mode 100644
index 0000000000..b5211158f3
--- /dev/null
+++ b/Final/cpp/lib/broker/TxAck.cpp
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TxAck.h>
+
+using std::bind1st;
+using std::bind2nd;
+using std::mem_fun_ref;
+using namespace qpid::broker;
+
+TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked, const std::string* const _xid) :
+ acked(_acked), unacked(_unacked), xid(_xid){
+
+}
+
+bool TxAck::prepare(TransactionContext* ctxt) throw(){
+ try{
+ //dequeue all acked messages from their queues
+ for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
+ if (i->coveredBy(&acked)) {
+ i->discard(ctxt, xid);
+ }
+ }
+ return true;
+ }catch(...){
+ std::cout << "TxAck::prepare() - Failed to prepare" << std::endl;
+ return false;
+ }
+}
+
+void TxAck::commit() throw(){
+ //remove all acked records from the list
+ unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked));
+}
+
+void TxAck::rollback() throw(){
+}
diff --git a/Final/cpp/lib/broker/TxAck.h b/Final/cpp/lib/broker/TxAck.h
new file mode 100644
index 0000000000..88c321c445
--- /dev/null
+++ b/Final/cpp/lib/broker/TxAck.h
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxAck_
+#define _TxAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include <AccumulatedAck.h>
+#include <DeliveryRecord.h>
+#include <TxOp.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Defines the transactional behaviour for acks received by a
+ * transactional channel.
+ */
+ class TxAck : public TxOp{
+ AccumulatedAck& acked;
+ std::list<DeliveryRecord>& unacked;
+ const std::string* const xid;
+
+ public:
+ /**
+ * @param acked a representation of the accumulation of
+ * acks received
+ * @param unacked the record of delivered messages
+ */
+ TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked, const std::string* const xid = 0);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~TxAck(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/TxBuffer.cpp b/Final/cpp/lib/broker/TxBuffer.cpp
new file mode 100644
index 0000000000..acd3283bb7
--- /dev/null
+++ b/Final/cpp/lib/broker/TxBuffer.cpp
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TxBuffer.h>
+
+using std::mem_fun;
+using namespace qpid::broker;
+
+bool TxBuffer::prepare(TransactionalStore* const store)
+{
+ std::auto_ptr<TransactionContext> ctxt;
+ if(store) ctxt = store->begin();
+ for(op_iterator i = ops.begin(); i < ops.end(); i++){
+ if(!(*i)->prepare(ctxt.get())){
+ if(store) store->abort(ctxt.get());
+ return false;
+ }
+ }
+ if(store) store->commit(ctxt.get());
+ return true;
+}
+
+void TxBuffer::commit()
+{
+ for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit));
+ ops.clear();
+}
+
+void TxBuffer::rollback()
+{
+ for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback));
+ ops.clear();
+}
+
+void TxBuffer::enlist(TxOp* const op)
+{
+ ops.push_back(op);
+}
diff --git a/Final/cpp/lib/broker/TxBuffer.h b/Final/cpp/lib/broker/TxBuffer.h
new file mode 100644
index 0000000000..2d9a2a3679
--- /dev/null
+++ b/Final/cpp/lib/broker/TxBuffer.h
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxBuffer_
+#define _TxBuffer_
+
+#include <algorithm>
+#include <functional>
+#include <vector>
+#include <TransactionalStore.h>
+#include <TxOp.h>
+
+/**
+ * Represents a single transaction. As such, an instance of this class
+ * will hold a list of operations representing the workload of the
+ * transaction. This work can be committed or rolled back. Committing
+ * is a two-stage process: first all the operations should be
+ * prepared, then if that succeeds they can be committed.
+ *
+ * In the 2pc case, a successful prepare may be followed by either a
+ * commit or a rollback.
+ *
+ * Atomicity of prepare is ensured by using a lower level
+ * transactional facility. This saves explicitly rolling back all the
+ * successfully prepared ops when one of them fails. i.e. we do not
+ * use 2pc internally, we instead ensure that prepare is atomic at a
+ * lower level. This makes individual prepare operations easier to
+ * code.
+ *
+ * Transactions on a messaging broker effect three types of 'action':
+ * (1) updates to persistent storage (2) updates to transient storage
+ * or cached data (3) network writes.
+ *
+ * Of these, (1) should always occur atomically during prepare to
+ * ensure that if the broker crashes while a transaction is being
+ * completed the persistent state (which is all that then remains) is
+ * consistent. (3) can only be done on commit, after a successful
+ * prepare. There is a little more flexibility with (2) but any
+ * changes made during prepare should be subject to the control of the
+ * TransactionalStore in use.
+ */
+namespace qpid {
+ namespace broker {
+ class TxBuffer{
+ typedef std::vector<TxOp*>::iterator op_iterator;
+ std::vector<TxOp*> ops;
+ public:
+ /**
+ * Requests that all ops are prepared. This should
+ * primarily involve making sure that a persistent record
+ * of the operations is stored where necessary.
+ *
+ * All ops will be prepared under a transaction on the
+ * specified store. If any operation fails on prepare,
+ * this transaction will be rolled back.
+ *
+ * Once prepared, a transaction can be committed (or in
+ * the 2pc case, rolled back).
+ *
+ * @returns true if all the operations prepared
+ * successfully, false if not.
+ */
+ bool prepare(TransactionalStore* const store);
+ /**
+ * Signals that the ops all prepared all completed
+ * successfully and can now commit, i.e. the operation can
+ * now be fully carried out.
+ *
+ * Should only be called after a call to prepare() returns
+ * true.
+ */
+ void commit();
+ /**
+ * Rolls back all the operations.
+ *
+ * Should only be called either after a call to prepare()
+ * returns true (2pc) or instead of a prepare call
+ * ('server-local')
+ */
+ void rollback();
+ /**
+ * Adds an operation to the transaction.
+ */
+ void enlist(TxOp* const op);
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/TxOp.h b/Final/cpp/lib/broker/TxOp.h
new file mode 100644
index 0000000000..abba84a8e8
--- /dev/null
+++ b/Final/cpp/lib/broker/TxOp.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxOp_
+#define _TxOp_
+
+#include <TransactionalStore.h>
+
+namespace qpid {
+ namespace broker {
+ class TxOp{
+ public:
+ virtual bool prepare(TransactionContext*) throw() = 0;
+ virtual void commit() throw() = 0;
+ virtual void rollback() throw() = 0;
+ virtual ~TxOp(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/Final/cpp/lib/broker/TxPublish.cpp b/Final/cpp/lib/broker/TxPublish.cpp
new file mode 100644
index 0000000000..49dd8abd89
--- /dev/null
+++ b/Final/cpp/lib/broker/TxPublish.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TxPublish.h>
+
+using namespace qpid::broker;
+
+TxPublish::TxPublish(Message::shared_ptr _msg, const std::string* const _xid) : msg(_msg), xid(_xid) {}
+
+bool TxPublish::prepare(TransactionContext* ctxt) throw(){
+ try{
+ for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, xid));
+ return true;
+ }catch(...){
+ std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl;
+ return false;
+ }
+}
+
+void TxPublish::commit() throw(){
+ for_each(queues.begin(), queues.end(), Commit(msg));
+}
+
+void TxPublish::rollback() throw(){
+}
+
+void TxPublish::deliverTo(Queue::shared_ptr& queue){
+ queues.push_back(queue);
+}
+
+TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg, const string* const _xid)
+ : ctxt(_ctxt), msg(_msg), xid(_xid){}
+
+void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
+ queue->enqueue(ctxt, msg, xid);
+}
+
+TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}
+
+void TxPublish::Commit::operator()(Queue::shared_ptr& queue){
+ queue->process(msg);
+}
+
diff --git a/Final/cpp/lib/broker/TxPublish.h b/Final/cpp/lib/broker/TxPublish.h
new file mode 100644
index 0000000000..75f201257e
--- /dev/null
+++ b/Final/cpp/lib/broker/TxPublish.h
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxPublish_
+#define _TxPublish_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include <Deliverable.h>
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <BrokerQueue.h>
+#include <TxOp.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Defines the behaviour for publish operations on a
+ * transactional channel. Messages are routed through
+ * exchanges when received but are not at that stage delivered
+ * to the matching queues, rather the queues are held in an
+ * instance of this class. On prepare() the message is marked
+ * enqueued to the relevant queues in the MessagesStore. On
+ * commit() the messages will be passed to the queue for
+ * dispatch or to be added to the in-memory queue.
+ */
+ class TxPublish : public TxOp, public Deliverable{
+ class Prepare{
+ TransactionContext* ctxt;
+ Message::shared_ptr& msg;
+ const std::string* const xid;
+ public:
+ Prepare(TransactionContext* ctxt, Message::shared_ptr& msg, const std::string* const xid);
+ void operator()(Queue::shared_ptr& queue);
+ };
+
+ class Commit{
+ Message::shared_ptr& msg;
+ public:
+ Commit(Message::shared_ptr& msg);
+ void operator()(Queue::shared_ptr& queue);
+ };
+
+ Message::shared_ptr msg;
+ const std::string* const xid;
+ std::list<Queue::shared_ptr> queues;
+
+ public:
+ TxPublish(Message::shared_ptr msg, const std::string* const xid = 0);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+
+ virtual void deliverTo(Queue::shared_ptr& queue);
+
+ virtual ~TxPublish(){}
+ };
+ }
+}
+
+
+#endif