diff options
Diffstat (limited to 'Final/cpp/lib/broker')
71 files changed, 6103 insertions, 0 deletions
diff --git a/Final/cpp/lib/broker/AccumulatedAck.cpp b/Final/cpp/lib/broker/AccumulatedAck.cpp new file mode 100644 index 0000000000..a9826ba5ea --- /dev/null +++ b/Final/cpp/lib/broker/AccumulatedAck.cpp @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <AccumulatedAck.h> + +using std::less_equal; +using std::bind2nd; +using namespace qpid::broker; + +void AccumulatedAck::update(u_int64_t tag, bool multiple){ + if(multiple){ + if(tag > range) range = tag; + //else don't care, it is already counted + }else if(tag > range){ + individual.push_back(tag); + } +} + +void AccumulatedAck::consolidate(){ + individual.sort(); + //remove any individual tags that are covered by range + individual.remove_if(bind2nd(less_equal<u_int64_t>(), range)); +} + +void AccumulatedAck::clear(){ + range = 0; + individual.clear(); +} + +bool AccumulatedAck::covers(u_int64_t tag) const{ + return tag <= range || find(individual.begin(), individual.end(), tag) != individual.end(); +} diff --git a/Final/cpp/lib/broker/AccumulatedAck.h b/Final/cpp/lib/broker/AccumulatedAck.h new file mode 100644 index 0000000000..c472f7f3ea --- /dev/null +++ b/Final/cpp/lib/broker/AccumulatedAck.h @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _AccumulatedAck_ +#define _AccumulatedAck_ + +#include <algorithm> +#include <functional> +#include <list> + +namespace qpid { + namespace broker { + /** + * Keeps an accumulated record of acked messages (by delivery + * tag). + */ + class AccumulatedAck { + public: + /** + * If not zero, then everything up to this value has been + * acked. + */ + u_int64_t range; + /** + * List of individually acked messages that are not + * included in the range marked by 'range'. + */ + std::list<u_int64_t> individual; + + AccumulatedAck(u_int64_t r) : range(r) {} + void update(u_int64_t tag, bool multiple); + void consolidate(); + void clear(); + bool covers(u_int64_t tag) const; + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/AutoDelete.cpp b/Final/cpp/lib/broker/AutoDelete.cpp new file mode 100644 index 0000000000..ae48d10505 --- /dev/null +++ b/Final/cpp/lib/broker/AutoDelete.cpp @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <AutoDelete.h> +#include <sys/Time.h> + +using namespace qpid::broker; +using namespace qpid::sys; + +AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) + : registry(_registry), period(_period), stopped(true) { } + +void AutoDelete::add(Queue::shared_ptr const queue){ + Mutex::ScopedLock l(lock); + queues.push(queue); +} + +Queue::shared_ptr const AutoDelete::pop(){ + Queue::shared_ptr next; + Mutex::ScopedLock l(lock); + if(!queues.empty()){ + next = queues.front(); + queues.pop(); + } + return next; +} + +void AutoDelete::process(){ + Queue::shared_ptr seen; + for(Queue::shared_ptr q = pop(); q; q = pop()){ + if(seen == q){ + add(q); + break; + }else if(q->canAutoDelete()){ + std::string name(q->getName()); + registry->destroy(name); + std::cout << "INFO: Auto-deleted queue named " << name << std::endl; + }else{ + add(q); + if(!seen) seen = q; + } + } +} + +void AutoDelete::run(){ + Monitor::ScopedLock l(monitor); + while(!stopped){ + process(); + monitor.wait(period*TIME_MSEC); + } +} + +void AutoDelete::start(){ + Monitor::ScopedLock l(monitor); + if(stopped){ + stopped = false; + runner = Thread(this); + } +} + +void AutoDelete::stop(){ + { + Monitor::ScopedLock l(monitor); + if(stopped) return; + stopped = true; + } + monitor.notify(); + runner.join(); +} diff --git a/Final/cpp/lib/broker/AutoDelete.h b/Final/cpp/lib/broker/AutoDelete.h new file mode 100644 index 0000000000..19a5938df1 --- /dev/null +++ b/Final/cpp/lib/broker/AutoDelete.h @@ -0,0 +1,55 @@ +#ifndef _AutoDelete_ +#define _AutoDelete_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <queue> +#include <sys/Monitor.h> +#include <BrokerQueue.h> +#include <QueueRegistry.h> +#include <sys/Thread.h> + +namespace qpid { + namespace broker{ + class AutoDelete : private qpid::sys::Runnable { + qpid::sys::Mutex lock; + qpid::sys::Monitor monitor; + std::queue<Queue::shared_ptr> queues; + QueueRegistry* const registry; + u_int32_t period; + volatile bool stopped; + qpid::sys::Thread runner; + + Queue::shared_ptr const pop(); + void process(); + virtual void run(); + + public: + AutoDelete(QueueRegistry* const registry, u_int32_t period); + void add(Queue::shared_ptr const); + void start(); + void stop(); + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/Binding.h b/Final/cpp/lib/broker/Binding.h new file mode 100644 index 0000000000..16ca223208 --- /dev/null +++ b/Final/cpp/lib/broker/Binding.h @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Binding_ +#define _Binding_ + +#include <FieldTable.h> + +namespace qpid { + namespace broker { + class Binding{ + public: + virtual void cancel() = 0; + virtual ~Binding(){} + }; + } +} + + +#endif + diff --git a/Final/cpp/lib/broker/Broker.cpp b/Final/cpp/lib/broker/Broker.cpp new file mode 100644 index 0000000000..806127bf43 --- /dev/null +++ b/Final/cpp/lib/broker/Broker.cpp @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <memory> +#include <Broker.h> + +using namespace qpid::broker; +using namespace qpid::sys; + +Broker::Options::Options() : + workerThreads(5), + maxConnections(500), + connectionBacklog(10), + store(), + stagingThreshold(5000000) +{} + +void Broker::Options::addTo(po::options_description& desc) +{ + using namespace po; + CommonOptions::addTo(desc); + desc.add_options() + ("worker-threads", optValue(workerThreads, "N"), + "Broker thread pool size") + ("max-connections", optValue(maxConnections, "N"), + "Maximum allowed connections") + ("connection-backlog", optValue(connectionBacklog, "N"), + "Connection backlog limit for server socket.") + ("staging-threshold", optValue(stagingThreshold, "N"), + "Messages over N bytes are staged to disk.") + ("store", optValue(store,"LIBNAME"), + "Name of message store shared library."); +} + + +Broker::Broker(const Options& config) : + acceptor(Acceptor::create(config.port, + config.connectionBacklog, + config.workerThreads, + config.trace)), + factory(config.store) +{ } + + +Broker::shared_ptr Broker::create(int16_t port) +{ + Options config; + config.port=port; + return create(config); +} + +Broker::shared_ptr Broker::create(const Options& config) { + return Broker::shared_ptr(new Broker(config)); +} + +void Broker::run() { + acceptor->run(&factory); +} + +void Broker::shutdown() { + if (acceptor) + acceptor->shutdown(); +} + +Broker::~Broker() { } + diff --git a/Final/cpp/lib/broker/Broker.h b/Final/cpp/lib/broker/Broker.h new file mode 100644 index 0000000000..8b54bd592b --- /dev/null +++ b/Final/cpp/lib/broker/Broker.h @@ -0,0 +1,91 @@ +#ifndef _Broker_ +#define _Broker_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <CommonOptions.h> +#include <SessionHandlerFactoryImpl.h> +#include <sys/Runnable.h> +#include <sys/Acceptor.h> +#include <SharedObject.h> + +namespace qpid { +namespace broker { +/** + * A broker instance. + */ +class Broker : public qpid::sys::Runnable, + public qpid::SharedObject<Broker> +{ + public: + struct Options : public CommonOptions { + Options(); + void addTo(po::options_description&); + int workerThreads; + int maxConnections; + int connectionBacklog; + std::string store; + long stagingThreshold; + }; + + virtual ~Broker(); + + /** + * Create a broker. + * @param port Port to listen on or 0 to pick a port dynamically. + */ + static shared_ptr create(int16_t port = Options::DEFAULT_PORT); + + /** + * Create a broker using a Configuration. + */ + static shared_ptr create(const Options& config); + + /** + * Return listening port. If called before bind this is + * the configured port. If called after it is the actual + * port, which will be different if the configured port is + * 0. + */ + virtual int16_t getPort() const { return acceptor->getPort(); } + + /** + * Run the broker. Implements Runnable::run() so the broker + * can be run in a separate thread. + */ + virtual void run(); + + /** Shut down the broker */ + virtual void shutdown(); + + private: + Broker(const Options& config); + Options config; + qpid::sys::Acceptor::shared_ptr acceptor; + SessionHandlerFactoryImpl factory; +}; +} +} + + + +#endif /*!_Broker_*/ diff --git a/Final/cpp/lib/broker/BrokerChannel.cpp b/Final/cpp/lib/broker/BrokerChannel.cpp new file mode 100644 index 0000000000..d8fbdc467c --- /dev/null +++ b/Final/cpp/lib/broker/BrokerChannel.cpp @@ -0,0 +1,272 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <BrokerChannel.h> +#include <QpidError.h> +#include <iostream> +#include <sstream> +#include <assert.h> + +using std::mem_fun_ref; +using std::bind2nd; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + + +Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) : + id(_id), + out(_out), + currentDeliveryTag(1), + transactional(false), + prefetchSize(0), + prefetchCount(0), + framesize(_framesize), + tagGenerator("sgen"), + accumulatedAck(0), + store(_store), + messageBuilder(this, _store, _stagingThreshold), + version(_version), + flowActive(true){ + + outstanding.reset(); +} + +Channel::~Channel(){ +} + +bool Channel::exists(const string& consumerTag){ + return consumers.find(consumerTag) != consumers.end(); +} + +void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*){ + if(tag.empty()) tag = tagGenerator.generate(); + ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); + try{ + queue->consume(c, exclusive);//may throw exception + consumers[tag] = c; + }catch(ExclusiveAccessException& e){ + delete c; + throw e; + } +} + +void Channel::cancel(consumer_iterator i){ + ConsumerImpl* c = i->second; + consumers.erase(i); + if(c){ + c->cancel(); + delete c; + } +} + +void Channel::cancel(const string& tag){ + consumer_iterator i = consumers.find(tag); + if(i != consumers.end()){ + cancel(i); + } +} + +void Channel::close(){ + //cancel all consumers + for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ + cancel(i); + } + //requeue: + recover(true); +} + +void Channel::begin(){ + transactional = true; +} + +void Channel::commit(){ + TxAck txAck(accumulatedAck, unacked); + txBuffer.enlist(&txAck); + if(txBuffer.prepare(store)){ + txBuffer.commit(); + } + accumulatedAck.clear(); +} + +void Channel::rollback(){ + txBuffer.rollback(); + accumulatedAck.clear(); +} + +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ + Mutex::ScopedLock locker(deliveryLock); + + u_int64_t deliveryTag = currentDeliveryTag++; + if(ackExpected){ + unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); + outstanding.size += msg->contentSize(); + outstanding.count++; + } + //send deliver method, header and content(s) + msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version); +} + +bool Channel::checkPrefetch(Message::shared_ptr& msg){ + Mutex::ScopedLock locker(deliveryLock); + bool countOk = !prefetchCount || prefetchCount > unacked.size(); + bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); + return countOk && sizeOk; +} + +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, + Queue::shared_ptr _queue, + ConnectionToken* const _connection, bool ack) : parent(_parent), + tag(_tag), + queue(_queue), + connection(_connection), + ackExpected(ack), + blocked(false){ +} + +bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ + if(!connection || connection != msg->getPublisher()){//check for no_local + if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){ + blocked = true; + }else{ + blocked = false; + parent->deliver(msg, tag, queue, ackExpected); + return true; + } + } + return false; +} + +void Channel::ConsumerImpl::cancel(){ + if(queue) queue->cancel(this); +} + +void Channel::ConsumerImpl::requestDispatch(){ + if(blocked) queue->dispatch(); +} + +void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){ + Message::shared_ptr message(_message); + exchange = _exchange; + messageBuilder.initialise(message); +} + +void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ + messageBuilder.setHeader(header); + //at this point, decide based on the size of the message whether we want + //to stage it by saving content directly to disk as it arrives +} + +void Channel::handleContent(AMQContentBody::shared_ptr content){ + messageBuilder.addContent(content); +} + +void Channel::complete(Message::shared_ptr& msg){ + if(exchange){ + if(transactional){ + TxPublish* deliverable = new TxPublish(msg); + exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + txBuffer.enlist(new DeletingTxOp(deliverable)); + }else{ + DeliverableMessage deliverable(msg); + exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + } + exchange.reset(); + }else{ + std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl; + } +} + +void Channel::ack(u_int64_t deliveryTag, bool multiple){ + if(transactional){ + accumulatedAck.update(deliveryTag, multiple); + //TODO: I think the outstanding prefetch size & count should be updated at this point... + //TODO: ...this may then necessitate dispatching to consumers + }else{ + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + + ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag)); + if(i == unacked.end()){ + throw InvalidAckException(); + }else if(multiple){ + ack_iterator end = ++i; + for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard)); + unacked.erase(unacked.begin(), end); + + //recalculate the prefetch: + outstanding.reset(); + for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); + }else{ + i->discard(); + i->subtractFrom(&outstanding); + unacked.erase(i); + } + + //if the prefetch limit had previously been reached, there may + //be messages that can be now be delivered + for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ + j->second->requestDispatch(); + } + } +} + +void Channel::recover(bool requeue){ + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + + if(requeue){ + outstanding.reset(); + std::list<DeliveryRecord> copy = unacked; + unacked.clear(); + for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); + }else{ + for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); + } +} + +bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ + Message::shared_ptr msg = queue->dequeue(); + if(msg){ + Mutex::ScopedLock locker(deliveryLock); + u_int64_t myDeliveryTag = currentDeliveryTag++; + msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version); + if(ackExpected){ + unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); + } + return true; + }else{ + return false; + } +} + +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ + msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version); +} + +void Channel::flow(bool active){ + Mutex::ScopedLock locker(deliveryLock); + bool requestDelivery(!flowActive && active); + flowActive = active; + if (requestDelivery) { + //there may be messages that can be now be delivered + for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ + j->second->requestDispatch(); + } + } +} diff --git a/Final/cpp/lib/broker/BrokerChannel.h b/Final/cpp/lib/broker/BrokerChannel.h new file mode 100644 index 0000000000..be40a25013 --- /dev/null +++ b/Final/cpp/lib/broker/BrokerChannel.h @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Channel_ +#define _Channel_ + +#include <algorithm> +#include <functional> +#include <list> +#include <map> +#include <AccumulatedAck.h> +#include <Binding.h> +#include <Consumer.h> +#include <DeletingTxOp.h> +#include <DeliverableMessage.h> +#include <DeliveryRecord.h> +#include <BrokerMessage.h> +#include <MessageBuilder.h> +#include <NameGenerator.h> +#include <Prefetch.h> +#include <BrokerQueue.h> +#include <MessageStore.h> +#include <TxAck.h> +#include <TxBuffer.h> +#include <TxPublish.h> +#include <sys/Monitor.h> +#include <OutputHandler.h> +#include <AMQContentBody.h> +#include <AMQHeaderBody.h> +#include <BasicPublishBody.h> + +namespace qpid { + namespace broker { + using qpid::framing::string; + + /** + * Maintains state for an AMQP channel. Handles incoming and + * outgoing messages for that channel. + */ + class Channel : private MessageBuilder::CompletionHandler{ + class ConsumerImpl : public virtual Consumer{ + Channel* parent; + const string tag; + Queue::shared_ptr queue; + ConnectionToken* const connection; + const bool ackExpected; + bool blocked; + public: + ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); + virtual bool deliver(Message::shared_ptr& msg); + void cancel(); + void requestDispatch(); + }; + + typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; + const int id; + qpid::framing::OutputHandler* out; + u_int64_t currentDeliveryTag; + Queue::shared_ptr defaultQueue; + bool transactional; + std::map<string, ConsumerImpl*> consumers; + u_int32_t prefetchSize; + u_int16_t prefetchCount; + Prefetch outstanding; + u_int32_t framesize; + NameGenerator tagGenerator; + std::list<DeliveryRecord> unacked; + qpid::sys::Mutex deliveryLock; + TxBuffer txBuffer; + AccumulatedAck accumulatedAck; + MessageStore* const store; + MessageBuilder messageBuilder;//builder for in-progress message + Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to + qpid::framing::ProtocolVersion version; // version used for this channel + bool flowActive; + + virtual void complete(Message::shared_ptr& msg); + void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); + void cancel(consumer_iterator consumer); + bool checkPrefetch(Message::shared_ptr& msg); + + public: + Channel(qpid::framing::ProtocolVersion& _version, qpid::framing::OutputHandler* out, int id, u_int32_t framesize, + MessageStore* const _store = 0, u_int64_t stagingThreshold = 0); + ~Channel(); + inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } + inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; } + inline u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; } + inline u_int16_t setPrefetchCount(u_int16_t count){ return prefetchCount = count; } + bool exists(const string& consumerTag); + void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, + ConnectionToken* const connection = 0, const qpid::framing::FieldTable* = 0); + void cancel(const string& tag); + bool get(Queue::shared_ptr queue, bool ackExpected); + void begin(); + void close(); + void commit(); + void rollback(); + void ack(u_int64_t deliveryTag, bool multiple); + void recover(bool requeue); + void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag); + void handlePublish(Message* msg, Exchange::shared_ptr exchange); + void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void handleContent(qpid::framing::AMQContentBody::shared_ptr content); + void flow(bool active); + }; + + struct InvalidAckException{}; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/BrokerExchange.h b/Final/cpp/lib/broker/BrokerExchange.h new file mode 100644 index 0000000000..f5e4d9cb28 --- /dev/null +++ b/Final/cpp/lib/broker/BrokerExchange.h @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Exchange_ +#define _Exchange_ + +#include <boost/shared_ptr.hpp> +#include <Deliverable.h> +#include <BrokerQueue.h> +#include <FieldTable.h> + +namespace qpid { + namespace broker { + using std::string; + + class Exchange{ + const string name; + public: + typedef boost::shared_ptr<Exchange> shared_ptr; + + explicit Exchange(const string& _name) : name(_name){} + virtual ~Exchange(){} + string getName() { return name; } + virtual string getType() = 0; + virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/BrokerMessage.cpp b/Final/cpp/lib/broker/BrokerMessage.cpp new file mode 100644 index 0000000000..6ba2131a74 --- /dev/null +++ b/Final/cpp/lib/broker/BrokerMessage.cpp @@ -0,0 +1,223 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <BrokerMessage.h> +#include <iostream> + +#include <InMemoryContent.h> +#include <LazyLoadedContent.h> +#include <MessageStore.h> +#include <BasicDeliverBody.h> +#include <BasicGetOkBody.h> + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +Message::Message(const ConnectionToken* const _publisher, + const string& _exchange, const string& _routingKey, + bool _mandatory, bool _immediate) : publisher(_publisher), + exchange(_exchange), + routingKey(_routingKey), + mandatory(_mandatory), + immediate(_immediate), + redelivered(false), + size(0), + persistenceId(0) {} + +Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : + publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ + + decode(buffer, headersOnly, contentChunkSize); +} + +Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} + +Message::~Message(){ + if (content.get()) content->destroy(); +} + +void Message::setHeader(AMQHeaderBody::shared_ptr _header){ + this->header = _header; +} + +void Message::addContent(AMQContentBody::shared_ptr data){ + if (!content.get()) { + content = std::auto_ptr<Content>(new InMemoryContent()); + } + content->add(data); + size += data->size(); +} + +bool Message::isComplete(){ + return header.get() && (header->getContentSize() == contentSize()); +} + +void Message::redeliver(){ + redelivered = true; +} + +void Message::deliver(OutputHandler* out, int channel, + const string& consumerTag, u_int64_t deliveryTag, + u_int32_t framesize, + ProtocolVersion* version){ + // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction + out->send(new AMQFrame(*version, channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey))); + sendContent(out, channel, framesize, version); +} + +void Message::sendGetOk(OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + ProtocolVersion* version){ + // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction + out->send(new AMQFrame(*version, channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount))); + sendContent(out, channel, framesize, version); +} + +void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ + AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); + out->send(new AMQFrame(*version, channel, headerBody)); + + Mutex::ScopedLock locker(contentLock); + if (content.get()) content->send(*version, out, channel, framesize); +} + +BasicHeaderProperties* Message::getHeaderProperties(){ + return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); +} + +const ConnectionToken* const Message::getPublisher(){ + return publisher; +} + +bool Message::isPersistent() +{ + if(!header) return false; + BasicHeaderProperties* props = getHeaderProperties(); + return props && props->getDeliveryMode() == PERSISTENT; +} + +void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) +{ + decodeHeader(buffer); + if (!headersOnly) decodeContent(buffer, contentChunkSize); +} + +void Message::decodeHeader(Buffer& buffer) +{ + buffer.getShortString(exchange); + buffer.getShortString(routingKey); + + u_int32_t headerSize = buffer.getLong(); + AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody()); + headerBody->decode(buffer, headerSize); + setHeader(headerBody); +} + +void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) +{ + u_int64_t expected = expectedContentSize(); + if (expected != buffer.available()) { + std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl; + throw Exception("Cannot decode content, buffer not large enough."); + } + + if (!chunkSize || chunkSize > expected) { + chunkSize = expected; + } + + u_int64_t total = 0; + while (total < expectedContentSize()) { + u_int64_t remaining = expected - total; + AMQContentBody::shared_ptr contentBody(new AMQContentBody()); + contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize); + addContent(contentBody); + total += chunkSize; + } +} + +void Message::encode(Buffer& buffer) +{ + encodeHeader(buffer); + encodeContent(buffer); +} + +void Message::encodeHeader(Buffer& buffer) +{ + buffer.putShortString(exchange); + buffer.putShortString(routingKey); + buffer.putLong(header->size()); + header->encode(buffer); +} + +void Message::encodeContent(Buffer& buffer) +{ + Mutex::ScopedLock locker(contentLock); + if (content.get()) content->encode(buffer); +} + +u_int32_t Message::encodedSize() +{ + return encodedHeaderSize() + encodedContentSize(); +} + +u_int32_t Message::encodedContentSize() +{ + Mutex::ScopedLock locker(contentLock); + return content.get() ? content->size() : 0; +} + +u_int32_t Message::encodedHeaderSize() +{ + return exchange.size() + 1 + + routingKey.size() + 1 + + header->size() + 4;//4 extra bytes for size +} + +u_int64_t Message::expectedContentSize() +{ + return header.get() ? header->getContentSize() : 0; +} + +void Message::releaseContent(MessageStore* store) +{ + Mutex::ScopedLock locker(contentLock); + if (!isPersistent() && persistenceId == 0) { + store->stage(this); + } + if (!content.get() || content->size() > 0) { + //set content to lazy loading mode (but only if there is stored content): + + //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is + // then set as a member of that message so its lifetime is guaranteed to be no longer than + // that of the message itself + content = std::auto_ptr<Content>(new LazyLoadedContent(store, this, expectedContentSize())); + } +} + +void Message::setContent(std::auto_ptr<Content>& _content) +{ + Mutex::ScopedLock locker(contentLock); + content = _content; +} diff --git a/Final/cpp/lib/broker/BrokerMessage.h b/Final/cpp/lib/broker/BrokerMessage.h new file mode 100644 index 0000000000..1f68e1004a --- /dev/null +++ b/Final/cpp/lib/broker/BrokerMessage.h @@ -0,0 +1,145 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Message_ +#define _Message_ + +#include <memory> +#include <boost/shared_ptr.hpp> +#include <AMQContentBody.h> +#include <AMQHeaderBody.h> +#include <ProtocolVersion.h> +#include <BasicHeaderProperties.h> +#include <ConnectionToken.h> +#include <Content.h> +#include <OutputHandler.h> +#include <Mutex.h> +#include <TxBuffer.h> + +namespace qpid { + namespace broker { + + class MessageStore; + using qpid::framing::string; + + /** + * Represents an AMQP message, i.e. a header body, a list of + * content bodies and some details about the publication + * request. + */ + class Message{ + const ConnectionToken* const publisher; + string exchange; + string routingKey; + const bool mandatory; + const bool immediate; + bool redelivered; + qpid::framing::AMQHeaderBody::shared_ptr header; + std::auto_ptr<Content> content; + u_int64_t size; + u_int64_t persistenceId; + qpid::sys::Mutex contentLock; + + void sendContent(qpid::framing::OutputHandler* out, + int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version); + + public: + typedef boost::shared_ptr<Message> shared_ptr; + + Message(const ConnectionToken* const publisher, + const string& exchange, const string& routingKey, + bool mandatory, bool immediate); + Message(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); + Message(); + ~Message(); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void addContent(qpid::framing::AMQContentBody::shared_ptr data); + bool isComplete(); + const ConnectionToken* const getPublisher(); + + void deliver(qpid::framing::OutputHandler* out, + int channel, + const string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize, + qpid::framing::ProtocolVersion* version); + void redeliver(); + + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + bool isPersistent(); + const string& getRoutingKey() const { return routingKey; } + const string& getExchange() const { return exchange; } + u_int64_t contentSize() const { return size; } + u_int64_t getPersistenceId() const { return persistenceId; } + void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } + + void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); + void decodeHeader(qpid::framing::Buffer& buffer); + void decodeContent(qpid::framing::Buffer& buffer, u_int32_t contentChunkSize = 0); + + void encode(qpid::framing::Buffer& buffer); + void encodeHeader(qpid::framing::Buffer& buffer); + void encodeContent(qpid::framing::Buffer& buffer); + /** + * @returns the size of the buffer needed to encode this + * message in its entirety + */ + u_int32_t encodedSize(); + /** + * @returns the size of the buffer needed to encode the + * 'header' of this message (not just the header frame, + * but other meta data e.g.routing key and exchange) + */ + u_int32_t encodedHeaderSize(); + /** + * @returns the size of the buffer needed to encode the + * (possibly partial) content held by this message + */ + u_int32_t encodedContentSize(); + /** + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. + */ + void releaseContent(MessageStore* store); + /** + * If headers have been received, returns the expected + * content size else returns 0. + */ + u_int64_t expectedContentSize(); + /** + * Sets the 'content' implementation of this message (the + * message controls the lifecycle of the content instance + * it uses). + */ + void setContent(std::auto_ptr<Content>& content); + }; + + } +} + + +#endif diff --git a/Final/cpp/lib/broker/BrokerQueue.cpp b/Final/cpp/lib/broker/BrokerQueue.cpp new file mode 100644 index 0000000000..0e48d3b13d --- /dev/null +++ b/Final/cpp/lib/broker/BrokerQueue.cpp @@ -0,0 +1,247 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <BrokerQueue.h> +#include <MessageStore.h> +#include <sys/Monitor.h> +#include <sys/Time.h> +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; + +Queue::Queue(const string& _name, u_int32_t _autodelete, + MessageStore* const _store, + const ConnectionToken* const _owner) : + + name(_name), + autodelete(_autodelete), + store(_store), + owner(_owner), + queueing(false), + dispatching(false), + next(0), + lastUsed(0), + exclusive(0), + persistenceId(0) +{ + if(autodelete) lastUsed = now()/TIME_MSEC; +} + +Queue::~Queue(){ + for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){ + b->cancel(); + bindings.pop(); + } +} + +void Queue::bound(Binding* b){ + bindings.push(b); +} + +void Queue::deliver(Message::shared_ptr& msg){ + enqueue(0, msg, 0); + process(msg); +} + +void Queue::recover(Message::shared_ptr& msg){ + push(msg); + if (store && msg->expectedContentSize() != msg->encodedContentSize()) { + //content has not been loaded, need to ensure that lazy loading mode is set: + //TODO: find a nicer way to do this + msg->releaseContent(store); + } +} + +void Queue::process(Message::shared_ptr& msg){ + Mutex::ScopedLock locker(lock); + if(queueing || !dispatch(msg)){ + push(msg); + } +} + +bool Queue::dispatch(Message::shared_ptr& msg){ + if(consumers.empty()){ + return false; + }else if(exclusive){ + if(!exclusive->deliver(msg)){ + std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl; + } + return true; + }else{ + //deliver to next consumer + next = next % consumers.size(); + Consumer* c = consumers[next]; + int start = next; + while(c){ + next++; + if(c->deliver(msg)) return true; + + next = next % consumers.size(); + c = next == start ? 0 : consumers[next]; + } + return false; + } +} + +bool Queue::startDispatching(){ + Mutex::ScopedLock locker(lock); + if(queueing && !dispatching){ + dispatching = true; + return true; + }else{ + return false; + } +} + +void Queue::dispatch(){ + bool proceed = startDispatching(); + while(proceed){ + Mutex::ScopedLock locker(lock); + if(!messages.empty() && dispatch(messages.front())){ + pop(); + }else{ + dispatching = false; + proceed = false; + queueing = !messages.empty(); + } + } +} + +void Queue::consume(Consumer* c, bool requestExclusive){ + Mutex::ScopedLock locker(lock); + if(exclusive) throw ExclusiveAccessException(); + if(requestExclusive){ + if(!consumers.empty()) throw ExclusiveAccessException(); + exclusive = c; + } + + if(autodelete && consumers.empty()) lastUsed = 0; + consumers.push_back(c); +} + +void Queue::cancel(Consumer* c){ + Mutex::ScopedLock locker(lock); + consumers.erase(find(consumers.begin(), consumers.end(), c)); + if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC; + if(exclusive == c) exclusive = 0; +} + +Message::shared_ptr Queue::dequeue(){ + Mutex::ScopedLock locker(lock); + Message::shared_ptr msg; + if(!messages.empty()){ + msg = messages.front(); + pop(); + } + return msg; +} + +u_int32_t Queue::purge(){ + Mutex::ScopedLock locker(lock); + int count = messages.size(); + while(!messages.empty()) pop(); + return count; +} + +void Queue::pop(){ + if (policy.get()) policy->dequeued(messages.front()->contentSize()); + messages.pop(); +} + +void Queue::push(Message::shared_ptr& msg){ + queueing = true; + messages.push(msg); + if (policy.get()) { + policy->enqueued(msg->contentSize()); + if (policy->limitExceeded()) { + msg->releaseContent(store); + } + } +} + +u_int32_t Queue::getMessageCount() const{ + Mutex::ScopedLock locker(lock); + return messages.size(); +} + +u_int32_t Queue::getConsumerCount() const{ + Mutex::ScopedLock locker(lock); + return consumers.size(); +} + +bool Queue::canAutoDelete() const{ + Mutex::ScopedLock locker(lock); + return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete); +} + +void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) +{ + if (msg->isPersistent() && store) { + store->enqueue(ctxt, msg.get(), *this, xid); + } +} + +void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) +{ + if (msg->isPersistent() && store) { + store->dequeue(ctxt, msg.get(), *this, xid); + } +} + +namespace +{ + const std::string qpidMaxSize("qpid.max_size"); + const std::string qpidMaxCount("qpid.max_count"); +} + +void Queue::create(const FieldTable& settings) +{ + if (store) { + store->create(*this, settings); + } + configure(settings); +} + +void Queue::configure(const FieldTable& settings) +{ + QueuePolicy* _policy = new QueuePolicy(settings); + if (_policy->getMaxCount() || _policy->getMaxSize()) { + setPolicy(std::auto_ptr<QueuePolicy>(_policy)); + } +} + +void Queue::destroy() +{ + if (store) { + store->destroy(*this); + } +} + +void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) +{ + policy = _policy; +} + +const QueuePolicy* const Queue::getPolicy() +{ + return policy.get(); +} diff --git a/Final/cpp/lib/broker/BrokerQueue.h b/Final/cpp/lib/broker/BrokerQueue.h new file mode 100644 index 0000000000..41611bebe9 --- /dev/null +++ b/Final/cpp/lib/broker/BrokerQueue.h @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Queue_ +#define _Queue_ + +#include <vector> +#include <memory> +#include <queue> +#include <boost/shared_ptr.hpp> +#include <amqp_types.h> +#include <Binding.h> +#include <ConnectionToken.h> +#include <Consumer.h> +#include <BrokerMessage.h> +#include <FieldTable.h> +#include <sys/Monitor.h> +#include <QueuePolicy.h> + +namespace qpid { + namespace broker { + class MessageStore; + + /** + * Thrown when exclusive access would be violated. + */ + struct ExclusiveAccessException{}; + + using std::string; + + /** + * The brokers representation of an amqp queue. Messages are + * delivered to a queue from where they can be dispatched to + * registered consumers or be stored until dequeued or until one + * or more consumers registers. + */ + class Queue{ + const string name; + const u_int32_t autodelete; + MessageStore* const store; + const ConnectionToken* const owner; + std::vector<Consumer*> consumers; + std::queue<Binding*> bindings; + std::queue<Message::shared_ptr> messages; + bool queueing; + bool dispatching; + int next; + mutable qpid::sys::Mutex lock; + int64_t lastUsed; + Consumer* exclusive; + mutable u_int64_t persistenceId; + std::auto_ptr<QueuePolicy> policy; + + void pop(); + void push(Message::shared_ptr& msg); + bool startDispatching(); + bool dispatch(Message::shared_ptr& msg); + void setPolicy(std::auto_ptr<QueuePolicy> policy); + + public: + + typedef boost::shared_ptr<Queue> shared_ptr; + + typedef std::vector<shared_ptr> vector; + + Queue(const string& name, u_int32_t autodelete = 0, + MessageStore* const store = 0, + const ConnectionToken* const owner = 0); + ~Queue(); + + void create(const qpid::framing::FieldTable& settings); + void configure(const qpid::framing::FieldTable& settings); + void destroy(); + /** + * Informs the queue of a binding that should be cancelled on + * destruction of the queue. + */ + void bound(Binding* b); + /** + * Delivers a message to the queue. Will record it as + * enqueued if persistent then process it. + */ + void deliver(Message::shared_ptr& msg); + /** + * Dispatches the messages immediately to a consumer if + * one is available or stores it for later if not. + */ + void process(Message::shared_ptr& msg); + /** + * Used during recovery to add stored messages back to the queue + */ + void recover(Message::shared_ptr& msg); + /** + * Dispatch any queued messages providing there are + * consumers for them. Only one thread can be dispatching + * at any time, but this method (rather than the caller) + * is responsible for ensuring that. + */ + void dispatch(); + void consume(Consumer* c, bool exclusive = false); + void cancel(Consumer* c); + u_int32_t purge(); + u_int32_t getMessageCount() const; + u_int32_t getConsumerCount() const; + inline const string& getName() const { return name; } + inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } + inline bool hasExclusiveConsumer() const { return exclusive; } + inline u_int64_t getPersistenceId() const { return persistenceId; } + inline void setPersistenceId(u_int64_t _persistenceId) const { persistenceId = _persistenceId; } + + bool canAutoDelete() const; + + void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid); + /** + * dequeue from store (only done once messages is acknowledged) + */ + void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid); + /** + * dequeues from memory only + */ + Message::shared_ptr dequeue(); + + const QueuePolicy* const getPolicy(); + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/ConnectionToken.h b/Final/cpp/lib/broker/ConnectionToken.h new file mode 100644 index 0000000000..7e7f813d0e --- /dev/null +++ b/Final/cpp/lib/broker/ConnectionToken.h @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ConnectionToken_ +#define _ConnectionToken_ + +namespace qpid { + namespace broker { + /** + * An empty interface allowing opaque implementations of some + * form of token to identify a connection. + */ + class ConnectionToken{ + public: + virtual ~ConnectionToken(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/Consumer.h b/Final/cpp/lib/broker/Consumer.h new file mode 100644 index 0000000000..26deef4a26 --- /dev/null +++ b/Final/cpp/lib/broker/Consumer.h @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Consumer_ +#define _Consumer_ + +#include <BrokerMessage.h> + +namespace qpid { + namespace broker { + class Consumer{ + public: + virtual bool deliver(Message::shared_ptr& msg) = 0; + virtual ~Consumer(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/Content.h b/Final/cpp/lib/broker/Content.h new file mode 100644 index 0000000000..8aacf02959 --- /dev/null +++ b/Final/cpp/lib/broker/Content.h @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Content_ +#define _Content_ + +#include <AMQContentBody.h> +#include <Buffer.h> +#include <OutputHandler.h> +#include <ProtocolVersion.h> + +namespace qpid { + namespace broker { + class Content{ + public: + virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0; + virtual u_int32_t size() = 0; + virtual void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0; + virtual void encode(qpid::framing::Buffer& buffer) = 0; + virtual void destroy() = 0; + virtual ~Content(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/Daemon.cpp b/Final/cpp/lib/broker/Daemon.cpp new file mode 100644 index 0000000000..3a0e687bf2 --- /dev/null +++ b/Final/cpp/lib/broker/Daemon.cpp @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Daemon.h" +#include "Exception.h" + +#include <boost/iostreams/stream.hpp> +#include <boost/iostreams/device/file_descriptor.hpp> + +#include <sstream> + +#include <errno.h> +#include <fcntl.h> +#include <signal.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> + +namespace qpid { +namespace broker { + +using namespace std; +typedef boost::iostreams::stream<boost::iostreams::file_descriptor> fdstream; + +namespace { +/** Throw an exception containing msg and strerror if throwIf is true. + * Name is supposed to be reminiscent of perror(). + */ +void terror(bool throwIf, const string& msg, int errNo=errno) { + if (throwIf) + throw Exception(msg + (errNo? ": "+strError(errNo) : string("."))); +} + + +struct LockFile : public fdstream { + + LockFile(const std::string& path_, bool create) + : path(path_), fd(-1), created(create) + { + errno = 0; + int flags=create ? O_WRONLY|O_CREAT|O_NOFOLLOW : O_RDWR; + fd = ::open(path.c_str(), flags, 0644); + terror(fd < 0,"Cannot open "+path); + terror(::lockf(fd, F_TLOCK, 0) < 0, "Cannot lock "+path); + open(boost::iostreams::file_descriptor(fd)); + } + + ~LockFile() { + if (fd >= 0) { + ::lockf(fd, F_ULOCK, 0); + close(); + } + } + + std::string path; + int fd; + bool created; +}; + +} // namespace + +Daemon::Daemon() { + pid = -1; + pipeFds[0] = pipeFds[1] = -1; +} + +string Daemon::dir() { + return (getuid() == 0 ? "/var/run" : "/tmp"); +} + +string Daemon::pidFile(uint16_t port) { + ostringstream path; + path << dir() << "/qpidd." << port << ".pid"; + return path.str(); +} + +void Daemon::fork() +{ + terror(pipe(pipeFds) < 0, "Can't create pipe"); + terror((pid = ::fork()) < 0, "Daemon fork failed"); + if (pid == 0) { // Child + try { + // File descriptors + terror(::close(pipeFds[0])<0, "Cannot close read pipe"); + terror(::close(0)<0, "Cannot close stdin"); + terror(::close(1)<0, "Cannot close stdout"); + terror(::close(2)<0, "Cannot close stderr"); + int fd=::open("/dev/null",O_RDWR); // stdin + terror(fd != 0, "Cannot re-open stdin"); + terror(::dup(fd)<0, "Cannot re-open stdout"); + terror(::dup(fd)<0, "Cannot re-open stderror"); + + // Misc + terror(setsid()<0, "Cannot set session ID"); + terror(chdir(dir().c_str()) < 0, "Cannot change directory to "+dir()); + umask(027); + + // Child behavior + child(); + } + catch (const exception& e) { + fdstream pipe(pipeFds[1]); + assert(pipe.is_open()); + pipe << "0 " << e.what() << endl; + } + } + else { // Parent + close(pipeFds[1]); // Write side. + parent(); + } +} + +Daemon::~Daemon() { + if (!lockFile.empty()) + unlink(lockFile.c_str()); +} + +uint16_t Daemon::wait(int timeout) { // parent waits for child. + errno = 0; + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + + fd_set fds; + FD_ZERO(&fds); + FD_SET(pipeFds[0], &fds); + terror(1 != select(FD_SETSIZE, &fds, 0, 0, &tv), "No response from daemon process"); + + fdstream pipe(pipeFds[0]); + uint16_t value = 0; + pipe >> value >> skipws; + if (value == 0) { + string errmsg; + getline(pipe, errmsg); + throw Exception("Daemon startup failed"+ (errmsg.empty() ? string(".") : ": " + errmsg)); + } + return value; +} + +void Daemon::ready(uint16_t port) { // child + lockFile = pidFile(port); + LockFile lf(lockFile, true); + lf << getpid() << endl; + if (lf.fail()) + throw Exception("Cannot write lock file "+lockFile); + fdstream pipe(pipeFds[1]); + pipe << port << endl;; +} + +pid_t Daemon::getPid(uint16_t port) { + string name = pidFile(port); + LockFile lockFile(name, false); + pid_t pid; + lockFile >> pid; + if (lockFile.fail()) + throw Exception("Cannot read lock file "+name); + if (kill(pid, 0) < 0 && errno != EPERM) { + unlink(name.c_str()); + throw Exception("Removing stale lock file "+name); + } + return pid; +} + + +}} // namespace qpid::broker diff --git a/Final/cpp/lib/broker/Daemon.h b/Final/cpp/lib/broker/Daemon.h new file mode 100644 index 0000000000..a9755bc8e7 --- /dev/null +++ b/Final/cpp/lib/broker/Daemon.h @@ -0,0 +1,84 @@ +#ifndef _broker_Daemon_h +#define _broker_Daemon_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <boost/scoped_ptr.hpp> +#include <boost/function.hpp> +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace broker { + +/** + * Tools for forking and managing a daemon process. + * NB: Only one Daemon instance is allowed in a process. + */ +class Daemon : private boost::noncopyable +{ + public: + /** Check daemon is running on port, throw exception if not */ + static pid_t getPid(uint16_t port); + + Daemon(); + + virtual ~Daemon(); + + /** + * Fork a daemon process. + * Call parent() in the parent process, child() in the child. + */ + void fork(); + + protected: + + /** Called in parent process */ + virtual void parent() = 0; + + /** Called in child process */ + virtual void child() = 0; + + /** Call from parent(): wait for child to indicate it is ready. + * @timeout in seconds to wait for response. + * @return port passed by child to ready(). + */ + uint16_t wait(int timeout); + + /** Call from child(): Notify the parent we are ready and write the + * PID file. + *@param port returned by parent call to wait(). + */ + void ready(uint16_t port); + + private: + static std::string dir(); + static std::string pidFile(uint16_t port); + + pid_t pid; + int pipeFds[2]; + std::string lockFile; +}; + +}} // namespace qpid::broker + +#endif /*!_broker_Daemon_h*/ diff --git a/Final/cpp/lib/broker/DeletingTxOp.cpp b/Final/cpp/lib/broker/DeletingTxOp.cpp new file mode 100644 index 0000000000..25fe9c98db --- /dev/null +++ b/Final/cpp/lib/broker/DeletingTxOp.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <DeletingTxOp.h> + +using namespace qpid::broker; + +DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){} + +bool DeletingTxOp::prepare(TransactionContext* ctxt) throw(){ + return delegate && delegate->prepare(ctxt); +} + +void DeletingTxOp::commit() throw(){ + if(delegate){ + delegate->commit(); + delete delegate; + delegate = 0; + } +} + +void DeletingTxOp::rollback() throw(){ + if(delegate){ + delegate->rollback(); + delete delegate; + delegate = 0; + } +} diff --git a/Final/cpp/lib/broker/DeletingTxOp.h b/Final/cpp/lib/broker/DeletingTxOp.h new file mode 100644 index 0000000000..3e026cd4ca --- /dev/null +++ b/Final/cpp/lib/broker/DeletingTxOp.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DeletingTxOp_ +#define _DeletingTxOp_ + +#include <TxOp.h> + +namespace qpid { + namespace broker { + /** + * TxOp wrapper that will delegate calls & delete the object + * to which it delegates after completion of the transaction. + */ + class DeletingTxOp : public virtual TxOp{ + TxOp* delegate; + public: + DeletingTxOp(TxOp* const delegate); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~DeletingTxOp(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/Deliverable.h b/Final/cpp/lib/broker/Deliverable.h new file mode 100644 index 0000000000..e33443555d --- /dev/null +++ b/Final/cpp/lib/broker/Deliverable.h @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Deliverable_ +#define _Deliverable_ + +#include <BrokerQueue.h> + +namespace qpid { + namespace broker { + class Deliverable{ + public: + virtual void deliverTo(Queue::shared_ptr& queue) = 0; + virtual ~Deliverable(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/DeliverableMessage.cpp b/Final/cpp/lib/broker/DeliverableMessage.cpp new file mode 100644 index 0000000000..b9c89da690 --- /dev/null +++ b/Final/cpp/lib/broker/DeliverableMessage.cpp @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <DeliverableMessage.h> + +using namespace qpid::broker; + +DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg) +{ +} + +void DeliverableMessage::deliverTo(Queue::shared_ptr& queue) +{ + queue->deliver(msg); +} + diff --git a/Final/cpp/lib/broker/DeliverableMessage.h b/Final/cpp/lib/broker/DeliverableMessage.h new file mode 100644 index 0000000000..962f0da640 --- /dev/null +++ b/Final/cpp/lib/broker/DeliverableMessage.h @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DeliverableMessage_ +#define _DeliverableMessage_ + +#include <Deliverable.h> +#include <BrokerMessage.h> +#include <BrokerQueue.h> + +namespace qpid { + namespace broker { + class DeliverableMessage : public Deliverable{ + Message::shared_ptr msg; + public: + DeliverableMessage(Message::shared_ptr& msg); + virtual void deliverTo(Queue::shared_ptr& queue); + virtual ~DeliverableMessage(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/DeliveryRecord.cpp b/Final/cpp/lib/broker/DeliveryRecord.cpp new file mode 100644 index 0000000000..19b01cc312 --- /dev/null +++ b/Final/cpp/lib/broker/DeliveryRecord.cpp @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <DeliveryRecord.h> +#include <BrokerChannel.h> + +using namespace qpid::broker; +using std::string; + +DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, + Queue::shared_ptr _queue, + const string _consumerTag, + const u_int64_t _deliveryTag) : msg(_msg), + queue(_queue), + consumerTag(_consumerTag), + deliveryTag(_deliveryTag), + pull(false){} + +DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, + Queue::shared_ptr _queue, + const u_int64_t _deliveryTag) : msg(_msg), + queue(_queue), + consumerTag(""), + deliveryTag(_deliveryTag), + pull(true){} + + +void DeliveryRecord::discard(TransactionContext* ctxt, const std::string* const xid) const{ + queue->dequeue(ctxt, msg, xid); +} + +void DeliveryRecord::discard() const{ + discard(0, 0); +} + +bool DeliveryRecord::matches(u_int64_t tag) const{ + return deliveryTag == tag; +} + +bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{ + return range->covers(deliveryTag); +} + +void DeliveryRecord::redeliver(Channel* const channel) const{ + if(pull){ + //if message was originally sent as response to get, we must requeue it + requeue(); + }else{ + channel->deliver(msg, consumerTag, deliveryTag); + } +} + +void DeliveryRecord::requeue() const{ + msg->redeliver(); + queue->process(msg); +} + +void DeliveryRecord::addTo(Prefetch* const prefetch) const{ + if(!pull){ + //ignore 'pulled' messages (i.e. those that were sent in + //response to get) when calculating prefetch + prefetch->size += msg->contentSize(); + prefetch->count++; + } +} + +void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{ + if(!pull){ + //ignore 'pulled' messages (i.e. those that were sent in + //response to get) when calculating prefetch + prefetch->size -= msg->contentSize(); + prefetch->count--; + } +} diff --git a/Final/cpp/lib/broker/DeliveryRecord.h b/Final/cpp/lib/broker/DeliveryRecord.h new file mode 100644 index 0000000000..01a4024b28 --- /dev/null +++ b/Final/cpp/lib/broker/DeliveryRecord.h @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DeliveryRecord_ +#define _DeliveryRecord_ + +#include <algorithm> +#include <list> +#include <AccumulatedAck.h> +#include <BrokerMessage.h> +#include <Prefetch.h> +#include <BrokerQueue.h> + +namespace qpid { + namespace broker { + class Channel; + + /** + * Record of a delivery for which an ack is outstanding. + */ + class DeliveryRecord{ + mutable Message::shared_ptr msg; + mutable Queue::shared_ptr queue; + std::string consumerTag; + u_int64_t deliveryTag; + bool pull; + + public: + DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const u_int64_t deliveryTag); + DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const u_int64_t deliveryTag); + + void discard() const; + void discard(TransactionContext* ctxt, const std::string* const xid) const; + bool matches(u_int64_t tag) const; + bool coveredBy(const AccumulatedAck* const range) const; + void requeue() const; + void redeliver(Channel* const) const; + void addTo(Prefetch* const prefetch) const; + void subtractFrom(Prefetch* const prefetch) const; + }; + + typedef std::list<DeliveryRecord>::iterator ack_iterator; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/DirectExchange.cpp b/Final/cpp/lib/broker/DirectExchange.cpp new file mode 100644 index 0000000000..c898ae8d7e --- /dev/null +++ b/Final/cpp/lib/broker/DirectExchange.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <DirectExchange.h> +#include <ExchangeBinding.h> +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +DirectExchange::DirectExchange(const string& _name) : Exchange(_name) { + +} + +void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ + Mutex::ScopedLock l(lock); + std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); + if(i == queues.end()){ + bindings[routingKey].push_back(queue); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); + } +} + +void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ + Mutex::ScopedLock l(lock); + std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + + std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); + if(i < queues.end()){ + queues.erase(i); + if(queues.empty()){ + bindings.erase(routingKey); + } + } +} + +void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ + Mutex::ScopedLock l(lock); + std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + int count(0); + for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){ + msg.deliverTo(*i); + } + if(!count){ + std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl; + } +} + +DirectExchange::~DirectExchange(){ + +} + + +const std::string DirectExchange::typeName("direct"); diff --git a/Final/cpp/lib/broker/DirectExchange.h b/Final/cpp/lib/broker/DirectExchange.h new file mode 100644 index 0000000000..a7ef5aca9e --- /dev/null +++ b/Final/cpp/lib/broker/DirectExchange.h @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DirectExchange_ +#define _DirectExchange_ + +#include <map> +#include <vector> +#include <BrokerExchange.h> +#include <FieldTable.h> +#include <BrokerMessage.h> +#include <sys/Monitor.h> +#include <BrokerQueue.h> + +namespace qpid { +namespace broker { + class DirectExchange : public virtual Exchange{ + std::map<string, std::vector<Queue::shared_ptr> > bindings; + qpid::sys::Mutex lock; + + public: + static const std::string typeName; + + DirectExchange(const std::string& name); + + virtual std::string getType(){ return typeName; } + + virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual ~DirectExchange(); + }; +} +} + + +#endif diff --git a/Final/cpp/lib/broker/ExchangeBinding.cpp b/Final/cpp/lib/broker/ExchangeBinding.cpp new file mode 100644 index 0000000000..bf2102414d --- /dev/null +++ b/Final/cpp/lib/broker/ExchangeBinding.cpp @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <ExchangeBinding.h> +#include <BrokerExchange.h> + +using namespace qpid::broker; +using namespace qpid::framing; + +ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){} + +void ExchangeBinding::cancel(){ + e->unbind(q, key, args); + delete this; +} + +ExchangeBinding::~ExchangeBinding(){ +} diff --git a/Final/cpp/lib/broker/ExchangeBinding.h b/Final/cpp/lib/broker/ExchangeBinding.h new file mode 100644 index 0000000000..2afaa89552 --- /dev/null +++ b/Final/cpp/lib/broker/ExchangeBinding.h @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ExchangeBinding_ +#define _ExchangeBinding_ + +#include <Binding.h> +#include <FieldTable.h> +#include <BrokerQueue.h> + +namespace qpid { + namespace broker { + class Exchange; + class Queue; + + class ExchangeBinding : public virtual Binding{ + Exchange* e; + Queue::shared_ptr q; + const string key; + const qpid::framing::FieldTable* args; + public: + ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const qpid::framing::FieldTable* _args); + virtual void cancel(); + virtual ~ExchangeBinding(); + }; + } +} + + +#endif + diff --git a/Final/cpp/lib/broker/ExchangeRegistry.cpp b/Final/cpp/lib/broker/ExchangeRegistry.cpp new file mode 100644 index 0000000000..7bf96c4544 --- /dev/null +++ b/Final/cpp/lib/broker/ExchangeRegistry.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <ExchangeRegistry.h> +#include <DirectExchange.h> +#include <FanOutExchange.h> +#include <HeadersExchange.h> +#include <TopicExchange.h> + +using namespace qpid::broker; +using namespace qpid::sys; +using std::pair; + +pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){ + Mutex::ScopedLock locker(lock); + ExchangeMap::iterator i = exchanges.find(name); + if (i == exchanges.end()) { + Exchange::shared_ptr exchange; + + if(type == TopicExchange::typeName){ + exchange = Exchange::shared_ptr(new TopicExchange(name)); + }else if(type == DirectExchange::typeName){ + exchange = Exchange::shared_ptr(new DirectExchange(name)); + }else if(type == FanOutExchange::typeName){ + exchange = Exchange::shared_ptr(new FanOutExchange(name)); + }else if (type == HeadersExchange::typeName) { + exchange = Exchange::shared_ptr(new HeadersExchange(name)); + }else{ + throw UnknownExchangeTypeException(); + } + exchanges[name] = exchange; + return std::pair<Exchange::shared_ptr, bool>(exchange, true); + } else { + return std::pair<Exchange::shared_ptr, bool>(i->second, false); + } +} + +void ExchangeRegistry::destroy(const string& name){ + Mutex::ScopedLock locker(lock); + exchanges.erase(name); +} + +Exchange::shared_ptr ExchangeRegistry::get(const string& name){ + Mutex::ScopedLock locker(lock); + return exchanges[name]; +} + +namespace +{ +const std::string empty; +} + +Exchange::shared_ptr ExchangeRegistry::getDefault() +{ + return get(empty); +} diff --git a/Final/cpp/lib/broker/ExchangeRegistry.h b/Final/cpp/lib/broker/ExchangeRegistry.h new file mode 100644 index 0000000000..8dcd0d3623 --- /dev/null +++ b/Final/cpp/lib/broker/ExchangeRegistry.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ExchangeRegistry_ +#define _ExchangeRegistry_ + +#include <map> +#include <BrokerExchange.h> +#include <sys/Monitor.h> + +namespace qpid { +namespace broker { + struct UnknownExchangeTypeException{}; + + class ExchangeRegistry{ + typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap; + ExchangeMap exchanges; + qpid::sys::Mutex lock; + public: + std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException); + void destroy(const std::string& name); + Exchange::shared_ptr get(const std::string& name); + Exchange::shared_ptr getDefault(); + }; +} +} + + +#endif diff --git a/Final/cpp/lib/broker/FanOutExchange.cpp b/Final/cpp/lib/broker/FanOutExchange.cpp new file mode 100644 index 0000000000..48afcc20d5 --- /dev/null +++ b/Final/cpp/lib/broker/FanOutExchange.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <FanOutExchange.h> +#include <ExchangeBinding.h> +#include <algorithm> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} + +void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ + Mutex::ScopedLock locker(lock); + // Add if not already present. + Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); + if (i == bindings.end()) { + bindings.push_back(queue); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); + } +} + +void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ + Mutex::ScopedLock locker(lock); + Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); + if (i != bindings.end()) { + bindings.erase(i); + // TODO aconway 2006-09-14: What about the ExchangeBinding object? + // Don't we have to verify routingKey/args match? + } +} + +void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ + Mutex::ScopedLock locker(lock); + for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ + msg.deliverTo(*i); + } +} + +FanOutExchange::~FanOutExchange() {} + +const std::string FanOutExchange::typeName("fanout"); diff --git a/Final/cpp/lib/broker/FanOutExchange.h b/Final/cpp/lib/broker/FanOutExchange.h new file mode 100644 index 0000000000..6dc70e69bb --- /dev/null +++ b/Final/cpp/lib/broker/FanOutExchange.h @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _FanOutExchange_ +#define _FanOutExchange_ + +#include <map> +#include <vector> +#include <BrokerExchange.h> +#include <FieldTable.h> +#include <BrokerMessage.h> +#include <sys/Monitor.h> +#include <BrokerQueue.h> + +namespace qpid { +namespace broker { + +class FanOutExchange : public virtual Exchange { + std::vector<Queue::shared_ptr> bindings; + qpid::sys::Mutex lock; + + public: + static const std::string typeName; + + FanOutExchange(const std::string& name); + + virtual std::string getType(){ return typeName; } + + virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual ~FanOutExchange(); +}; + +} +} + + + +#endif diff --git a/Final/cpp/lib/broker/HeadersExchange.cpp b/Final/cpp/lib/broker/HeadersExchange.cpp new file mode 100644 index 0000000000..acd344725a --- /dev/null +++ b/Final/cpp/lib/broker/HeadersExchange.cpp @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <HeadersExchange.h> +#include <ExchangeBinding.h> +#include <Value.h> +#include <QpidError.h> +#include <algorithm> + + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +// TODO aconway 2006-09-20: More efficient matching algorithm. +// The current search algorithm really sucks. +// Fieldtables are heavy, maybe use shared_ptr to do handle-body. + +using namespace qpid::broker; + +namespace { + const std::string all("all"); + const std::string any("any"); + const std::string x_match("x-match"); +} + +HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { } + +void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ + Mutex::ScopedLock locker(lock); + std::string what = args->getString("x-match"); + if (what != all && what != any) { + THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange."); + } + bindings.push_back(Binding(*args, queue)); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); +} + +void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ + Mutex::ScopedLock locker(lock); + Bindings::iterator i = + std::find(bindings.begin(),bindings.end(), Binding(*args, queue)); + if (i != bindings.end()) bindings.erase(i); +} + + +void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ + Mutex::ScopedLock locker(lock);; + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (match(i->first, *args)) msg.deliverTo(i->second); + } +} + +HeadersExchange::~HeadersExchange() {} + +const std::string HeadersExchange::typeName("headers"); + +namespace +{ + + bool match_values(const Value& bind, const Value& msg) { + return dynamic_cast<const EmptyValue*>(&bind) || bind == msg; + } + +} + + +bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) { + typedef FieldTable::ValueMap Map; + std::string what = bind.getString(x_match); + if (what == all) { + for (Map::const_iterator i = bind.getMap().begin(); + i != bind.getMap().end(); + ++i) + { + if (i->first != x_match) + { + Map::const_iterator j = msg.getMap().find(i->first); + if (j == msg.getMap().end()) return false; + if (!match_values(*(i->second), *(j->second))) return false; + } + } + return true; + } else if (what == any) { + for (Map::const_iterator i = bind.getMap().begin(); + i != bind.getMap().end(); + ++i) + { + if (i->first != x_match) + { + Map::const_iterator j = msg.getMap().find(i->first); + if (j != msg.getMap().end()) { + if (match_values(*(i->second), *(j->second))) return true; + } + } + } + return false; + } else { + return false; + } +} + + + diff --git a/Final/cpp/lib/broker/HeadersExchange.h b/Final/cpp/lib/broker/HeadersExchange.h new file mode 100644 index 0000000000..5e8da5ad85 --- /dev/null +++ b/Final/cpp/lib/broker/HeadersExchange.h @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _HeadersExchange_ +#define _HeadersExchange_ + +#include <vector> +#include <BrokerExchange.h> +#include <FieldTable.h> +#include <BrokerMessage.h> +#include <sys/Monitor.h> +#include <BrokerQueue.h> + +namespace qpid { +namespace broker { + + +class HeadersExchange : public virtual Exchange { + typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding; + typedef std::vector<Binding> Bindings; + + Bindings bindings; + qpid::sys::Mutex lock; + + public: + static const std::string typeName; + + HeadersExchange(const string& name); + + virtual std::string getType(){ return typeName; } + + virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + + virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); + + virtual ~HeadersExchange(); + + static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs); +}; + + + +} +} + +#endif diff --git a/Final/cpp/lib/broker/InMemoryContent.cpp b/Final/cpp/lib/broker/InMemoryContent.cpp new file mode 100644 index 0000000000..07af8633e5 --- /dev/null +++ b/Final/cpp/lib/broker/InMemoryContent.cpp @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <InMemoryContent.h> + +using namespace qpid::broker; +using namespace qpid::framing; +using boost::static_pointer_cast; + +void InMemoryContent::add(AMQContentBody::shared_ptr data) +{ + content.push_back(data); +} + +u_int32_t InMemoryContent::size() +{ + int sum(0); + for (content_iterator i = content.begin(); i != content.end(); i++) { + sum += (*i)->size(); + } + return sum; +} + +void InMemoryContent::send(qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize) +{ + for (content_iterator i = content.begin(); i != content.end(); i++) { + if ((*i)->size() > framesize) { + u_int32_t offset = 0; + for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) { + string data = (*i)->getData().substr(offset, framesize); + out->send(new AMQFrame(version, channel, new AMQContentBody(data))); + offset += framesize; + } + u_int32_t remainder = (*i)->size() % framesize; + if (remainder) { + string data = (*i)->getData().substr(offset, remainder); + out->send(new AMQFrame(version, channel, new AMQContentBody(data))); + } + } else { + AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i); + out->send(new AMQFrame(version, channel, contentBody)); + } + } +} + +void InMemoryContent::encode(Buffer& buffer) +{ + for (content_iterator i = content.begin(); i != content.end(); i++) { + (*i)->encode(buffer); + } +} + +void InMemoryContent::destroy() +{ +} diff --git a/Final/cpp/lib/broker/InMemoryContent.h b/Final/cpp/lib/broker/InMemoryContent.h new file mode 100644 index 0000000000..1db1acd7e1 --- /dev/null +++ b/Final/cpp/lib/broker/InMemoryContent.h @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _InMemoryContent_ +#define _InMemoryContent_ + +#include <Content.h> +#include <vector> + + +namespace qpid { + namespace broker { + class InMemoryContent : public Content{ + typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list; + typedef content_list::iterator content_iterator; + + content_list content; + public: + void add(qpid::framing::AMQContentBody::shared_ptr data); + u_int32_t size(); + void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); + void encode(qpid::framing::Buffer& buffer); + void destroy(); + ~InMemoryContent(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/LazyLoadedContent.cpp b/Final/cpp/lib/broker/LazyLoadedContent.cpp new file mode 100644 index 0000000000..ec1ca3e195 --- /dev/null +++ b/Final/cpp/lib/broker/LazyLoadedContent.cpp @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <LazyLoadedContent.h> + +using namespace qpid::broker; +using namespace qpid::framing; + +LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, u_int64_t _expectedSize) : + store(_store), msg(_msg), expectedSize(_expectedSize) {} + +void LazyLoadedContent::add(AMQContentBody::shared_ptr data) +{ + store->appendContent(msg, data->getData()); +} + +u_int32_t LazyLoadedContent::size() +{ + return 0;//all content is written as soon as it is added +} + +void LazyLoadedContent::send(qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize) +{ + if (expectedSize > framesize) { + for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) { + u_int64_t remaining = expectedSize - offset; + string data; + store->loadContent(msg, data, offset, remaining > framesize ? framesize : remaining); + out->send(new AMQFrame(version, channel, new AMQContentBody(data))); + } + } else { + string data; + store->loadContent(msg, data, 0, expectedSize); + out->send(new AMQFrame(version, channel, new AMQContentBody(data))); + } +} + +void LazyLoadedContent::encode(Buffer&) +{ + //do nothing as all content is written as soon as it is added +} + +void LazyLoadedContent::destroy() +{ + store->destroy(msg); +} diff --git a/Final/cpp/lib/broker/LazyLoadedContent.h b/Final/cpp/lib/broker/LazyLoadedContent.h new file mode 100644 index 0000000000..80f8cce4eb --- /dev/null +++ b/Final/cpp/lib/broker/LazyLoadedContent.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _LazyLoadedContent_ +#define _LazyLoadedContent_ + +#include <Content.h> +#include <MessageStore.h> + +namespace qpid { + namespace broker { + class LazyLoadedContent : public Content{ + MessageStore* const store; + Message* const msg; + const u_int64_t expectedSize; + public: + LazyLoadedContent(MessageStore* const store, Message* const msg, u_int64_t expectedSize); + void add(qpid::framing::AMQContentBody::shared_ptr data); + u_int32_t size(); + void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); + void encode(qpid::framing::Buffer& buffer); + void destroy(); + ~LazyLoadedContent(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/Makefile.am b/Final/cpp/lib/broker/Makefile.am new file mode 100644 index 0000000000..1a3fc79dea --- /dev/null +++ b/Final/cpp/lib/broker/Makefile.am @@ -0,0 +1,107 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +AM_CXXFLAGS = $(WARNING_CFLAGS) +INCLUDES = \ + -I$(top_srcdir)/gen \ + -I$(top_srcdir)/lib/common \ + -I$(top_srcdir)/lib/common/sys \ + -I$(top_srcdir)/lib/common/framing \ + $(APR_CXXFLAGS) + +lib_LTLIBRARIES = libqpidbroker.la +libqpidbroker_la_LIBADD = \ + ../common/libqpidcommon.la \ + -lboost_iostreams + +libqpidbroker_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG) +libqpidbroker_la_SOURCES = \ + AccumulatedAck.cpp \ + AccumulatedAck.h \ + AutoDelete.cpp \ + AutoDelete.h \ + Binding.h \ + Broker.cpp \ + Broker.h \ + BrokerChannel.cpp \ + BrokerChannel.h \ + BrokerExchange.h \ + BrokerMessage.cpp \ + BrokerMessage.h \ + BrokerQueue.cpp \ + BrokerQueue.h \ + ConnectionToken.h \ + Consumer.h \ + Content.h \ + Daemon.cpp \ + Daemon.h \ + DeletingTxOp.cpp \ + DeletingTxOp.h \ + Deliverable.h \ + DeliverableMessage.cpp \ + DeliverableMessage.h \ + DeliveryRecord.cpp \ + DeliveryRecord.h \ + DirectExchange.cpp \ + DirectExchange.h \ + ExchangeBinding.cpp \ + ExchangeBinding.h \ + ExchangeRegistry.cpp \ + ExchangeRegistry.h \ + FanOutExchange.cpp \ + FanOutExchange.h \ + HeadersExchange.cpp \ + HeadersExchange.h \ + InMemoryContent.cpp \ + InMemoryContent.h \ + LazyLoadedContent.cpp \ + LazyLoadedContent.h \ + MessageBuilder.cpp \ + MessageBuilder.h \ + MessageStore.h \ + MessageStoreModule.cpp \ + MessageStoreModule.h \ + NameGenerator.cpp \ + NameGenerator.h \ + NullMessageStore.cpp \ + NullMessageStore.h \ + Prefetch.h \ + QueuePolicy.cpp \ + QueuePolicy.h \ + QueueRegistry.cpp \ + QueueRegistry.h \ + RecoveryManager.cpp \ + RecoveryManager.h \ + SessionHandlerFactoryImpl.cpp \ + SessionHandlerFactoryImpl.h \ + SessionHandlerImpl.cpp \ + SessionHandlerImpl.h \ + TopicExchange.cpp \ + TopicExchange.h \ + TransactionalStore.h \ + TxAck.cpp \ + TxAck.h \ + TxBuffer.cpp \ + TxBuffer.h \ + TxOp.h \ + TxPublish.cpp \ + TxPublish.h + + +# Force build during dist phase so help2man will work. +dist-hook: $(lib_LTLIBRARIES) diff --git a/Final/cpp/lib/broker/MessageBuilder.cpp b/Final/cpp/lib/broker/MessageBuilder.cpp new file mode 100644 index 0000000000..41bf812d2d --- /dev/null +++ b/Final/cpp/lib/broker/MessageBuilder.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <MessageBuilder.h> + +#include <InMemoryContent.h> +#include <LazyLoadedContent.h> + +using namespace qpid::broker; +using namespace qpid::framing; +using std::auto_ptr; + +MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) : + handler(_handler), + store(_store), + stagingThreshold(_stagingThreshold) +{} + +void MessageBuilder::route(){ + if (message->isComplete()) { + if (handler) handler->complete(message); + message.reset(); + } +} + +void MessageBuilder::initialise(Message::shared_ptr& msg){ + if(message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); + } + message = msg; +} + +void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){ + if(!message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish."); + } + message->setHeader(header); + if (stagingThreshold && header->getContentSize() >= stagingThreshold) { + store->stage(message.get()); + message->releaseContent(store); + } else { + auto_ptr<Content> content(new InMemoryContent()); + message->setContent(content); + } + route(); +} + +void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){ + if(!message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish."); + } + message->addContent(content); + route(); +} diff --git a/Final/cpp/lib/broker/MessageBuilder.h b/Final/cpp/lib/broker/MessageBuilder.h new file mode 100644 index 0000000000..4e51f223f0 --- /dev/null +++ b/Final/cpp/lib/broker/MessageBuilder.h @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _MessageBuilder_ +#define _MessageBuilder_ + +#include <memory> +#include <QpidError.h> +#include <BrokerExchange.h> +#include <BrokerMessage.h> +#include <MessageStore.h> +#include <AMQContentBody.h> +#include <AMQHeaderBody.h> +#include <BasicPublishBody.h> + +namespace qpid { + namespace broker { + class MessageBuilder{ + public: + class CompletionHandler{ + public: + virtual void complete(Message::shared_ptr&) = 0; + virtual ~CompletionHandler(){} + }; + MessageBuilder(CompletionHandler* _handler, MessageStore* const store = 0, u_int64_t stagingThreshold = 0); + void initialise(Message::shared_ptr& msg); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header); + void addContent(qpid::framing::AMQContentBody::shared_ptr& content); + private: + Message::shared_ptr message; + CompletionHandler* handler; + MessageStore* const store; + const u_int64_t stagingThreshold; + + void route(); + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/MessageStore.h b/Final/cpp/lib/broker/MessageStore.h new file mode 100644 index 0000000000..938f807a67 --- /dev/null +++ b/Final/cpp/lib/broker/MessageStore.h @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _MessageStore_ +#define _MessageStore_ + +#include <BrokerMessage.h> +#include <FieldTable.h> +#include <RecoveryManager.h> +#include <TransactionalStore.h> + +namespace qpid { + namespace broker { + struct MessageStoreSettings + { + /** + * Messages whose content length is larger than this value + * will be staged (i.e. will have thier data written to + * disk as it arrives) and will load their data lazily. On + * recovery therefore, only the headers should be loaded. + */ + u_int64_t stagingThreshold; + }; + /** + * An abstraction of the persistent storage for messages. (In + * all methods, any pointers/references to queues or messages + * are valid only for the duration of the call). + */ + class MessageStore : public TransactionalStore{ + public: + /** + * Record the existance of a durable queue + */ + virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings) = 0; + /** + * Destroy a durable queue + */ + virtual void destroy(const Queue& queue) = 0; + + /** + * Request recovery of queue and message state from store + */ + virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0; + + /** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ + virtual void stage(Message* const msg) = 0; + + /** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ + virtual void destroy(Message* const msg) = 0; + + /** + * Appends content to a previously staged message + */ + virtual void appendContent(Message* const msg, const std::string& data) = 0; + + /** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ + virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length) = 0; + + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0; + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * @param msg the message to dequeue + * @param queue the name of th queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0; + + /** + * Treat all enqueue/dequeues where this xid was specified as being prepared. + */ + virtual void prepared(const std::string * const xid) = 0; + /** + * Treat all enqueue/dequeues where this xid was specified as being committed. + */ + virtual void committed(const std::string * const xid) = 0; + /** + * Treat all enqueue/dequeues where this xid was specified as being aborted. + */ + virtual void aborted(const std::string * const xid) = 0; + + virtual ~MessageStore(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/MessageStoreModule.cpp b/Final/cpp/lib/broker/MessageStoreModule.cpp new file mode 100644 index 0000000000..ccc5501379 --- /dev/null +++ b/Final/cpp/lib/broker/MessageStoreModule.cpp @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <MessageStoreModule.h> +#include <iostream> + +using namespace qpid::broker; + +MessageStoreModule::MessageStoreModule(const std::string& name) : store(name) +{ +} + +void MessageStoreModule::create(const Queue& queue, const qpid::framing::FieldTable& settings) +{ + store->create(queue, settings); +} + +void MessageStoreModule::destroy(const Queue& queue) +{ + store->destroy(queue); +} + +void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSettings* const settings) +{ + store->recover(registry, settings); +} + +void MessageStoreModule::stage(Message* const msg) +{ + store->stage(msg); +} + +void MessageStoreModule::destroy(Message* const msg) +{ + store->destroy(msg); +} + +void MessageStoreModule::appendContent(Message* const msg, const std::string& data) +{ + store->appendContent(msg, data); +} + +void MessageStoreModule::loadContent(Message* const msg, string& data, u_int64_t offset, u_int32_t length) +{ + store->loadContent(msg, data, offset, length); +} + +void MessageStoreModule::enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid) +{ + store->enqueue(ctxt, msg, queue, xid); +} + +void MessageStoreModule::dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid) +{ + store->dequeue(ctxt, msg, queue, xid); +} + +void MessageStoreModule::prepared(const string * const xid) +{ + store->prepared(xid); +} + +void MessageStoreModule::committed(const string * const xid) +{ + store->committed(xid); +} + +void MessageStoreModule::aborted(const string * const xid) +{ + store->aborted(xid); +} + +std::auto_ptr<TransactionContext> MessageStoreModule::begin() +{ + return store->begin(); +} + +void MessageStoreModule::commit(TransactionContext* ctxt) +{ + store->commit(ctxt); +} + +void MessageStoreModule::abort(TransactionContext* ctxt) +{ + store->abort(ctxt); +} diff --git a/Final/cpp/lib/broker/MessageStoreModule.h b/Final/cpp/lib/broker/MessageStoreModule.h new file mode 100644 index 0000000000..c49e06efa1 --- /dev/null +++ b/Final/cpp/lib/broker/MessageStoreModule.h @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _MessageStoreModule_ +#define _MessageStoreModule_ + +#include <BrokerMessage.h> +#include <MessageStore.h> +#include <BrokerQueue.h> +#include <RecoveryManager.h> +#include <sys/Module.h> + +namespace qpid { + namespace broker { + /** + * A null implementation of the MessageStore interface + */ + class MessageStoreModule : public MessageStore{ + qpid::sys::Module<MessageStore> store; + public: + MessageStoreModule(const std::string& name); + void create(const Queue& queue, const qpid::framing::FieldTable& settings); + void destroy(const Queue& queue); + void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); + void stage(Message* const msg); + void destroy(Message* const msg); + void appendContent(Message* const msg, const std::string& data); + void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length); + void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); + void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); + void prepared(const std::string * const xid); + void committed(const std::string * const xid); + void aborted(const std::string * const xid); + std::auto_ptr<TransactionContext> begin(); + void commit(TransactionContext* ctxt); + void abort(TransactionContext* ctxt); + ~MessageStoreModule(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/NameGenerator.cpp b/Final/cpp/lib/broker/NameGenerator.cpp new file mode 100644 index 0000000000..3f281859fa --- /dev/null +++ b/Final/cpp/lib/broker/NameGenerator.cpp @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <NameGenerator.h> +#include <sstream> + +using namespace qpid::broker; + +NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {} + +std::string NameGenerator::generate(){ + std::stringstream ss; + ss << base << counter++; + return ss.str(); +} diff --git a/Final/cpp/lib/broker/NameGenerator.h b/Final/cpp/lib/broker/NameGenerator.h new file mode 100644 index 0000000000..b2dbbdfb69 --- /dev/null +++ b/Final/cpp/lib/broker/NameGenerator.h @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _NameGenerator_ +#define _NameGenerator_ + +#include <BrokerMessage.h> + +namespace qpid { + namespace broker { + class NameGenerator{ + const std::string base; + unsigned int counter; + public: + NameGenerator(const std::string& base); + std::string generate(); + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/NullMessageStore.cpp b/Final/cpp/lib/broker/NullMessageStore.cpp new file mode 100644 index 0000000000..dd58539925 --- /dev/null +++ b/Final/cpp/lib/broker/NullMessageStore.cpp @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <NullMessageStore.h> + +#include <BrokerQueue.h> +#include <RecoveryManager.h> + +#include <iostream> + +using namespace qpid::broker; + +NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){} + +void NullMessageStore::create(const Queue& queue, const qpid::framing::FieldTable&) +{ + if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; +} + +void NullMessageStore::destroy(const Queue& queue) +{ + if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; +} + +void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* const) +{ + if (warn) std::cout << "Persistence not enabled, no recovery attempted." << std::endl; +} + +void NullMessageStore::stage(Message* const) +{ + if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl; +} + +void NullMessageStore::destroy(Message* const) +{ + if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl; +} + +void NullMessageStore::appendContent(Message* const, const string&) +{ + if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl; +} + +void NullMessageStore::loadContent(Message* const, string&, u_int64_t, u_int32_t) +{ + if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl; +} + +void NullMessageStore::enqueue(TransactionContext*, Message* const, const Queue& queue, const string * const) +{ + if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl; +} + +void NullMessageStore::dequeue(TransactionContext*, Message* const, const Queue& queue, const string * const) +{ + if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl; +} + +void NullMessageStore::prepared(const string * const) +{ + if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; +} + +void NullMessageStore::committed(const string * const) +{ + if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; +} + +void NullMessageStore::aborted(const string * const) +{ + if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; +} + +std::auto_ptr<TransactionContext> NullMessageStore::begin() +{ + return std::auto_ptr<TransactionContext>(); +} + +void NullMessageStore::commit(TransactionContext*) +{ +} + +void NullMessageStore::abort(TransactionContext*) +{ +} diff --git a/Final/cpp/lib/broker/NullMessageStore.h b/Final/cpp/lib/broker/NullMessageStore.h new file mode 100644 index 0000000000..ef2bea8fd6 --- /dev/null +++ b/Final/cpp/lib/broker/NullMessageStore.h @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _NullMessageStore_ +#define _NullMessageStore_ + +#include <BrokerMessage.h> +#include <MessageStore.h> +#include <BrokerQueue.h> + +namespace qpid { + namespace broker { + + /** + * A null implementation of the MessageStore interface + */ + class NullMessageStore : public MessageStore{ + const bool warn; + public: + NullMessageStore(bool warn = true); + virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings); + virtual void destroy(const Queue& queue); + virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); + virtual void stage(Message* const msg); + virtual void destroy(Message* const msg); + virtual void appendContent(Message* const msg, const std::string& data); + virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length); + virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); + virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); + virtual void prepared(const std::string * const xid); + virtual void committed(const std::string * const xid); + virtual void aborted(const std::string * const xid); + virtual std::auto_ptr<TransactionContext> begin(); + virtual void commit(TransactionContext* ctxt); + virtual void abort(TransactionContext* ctxt); + ~NullMessageStore(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/Prefetch.h b/Final/cpp/lib/broker/Prefetch.h new file mode 100644 index 0000000000..a1adccaee7 --- /dev/null +++ b/Final/cpp/lib/broker/Prefetch.h @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Prefetch_ +#define _Prefetch_ + +#include <amqp_types.h> + +namespace qpid { + namespace broker { + /** + * Count and total size of asynchronously delivered + * (i.e. pushed) messages that have acks outstanding. + */ + struct Prefetch{ + u_int32_t size; + u_int16_t count; + + void reset() { size = 0; count = 0; } + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/QueuePolicy.cpp b/Final/cpp/lib/broker/QueuePolicy.cpp new file mode 100644 index 0000000000..e13fd62fc6 --- /dev/null +++ b/Final/cpp/lib/broker/QueuePolicy.cpp @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <QueuePolicy.h> + +using namespace qpid::broker; +using namespace qpid::framing; + +QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) : + maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {} + +QueuePolicy::QueuePolicy(const FieldTable& settings) : + maxCount(getInt(settings, maxCountKey, 0)), + maxSize(getInt(settings, maxSizeKey, 0)), count(0), size(0) {} + +void QueuePolicy::enqueued(u_int64_t _size) +{ + if (maxCount) count++; + if (maxSize) size += _size; +} + +void QueuePolicy::dequeued(u_int64_t _size) +{ + if (maxCount) count--; + if (maxSize) size -= _size; +} + +bool QueuePolicy::limitExceeded() +{ + return (maxSize && size > maxSize) || (maxCount && count > maxCount); +} + +void QueuePolicy::update(FieldTable& settings) +{ + if (maxCount) settings.setInt(maxCountKey, maxCount); + if (maxSize) settings.setInt(maxSizeKey, maxSize); +} + + +int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int defaultValue) +{ + //Note: currently field table only contain signed 32 bit ints, which + // restricts the values that can be set on the queue policy. + try { + return settings.getInt(key); + } catch (FieldNotFoundException& ignore) { + return defaultValue; + } +} + +const std::string QueuePolicy::maxCountKey("qpid.max_count"); +const std::string QueuePolicy::maxSizeKey("qpid.max_size"); diff --git a/Final/cpp/lib/broker/QueuePolicy.h b/Final/cpp/lib/broker/QueuePolicy.h new file mode 100644 index 0000000000..597cfe7ce8 --- /dev/null +++ b/Final/cpp/lib/broker/QueuePolicy.h @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QueuePolicy_ +#define _QueuePolicy_ + +#include <FieldTable.h> + +namespace qpid { + namespace broker { + class QueuePolicy + { + static const std::string maxCountKey; + static const std::string maxSizeKey; + + const u_int32_t maxCount; + const u_int64_t maxSize; + u_int32_t count; + u_int64_t size; + + static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); + + public: + QueuePolicy(u_int32_t maxCount, u_int64_t maxSize); + QueuePolicy(const qpid::framing::FieldTable& settings); + void enqueued(u_int64_t size); + void dequeued(u_int64_t size); + void update(qpid::framing::FieldTable& settings); + bool limitExceeded(); + u_int32_t getMaxCount() const { return maxCount; } + u_int64_t getMaxSize() const { return maxSize; } + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/QueueRegistry.cpp b/Final/cpp/lib/broker/QueueRegistry.cpp new file mode 100644 index 0000000000..c69d553b06 --- /dev/null +++ b/Final/cpp/lib/broker/QueueRegistry.cpp @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <QueueRegistry.h> +#include <SessionHandlerImpl.h> +#include <sstream> +#include <assert.h> + +using namespace qpid::broker; +using namespace qpid::sys; + +QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){} + +QueueRegistry::~QueueRegistry() +{ +} + +std::pair<Queue::shared_ptr, bool> +QueueRegistry::declare(const string& declareName, bool durable, + u_int32_t autoDelete, const ConnectionToken* owner) +{ + Mutex::ScopedLock locker(lock); + string name = declareName.empty() ? generateName() : declareName; + assert(!name.empty()); + QueueMap::iterator i = queues.find(name); + if (i == queues.end()) { + Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner)); + queues[name] = queue; + return std::pair<Queue::shared_ptr, bool>(queue, true); + } else { + return std::pair<Queue::shared_ptr, bool>(i->second, false); + } +} + +void QueueRegistry::destroy(const string& name){ + Mutex::ScopedLock locker(lock); + queues.erase(name); +} + +Queue::shared_ptr QueueRegistry::find(const string& name){ + Mutex::ScopedLock locker(lock); + QueueMap::iterator i = queues.find(name); + if (i == queues.end()) { + return Queue::shared_ptr(); + } else { + return i->second; + } +} + +string QueueRegistry::generateName(){ + string name; + do { + std::stringstream ss; + ss << "tmp_" << counter++; + name = ss.str(); + // Thread safety: Private function, only called with lock held + // so this is OK. + } while(queues.find(name) != queues.end()); + return name; +} + +MessageStore* const QueueRegistry::getStore() const { + return store; +} diff --git a/Final/cpp/lib/broker/QueueRegistry.h b/Final/cpp/lib/broker/QueueRegistry.h new file mode 100644 index 0000000000..7232024675 --- /dev/null +++ b/Final/cpp/lib/broker/QueueRegistry.h @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QueueRegistry_ +#define _QueueRegistry_ + +#include <map> +#include <sys/Monitor.h> +#include <BrokerQueue.h> + +namespace qpid { +namespace broker { + +/** + * A registry of queues indexed by queue name. + * + * Queues are reference counted using shared_ptr to ensure that they + * are deleted when and only when they are no longer in use. + * + */ +class QueueRegistry{ + + public: + QueueRegistry(MessageStore* const store = 0); + ~QueueRegistry(); + + /** + * Declare a queue. + * + * @return The queue and a boolean flag which is true if the queue + * was created by this declare call false if it already existed. + */ + std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, + const ConnectionToken* const owner = 0); + + /** + * Destroy the named queue. + * + * Note: if the queue is in use it is not actually destroyed until + * all shared_ptrs to it are destroyed. During that time it is + * possible that a new queue with the same name may be + * created. This should not create any problems as the new and + * old queues exist independently. The registry has + * forgotten the old queue so there can be no confusion for + * subsequent calls to find or declare with the same name. + * + */ + void destroy(const string& name); + + /** + * Find the named queue. Return 0 if not found. + */ + Queue::shared_ptr find(const string& name); + + /** + * Generate unique queue name. + */ + string generateName(); + + /** + * Return the message store used. + */ + MessageStore* const getStore() const; + + + private: + typedef std::map<string, Queue::shared_ptr> QueueMap; + QueueMap queues; + qpid::sys::Mutex lock; + int counter; + MessageStore* const store; +}; + + +} +} + + +#endif diff --git a/Final/cpp/lib/broker/RecoveryManager.cpp b/Final/cpp/lib/broker/RecoveryManager.cpp new file mode 100644 index 0000000000..6ea4c00c65 --- /dev/null +++ b/Final/cpp/lib/broker/RecoveryManager.cpp @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <RecoveryManager.h> + +using namespace qpid::broker; + +RecoveryManager::RecoveryManager(QueueRegistry& _queues, ExchangeRegistry& _exchanges) : queues(_queues), exchanges(_exchanges) {} + +RecoveryManager::~RecoveryManager() {} + +Queue::shared_ptr RecoveryManager::recoverQueue(const string& name) +{ + std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true); + Exchange::shared_ptr exchange = exchanges.getDefault(); + if (exchange) { + exchange->bind(result.first, result.first->getName(), 0); + } + return result.first; +} + +Exchange::shared_ptr RecoveryManager::recoverExchange(const string& name, const string& type) +{ + return exchanges.declare(name, type).first; +} diff --git a/Final/cpp/lib/broker/RecoveryManager.h b/Final/cpp/lib/broker/RecoveryManager.h new file mode 100644 index 0000000000..d4e4cff3fd --- /dev/null +++ b/Final/cpp/lib/broker/RecoveryManager.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _RecoveryManager_ +#define _RecoveryManager_ + +#include <ExchangeRegistry.h> +#include <QueueRegistry.h> + +namespace qpid { +namespace broker { + + class RecoveryManager{ + QueueRegistry& queues; + ExchangeRegistry& exchanges; + public: + RecoveryManager(QueueRegistry& queues, ExchangeRegistry& exchanges); + ~RecoveryManager(); + Queue::shared_ptr recoverQueue(const std::string& name); + Exchange::shared_ptr recoverExchange(const std::string& name, const std::string& type); + }; + + +} +} + + +#endif diff --git a/Final/cpp/lib/broker/SessionHandlerFactoryImpl.cpp b/Final/cpp/lib/broker/SessionHandlerFactoryImpl.cpp new file mode 100644 index 0000000000..1b5441e3cf --- /dev/null +++ b/Final/cpp/lib/broker/SessionHandlerFactoryImpl.cpp @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <SessionHandlerFactoryImpl.h> + +#include <DirectExchange.h> +#include <FanOutExchange.h> +#include <HeadersExchange.h> +#include <MessageStoreModule.h> +#include <NullMessageStore.h> +#include <SessionHandlerImpl.h> + +using namespace qpid::broker; +using namespace qpid::sys; + +namespace +{ +const std::string empty; +const std::string amq_direct("amq.direct"); +const std::string amq_topic("amq.topic"); +const std::string amq_fanout("amq.fanout"); +const std::string amq_match("amq.match"); +} + +SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int64_t _stagingThreshold, u_int32_t _timeout) : + store(_store.empty() ? (MessageStore*) new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)), + queues(store.get()), settings(_timeout, _stagingThreshold), cleaner(&queues, _timeout/10) +{ + exchanges.declare(empty, DirectExchange::typeName); // Default exchange. + exchanges.declare(amq_direct, DirectExchange::typeName); + exchanges.declare(amq_topic, TopicExchange::typeName); + exchanges.declare(amq_fanout, FanOutExchange::typeName); + exchanges.declare(amq_match, HeadersExchange::typeName); + + if(store.get()) { + RecoveryManager recoverer(queues, exchanges); + MessageStoreSettings storeSettings = { settings.stagingThreshold }; + store->recover(recoverer, &storeSettings); + } + + cleaner.start(); +} + +SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt) +{ + return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, settings); +} + +SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl() +{ + cleaner.stop(); +} diff --git a/Final/cpp/lib/broker/SessionHandlerFactoryImpl.h b/Final/cpp/lib/broker/SessionHandlerFactoryImpl.h new file mode 100644 index 0000000000..a69b67b08d --- /dev/null +++ b/Final/cpp/lib/broker/SessionHandlerFactoryImpl.h @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SessionHandlerFactoryImpl_ +#define _SessionHandlerFactoryImpl_ + +#include <AutoDelete.h> +#include <ExchangeRegistry.h> +#include <MessageStore.h> +#include <QueueRegistry.h> +#include <AMQFrame.h> +#include <ProtocolInitiation.h> +#include <sys/SessionContext.h> +#include <sys/SessionHandler.h> +#include <sys/SessionHandlerFactory.h> +#include <sys/TimeoutHandler.h> +#include <SessionHandlerImpl.h> +#include <memory> + +namespace qpid { + namespace broker { + + class SessionHandlerFactoryImpl : public virtual qpid::sys::SessionHandlerFactory + { + std::auto_ptr<MessageStore> store; + QueueRegistry queues; + ExchangeRegistry exchanges; + const Settings settings; + AutoDelete cleaner; + public: + SessionHandlerFactoryImpl(const std::string& store = "", u_int64_t stagingThreshold = 0, u_int32_t timeout = 30000); + virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt); + virtual ~SessionHandlerFactoryImpl(); + }; + + } +} + + +#endif diff --git a/Final/cpp/lib/broker/SessionHandlerImpl.cpp b/Final/cpp/lib/broker/SessionHandlerImpl.cpp new file mode 100644 index 0000000000..b23432e29d --- /dev/null +++ b/Final/cpp/lib/broker/SessionHandlerImpl.cpp @@ -0,0 +1,468 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <sstream> +#include <SessionHandlerImpl.h> +#include <FanOutExchange.h> +#include <HeadersExchange.h> +#include <TopicExchange.h> +#include "assert.h" + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; +using namespace qpid::sys; + +SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, + QueueRegistry* _queues, + ExchangeRegistry* _exchanges, + AutoDelete* _cleaner, + const Settings& _settings) : + context(_context), + queues(_queues), + exchanges(_exchanges), + cleaner(_cleaner), + settings(_settings), + basicHandler(new BasicHandlerImpl(this)), + channelHandler(new ChannelHandlerImpl(this)), + connectionHandler(new ConnectionHandlerImpl(this)), + exchangeHandler(new ExchangeHandlerImpl(this)), + queueHandler(new QueueHandlerImpl(this)), + txHandler(new TxHandlerImpl(this)), + framemax(65536), + heartbeat(0) + { + + client =NULL; +} + +SessionHandlerImpl::~SessionHandlerImpl(){ + + if (client != NULL) + delete client; + +} + +Channel* SessionHandlerImpl::getChannel(u_int16_t channel){ + channel_iterator i = channels.find(channel); + if(i == channels.end()){ + std::stringstream out; + out << "Unknown channel: " << channel; + throw ConnectionException(504, out.str()); + } + return i->second; +} + +Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){ + Queue::shared_ptr queue; + if (name.empty()) { + queue = getChannel(channel)->getDefaultQueue(); + if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); + } else { + queue = queues->find(name); + if (queue == 0) { + throw ChannelException( 404, "Queue not found: " + name); + } + } + return queue; +} + + +Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){ + return exchanges->get(name); +} + +void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ + u_int16_t channel = frame->getChannel(); + AMQBody::shared_ptr body = frame->getBody(); + AMQMethodBody::shared_ptr method; + + switch(body->type()) + { + case METHOD_BODY: + method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body); + try{ + method->invoke(*this, channel); + }catch(ChannelException& e){ + channels[channel]->close(); + channels.erase(channel); + client->getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); + }catch(ConnectionException& e){ + client->getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); + context->close(); + }catch(std::exception& e){ + string error(e.what()); + client->getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId()); + context->close(); + } + break; + + case HEADER_BODY: + this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body)); + break; + + case CONTENT_BODY: + this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body)); + break; + + case HEARTBEAT_BODY: + //channel must be 0 + this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body)); + break; + } +} + +void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ + + if (client == NULL) + { + client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); + + + //send connection start + FieldTable properties; + string mechanisms("PLAIN"); + string locales("en_US"); // channel, majour, minor + client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales); + } +} + + +void SessionHandlerImpl::idleOut(){ + +} + +void SessionHandlerImpl::idleIn(){ + +} + +void SessionHandlerImpl::closed(){ + try { + for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){ + Channel* c = i->second; + channels.erase(i); + c->close(); + delete c; + } + for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ + string name = (*i)->getName(); + queues->destroy(name); + exclusiveQueues.erase(i); + } + } catch(std::exception& e) { + std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl; + } +} + +void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ + getChannel(channel)->handleHeader(body); +} + +void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ + getChannel(channel)->handleContent(body); +} + +void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ + std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl; +} + +void SessionHandlerImpl::ConnectionHandlerImpl::startOk( + u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, + const string& /*response*/, const string& /*locale*/){ + parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat); +} + +void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} + +void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ + parent->framemax = framemax; + parent->heartbeat = heartbeat; +} + +void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ + string knownhosts; + parent->client->getConnection().openOk(0, knownhosts); +} + +void SessionHandlerImpl::ConnectionHandlerImpl::close( + u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, + u_int16_t /*classId*/, u_int16_t /*methodId*/) +{ + parent->client->getConnection().closeOk(0); + parent->context->close(); +} + +void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ + parent->context->close(); +} + + + +void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ + + + if (parent->channels[channel] == 0) { + parent->channels[channel] = new Channel(parent->client->getProtocolVersion() , parent->context, channel, parent->framemax, + parent->queues->getStore(), parent->settings.stagingThreshold); + parent->client->getChannel().openOk(channel); + } else { + std::stringstream out; + out << "Channel already open: " << channel; + throw ConnectionException(504, out.str()); + } +} + +void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){ + parent->getChannel(channel)->flow(active); + parent->client->getChannel().flowOk(channel, active); +} +void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} + +void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, + u_int16_t /*classId*/, u_int16_t /*methodId*/){ + Channel* c = parent->getChannel(channel); + if(c){ + parent->channels.erase(channel); + c->close(); + delete c; + parent->client->getChannel().closeOk(channel); + } +} + +void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} + + + +void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, + bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& /*arguments*/){ + + if(passive){ + if(!parent->exchanges->get(exchange)){ + throw ChannelException(404, "Exchange not found: " + exchange); + } + }else{ + try{ + std::pair<Exchange::shared_ptr, bool> response = parent->exchanges->declare(exchange, type); + if(!response.second && response.first->getType() != type){ + throw ConnectionException(530, "Exchange already declared to be of type " + + response.first->getType() + ", requested " + type); + } + }catch(UnknownExchangeTypeException& e){ + throw ConnectionException(503, "Exchange type not implemented: " + type); + } + } + if(!nowait){ + parent->client->getExchange().declareOk(channel); + } +} + +void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, + const string& exchange, bool /*ifUnused*/, bool nowait){ + + //TODO: implement unused + parent->exchanges->destroy(exchange); + if(!nowait) parent->client->getExchange().deleteOk(channel); +} + +void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ + Queue::shared_ptr queue; + if (passive && !name.empty()) { + queue = parent->getQueue(name, channel); + } else { + std::pair<Queue::shared_ptr, bool> queue_created = + parent->queues->declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); + queue = queue_created.first; + assert(queue); + if (queue_created.second) { // This is a new queue + + //apply settings & create persistent record if required + queue_created.first->create(arguments); + + //add default binding: + parent->exchanges->getDefault()->bind(queue, name, 0); + if (exclusive) { + parent->exclusiveQueues.push_back(queue); + } else if(autoDelete){ + parent->cleaner->add(queue); + } + } + } + if (exclusive && !queue->isExclusiveOwner(parent)) { + throw ChannelException(405, "Cannot grant exclusive access to queue"); + } + parent->getChannel(channel)->setDefaultQueue(queue); + if (!nowait) { + string queueName = queue->getName(); + parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); + } +} + +void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ + + Queue::shared_ptr queue = parent->getQueue(queueName, channel); + Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName); + if(exchange){ +// kpvdr - cannot use this any longer as routingKey is now const +// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); +// exchange->bind(queue, routingKey, &arguments); + string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; + exchange->bind(queue, exchangeRoutingKey, &arguments); + if(!nowait) parent->client->getQueue().bindOk(channel); + }else{ + throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); + } +} + +void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ + + Queue::shared_ptr queue = parent->getQueue(queueName, channel); + int count = queue->purge(); + if(!nowait) parent->client->getQueue().purgeOk(channel, count); +} + +void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ + ChannelException error(0, ""); + int count(0); + Queue::shared_ptr q = parent->getQueue(queue, channel); + if(ifEmpty && q->getMessageCount() > 0){ + throw ChannelException(406, "Queue not empty."); + }else if(ifUnused && q->getConsumerCount() > 0){ + throw ChannelException(406, "Queue in use."); + }else{ + //remove the queue from the list of exclusive queues if necessary + if(q->isExclusiveOwner(parent)){ + queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q); + if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); + } + count = q->getMessageCount(); + q->destroy(); + parent->queues->destroy(queue); + } + + if(!nowait) parent->client->getQueue().deleteOk(channel, count); +} + + + + +void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ + //TODO: handle global + parent->getChannel(channel)->setPrefetchSize(prefetchSize); + parent->getChannel(channel)->setPrefetchCount(prefetchCount); + parent->client->getBasic().qosOk(channel); +} + +void SessionHandlerImpl::BasicHandlerImpl::consume( + u_int16_t channelId, u_int16_t /*ticket*/, + const string& queueName, const string& consumerTag, + bool noLocal, bool noAck, bool exclusive, + bool nowait, const FieldTable& fields) +{ + + Queue::shared_ptr queue = parent->getQueue(queueName, channelId); + Channel* channel = parent->channels[channelId]; + if(!consumerTag.empty() && channel->exists(consumerTag)){ + throw ConnectionException(530, "Consumer tags must be unique"); + } + + try{ + string newTag = consumerTag; + channel->consume( + newTag, queue, !noAck, exclusive, noLocal ? parent : 0, &fields); + + if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag); + + //allow messages to be dispatched if required as there is now a consumer: + queue->dispatch(); + }catch(ExclusiveAccessException& e){ + if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); + else throw ChannelException(403, "Access would violate previously granted exclusivity"); + } + +} + +void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ + parent->getChannel(channel)->cancel(consumerTag); + + if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag); +} + +void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, + const string& exchangeName, const string& routingKey, + bool mandatory, bool immediate){ + + Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName); + if(exchange){ + Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); + parent->getChannel(channel)->handlePublish(msg, exchange); + }else{ + throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + } +} + +void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ + Queue::shared_ptr queue = parent->getQueue(queueName, channelId); + if(!parent->getChannel(channelId)->get(queue, !noAck)){ + string clusterId;//not used, part of an imatix hack + + parent->client->getBasic().getEmpty(channelId, clusterId); + } +} + +void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ + try{ + parent->getChannel(channel)->ack(deliveryTag, multiple); + }catch(InvalidAckException& e){ + throw ConnectionException(530, "Received ack for unrecognised delivery tag"); + } +} + +void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} + +void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ + parent->getChannel(channel)->recover(requeue); + parent->client->getBasic().recoverOk(channel); +} + +void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){ + parent->getChannel(channel)->begin(); + parent->client->getTx().selectOk(channel); +} + +void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){ + parent->getChannel(channel)->commit(); + parent->client->getTx().commitOk(channel); +} + +void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){ + + parent->getChannel(channel)->rollback(); + parent->client->getTx().rollbackOk(channel); + parent->getChannel(channel)->recover(false); +} + diff --git a/Final/cpp/lib/broker/SessionHandlerImpl.h b/Final/cpp/lib/broker/SessionHandlerImpl.h new file mode 100644 index 0000000000..7e631b4505 --- /dev/null +++ b/Final/cpp/lib/broker/SessionHandlerImpl.h @@ -0,0 +1,270 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SessionHandlerImpl_ +#define _SessionHandlerImpl_ + +#include <map> +#include <sstream> +#include <vector> +#include <exception> +#include <AMQFrame.h> +#include <AMQP_ClientProxy.h> +#include <AMQP_ServerOperations.h> +#include <AutoDelete.h> +#include <ExchangeRegistry.h> +#include <BrokerChannel.h> +#include <ConnectionToken.h> +#include <DirectExchange.h> +#include <OutputHandler.h> +#include <ProtocolInitiation.h> +#include <QueueRegistry.h> +#include <sys/SessionContext.h> +#include <sys/SessionHandler.h> +#include <sys/TimeoutHandler.h> +#include <TopicExchange.h> + +namespace qpid { +namespace broker { + +struct ChannelException : public std::exception { + u_int16_t code; + string text; + ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {} + ~ChannelException() throw() {} + const char* what() const throw() { return text.c_str(); } +}; + +struct ConnectionException : public std::exception { + u_int16_t code; + string text; + ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {} + ~ConnectionException() throw() {} + const char* what() const throw() { return text.c_str(); } +}; + +class Settings { + public: + const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + const u_int64_t stagingThreshold; + + Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {} +}; + +class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, + public virtual qpid::framing::AMQP_ServerOperations, + public virtual ConnectionToken +{ + typedef std::map<u_int16_t, Channel*>::iterator channel_iterator; + typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + + qpid::sys::SessionContext* context; + qpid::framing::AMQP_ClientProxy* client; + QueueRegistry* queues; + ExchangeRegistry* const exchanges; + AutoDelete* const cleaner; + const Settings settings; + + std::auto_ptr<BasicHandler> basicHandler; + std::auto_ptr<ChannelHandler> channelHandler; + std::auto_ptr<ConnectionHandler> connectionHandler; + std::auto_ptr<ExchangeHandler> exchangeHandler; + std::auto_ptr<QueueHandler> queueHandler; + std::auto_ptr<TxHandler> txHandler; + + std::map<u_int16_t, Channel*> channels; + std::vector<Queue::shared_ptr> exclusiveQueues; + + u_int32_t framemax; + u_int16_t heartbeat; + + void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body); + void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body); + void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + + Channel* getChannel(u_int16_t channel); + /** + * Get named queue, never returns 0. + * @return: named queue or default queue for channel if name="" + * @exception: ChannelException if no queue of that name is found. + * @exception: ConnectionException if no queue specified and channel has not declared one. + */ + Queue::shared_ptr getQueue(const string& name, u_int16_t channel); + + Exchange::shared_ptr findExchange(const string& name); + + public: + SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues, + ExchangeRegistry* exchanges, AutoDelete* cleaner, const Settings& settings); + virtual void received(qpid::framing::AMQFrame* frame); + virtual void initiated(qpid::framing::ProtocolInitiation* header); + virtual void idleOut(); + virtual void idleIn(); + virtual void closed(); + virtual ~SessionHandlerImpl(); + + class ConnectionHandlerImpl : public virtual ConnectionHandler{ + SessionHandlerImpl* parent; + public: + inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism, + const string& response, const string& locale); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void secureOk(u_int16_t channel, const string& response); + + virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId, + u_int16_t methodId); + + virtual void closeOk(u_int16_t channel); + + virtual ~ConnectionHandlerImpl(){} + }; + + class ChannelHandlerImpl : public virtual ChannelHandler{ + SessionHandlerImpl* parent; + public: + inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void open(u_int16_t channel, const string& outOfBand); + + virtual void flow(u_int16_t channel, bool active); + + virtual void flowOk(u_int16_t channel, bool active); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, + u_int16_t classId, u_int16_t methodId); + + virtual void closeOk(u_int16_t channel); + + virtual ~ChannelHandlerImpl(){} + }; + + class ExchangeHandlerImpl : public virtual ExchangeHandler{ + SessionHandlerImpl* parent; + public: + inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type, + bool passive, bool durable, bool autoDelete, bool internal, bool nowait, + const qpid::framing::FieldTable& arguments); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait); + + virtual ~ExchangeHandlerImpl(){} + }; + + + class QueueHandlerImpl : public virtual QueueHandler{ + SessionHandlerImpl* parent; + public: + inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); + + virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue, + const string& exchange, const string& routingKey, bool nowait, + const qpid::framing::FieldTable& arguments); + + virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue, + bool nowait); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty, + bool nowait); + + virtual ~QueueHandlerImpl(){} + }; + + class BasicHandlerImpl : public virtual BasicHandler{ + SessionHandlerImpl* parent; + public: + inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); + + virtual void consume( + u_int16_t channel, u_int16_t ticket, const string& queue, + const string& consumerTag, bool noLocal, bool noAck, + bool exclusive, bool nowait, + const qpid::framing::FieldTable& fields); + + virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait); + + virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey, + bool mandatory, bool immediate); + + virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck); + + virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); + + virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); + + virtual void recover(u_int16_t channel, bool requeue); + + virtual ~BasicHandlerImpl(){} + }; + + class TxHandlerImpl : public virtual TxHandler{ + SessionHandlerImpl* parent; + public: + TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + virtual ~TxHandlerImpl() {} + virtual void select(u_int16_t channel); + virtual void commit(u_int16_t channel); + virtual void rollback(u_int16_t channel); + }; + + + inline virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); } + inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); } + inline virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); } + inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); } + inline virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); } + inline virtual TxHandler* getTxHandler(){ return txHandler.get(); } + + inline virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); } + inline virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); } + inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); } + inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); } + inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } + + // Temporary add-in to resolve version conflicts: AMQP v8.0 still defines class Test; + // however v0.9 will not - kpvdr 2006-11-17 + inline virtual TestHandler* getTestHandler(){ throw ConnectionException(540, "Test class not implemented"); } +}; + +} +} + + +#endif diff --git a/Final/cpp/lib/broker/TopicExchange.cpp b/Final/cpp/lib/broker/TopicExchange.cpp new file mode 100644 index 0000000000..3ebb3c8c56 --- /dev/null +++ b/Final/cpp/lib/broker/TopicExchange.cpp @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <TopicExchange.h> +#include <ExchangeBinding.h> +#include <algorithm> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +// TODO aconway 2006-09-20: More efficient matching algorithm. +// Areas for improvement: +// - excessive string copying: should be 0 copy, match from original buffer. +// - match/lookup: use descision tree or other more efficient structure. + +Tokens& Tokens::operator=(const std::string& s) { + clear(); + if (s.empty()) return *this; + std::string::const_iterator i = s.begin(); + while (true) { + // Invariant: i is at the beginning of the next untokenized word. + std::string::const_iterator j = find(i, s.end(), '.'); + push_back(std::string(i, j)); + if (j == s.end()) return *this; + i = j + 1; + } + return *this; +} + +TopicPattern& TopicPattern::operator=(const Tokens& tokens) { + Tokens::operator=(tokens); + normalize(); + return *this; +} + +namespace { +const std::string hashmark("#"); +const std::string star("*"); +} + +void TopicPattern::normalize() { + std::string word; + Tokens::iterator i = begin(); + while (i != end()) { + if (*i == hashmark) { + ++i; + while (i != end()) { + // Invariant: *(i-1)==#, [begin()..i-1] is normalized. + if (*i == star) { // Move * before #. + std::swap(*i, *(i-1)); + ++i; + } else if (*i == hashmark) { + erase(i); // Remove extra # + } else { + break; + } + } + } else { + i ++; + } + } +} + + +namespace { +// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. +// Need StringRef class that operates on a string in place witout copy. +// Should be applied everywhere strings are extracted from frames. +// +bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) +{ + // Invariant: [pattern_begin..p) matches [target_begin..t) + Tokens::const_iterator p = pattern_begin; + Tokens::const_iterator t = target_begin; + while (p != pattern_end && t != target_end) + { + if (*p == star || *p == *t) { + ++p, ++t; + } else if (*p == hashmark) { + ++p; + if (do_match(p, pattern_end, t, target_end)) return true; + while (t != target_end) { + ++t; + if (do_match(p, pattern_end, t, target_end)) return true; + } + return false; + } else { + return false; + } + } + while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing # + return t == target_end && p == pattern_end; +} +} + +bool TopicPattern::match(const Tokens& target) const +{ + return do_match(begin(), end(), target.begin(), target.end()); +} + +TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { } + +void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ + Monitor::ScopedLock l(lock); + TopicPattern routingPattern(routingKey); + bindings[routingPattern].push_back(queue); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); +} + +void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ + Monitor::ScopedLock l(lock); + BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); + Queue::vector& qv(bi->second); + if (bi == bindings.end()) return; + Queue::vector::iterator q = find(qv.begin(), qv.end(), queue); + if(q == qv.end()) return; + qv.erase(q); + if(qv.empty()) bindings.erase(bi); +} + + +void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ + Monitor::ScopedLock l(lock); + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (i->first.match(routingKey)) { + Queue::vector& qv(i->second); + for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){ + msg.deliverTo(*j); + } + } + } +} + +TopicExchange::~TopicExchange() {} + +const std::string TopicExchange::typeName("topic"); + + diff --git a/Final/cpp/lib/broker/TopicExchange.h b/Final/cpp/lib/broker/TopicExchange.h new file mode 100644 index 0000000000..fa0c86863a --- /dev/null +++ b/Final/cpp/lib/broker/TopicExchange.h @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TopicExchange_ +#define _TopicExchange_ + +#include <map> +#include <vector> +#include <BrokerExchange.h> +#include <FieldTable.h> +#include <BrokerMessage.h> +#include <sys/Monitor.h> +#include <BrokerQueue.h> + +namespace qpid { +namespace broker { + +/** A vector of string tokens */ +class Tokens : public std::vector<std::string> { + public: + Tokens() {}; + // Default copy, assign, dtor are sufficient. + + /** Tokenize s, provides automatic conversion of string to Tokens */ + Tokens(const std::string& s) { operator=(s); } + /** Tokenizing assignment operator s */ + Tokens & operator=(const std::string& s); + + private: + size_t hash; +}; + + +/** + * Tokens that have been normalized as a pattern and can be matched + * with topic Tokens. Normalized meands all sequences of mixed * and + * # are reduced to a series of * followed by at most one #. + */ +class TopicPattern : public Tokens +{ + public: + TopicPattern() {} + // Default copy, assign, dtor are sufficient. + TopicPattern(const Tokens& tokens) { operator=(tokens); } + TopicPattern(const std::string& str) { operator=(str); } + TopicPattern& operator=(const Tokens&); + TopicPattern& operator=(const std::string& str) { return operator=(Tokens(str)); } + + /** Match a topic */ + bool match(const std::string& topic) { return match(Tokens(topic)); } + bool match(const Tokens& topic) const; + + private: + void normalize(); +}; + +class TopicExchange : public virtual Exchange{ + typedef std::map<TopicPattern, Queue::vector> BindingMap; + BindingMap bindings; + qpid::sys::Mutex lock; + + public: + static const std::string typeName; + + TopicExchange(const string& name); + + virtual std::string getType(){ return typeName; } + + virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + + virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); + + virtual ~TopicExchange(); +}; + + + +} +} + +#endif diff --git a/Final/cpp/lib/broker/TransactionalStore.h b/Final/cpp/lib/broker/TransactionalStore.h new file mode 100644 index 0000000000..17bca3878a --- /dev/null +++ b/Final/cpp/lib/broker/TransactionalStore.h @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TransactionalStore_ +#define _TransactionalStore_ + +#include <memory> + +namespace qpid { + namespace broker { + struct InvalidTransactionContextException : public std::exception {}; + + class TransactionContext{ + public: + virtual ~TransactionContext(){} + }; + + class TransactionalStore{ + public: + virtual std::auto_ptr<TransactionContext> begin() = 0; + virtual void commit(TransactionContext*) = 0; + virtual void abort(TransactionContext*) = 0; + + virtual ~TransactionalStore(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/TxAck.cpp b/Final/cpp/lib/broker/TxAck.cpp new file mode 100644 index 0000000000..b5211158f3 --- /dev/null +++ b/Final/cpp/lib/broker/TxAck.cpp @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <TxAck.h> + +using std::bind1st; +using std::bind2nd; +using std::mem_fun_ref; +using namespace qpid::broker; + +TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked, const std::string* const _xid) : + acked(_acked), unacked(_unacked), xid(_xid){ + +} + +bool TxAck::prepare(TransactionContext* ctxt) throw(){ + try{ + //dequeue all acked messages from their queues + for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { + if (i->coveredBy(&acked)) { + i->discard(ctxt, xid); + } + } + return true; + }catch(...){ + std::cout << "TxAck::prepare() - Failed to prepare" << std::endl; + return false; + } +} + +void TxAck::commit() throw(){ + //remove all acked records from the list + unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)); +} + +void TxAck::rollback() throw(){ +} diff --git a/Final/cpp/lib/broker/TxAck.h b/Final/cpp/lib/broker/TxAck.h new file mode 100644 index 0000000000..88c321c445 --- /dev/null +++ b/Final/cpp/lib/broker/TxAck.h @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TxAck_ +#define _TxAck_ + +#include <algorithm> +#include <functional> +#include <list> +#include <AccumulatedAck.h> +#include <DeliveryRecord.h> +#include <TxOp.h> + +namespace qpid { + namespace broker { + /** + * Defines the transactional behaviour for acks received by a + * transactional channel. + */ + class TxAck : public TxOp{ + AccumulatedAck& acked; + std::list<DeliveryRecord>& unacked; + const std::string* const xid; + + public: + /** + * @param acked a representation of the accumulation of + * acks received + * @param unacked the record of delivered messages + */ + TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked, const std::string* const xid = 0); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~TxAck(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/TxBuffer.cpp b/Final/cpp/lib/broker/TxBuffer.cpp new file mode 100644 index 0000000000..acd3283bb7 --- /dev/null +++ b/Final/cpp/lib/broker/TxBuffer.cpp @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <TxBuffer.h> + +using std::mem_fun; +using namespace qpid::broker; + +bool TxBuffer::prepare(TransactionalStore* const store) +{ + std::auto_ptr<TransactionContext> ctxt; + if(store) ctxt = store->begin(); + for(op_iterator i = ops.begin(); i < ops.end(); i++){ + if(!(*i)->prepare(ctxt.get())){ + if(store) store->abort(ctxt.get()); + return false; + } + } + if(store) store->commit(ctxt.get()); + return true; +} + +void TxBuffer::commit() +{ + for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit)); + ops.clear(); +} + +void TxBuffer::rollback() +{ + for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback)); + ops.clear(); +} + +void TxBuffer::enlist(TxOp* const op) +{ + ops.push_back(op); +} diff --git a/Final/cpp/lib/broker/TxBuffer.h b/Final/cpp/lib/broker/TxBuffer.h new file mode 100644 index 0000000000..2d9a2a3679 --- /dev/null +++ b/Final/cpp/lib/broker/TxBuffer.h @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TxBuffer_ +#define _TxBuffer_ + +#include <algorithm> +#include <functional> +#include <vector> +#include <TransactionalStore.h> +#include <TxOp.h> + +/** + * Represents a single transaction. As such, an instance of this class + * will hold a list of operations representing the workload of the + * transaction. This work can be committed or rolled back. Committing + * is a two-stage process: first all the operations should be + * prepared, then if that succeeds they can be committed. + * + * In the 2pc case, a successful prepare may be followed by either a + * commit or a rollback. + * + * Atomicity of prepare is ensured by using a lower level + * transactional facility. This saves explicitly rolling back all the + * successfully prepared ops when one of them fails. i.e. we do not + * use 2pc internally, we instead ensure that prepare is atomic at a + * lower level. This makes individual prepare operations easier to + * code. + * + * Transactions on a messaging broker effect three types of 'action': + * (1) updates to persistent storage (2) updates to transient storage + * or cached data (3) network writes. + * + * Of these, (1) should always occur atomically during prepare to + * ensure that if the broker crashes while a transaction is being + * completed the persistent state (which is all that then remains) is + * consistent. (3) can only be done on commit, after a successful + * prepare. There is a little more flexibility with (2) but any + * changes made during prepare should be subject to the control of the + * TransactionalStore in use. + */ +namespace qpid { + namespace broker { + class TxBuffer{ + typedef std::vector<TxOp*>::iterator op_iterator; + std::vector<TxOp*> ops; + public: + /** + * Requests that all ops are prepared. This should + * primarily involve making sure that a persistent record + * of the operations is stored where necessary. + * + * All ops will be prepared under a transaction on the + * specified store. If any operation fails on prepare, + * this transaction will be rolled back. + * + * Once prepared, a transaction can be committed (or in + * the 2pc case, rolled back). + * + * @returns true if all the operations prepared + * successfully, false if not. + */ + bool prepare(TransactionalStore* const store); + /** + * Signals that the ops all prepared all completed + * successfully and can now commit, i.e. the operation can + * now be fully carried out. + * + * Should only be called after a call to prepare() returns + * true. + */ + void commit(); + /** + * Rolls back all the operations. + * + * Should only be called either after a call to prepare() + * returns true (2pc) or instead of a prepare call + * ('server-local') + */ + void rollback(); + /** + * Adds an operation to the transaction. + */ + void enlist(TxOp* const op); + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/TxOp.h b/Final/cpp/lib/broker/TxOp.h new file mode 100644 index 0000000000..abba84a8e8 --- /dev/null +++ b/Final/cpp/lib/broker/TxOp.h @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TxOp_ +#define _TxOp_ + +#include <TransactionalStore.h> + +namespace qpid { + namespace broker { + class TxOp{ + public: + virtual bool prepare(TransactionContext*) throw() = 0; + virtual void commit() throw() = 0; + virtual void rollback() throw() = 0; + virtual ~TxOp(){} + }; + } +} + + +#endif diff --git a/Final/cpp/lib/broker/TxPublish.cpp b/Final/cpp/lib/broker/TxPublish.cpp new file mode 100644 index 0000000000..49dd8abd89 --- /dev/null +++ b/Final/cpp/lib/broker/TxPublish.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <TxPublish.h> + +using namespace qpid::broker; + +TxPublish::TxPublish(Message::shared_ptr _msg, const std::string* const _xid) : msg(_msg), xid(_xid) {} + +bool TxPublish::prepare(TransactionContext* ctxt) throw(){ + try{ + for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, xid)); + return true; + }catch(...){ + std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl; + return false; + } +} + +void TxPublish::commit() throw(){ + for_each(queues.begin(), queues.end(), Commit(msg)); +} + +void TxPublish::rollback() throw(){ +} + +void TxPublish::deliverTo(Queue::shared_ptr& queue){ + queues.push_back(queue); +} + +TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg, const string* const _xid) + : ctxt(_ctxt), msg(_msg), xid(_xid){} + +void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){ + queue->enqueue(ctxt, msg, xid); +} + +TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){} + +void TxPublish::Commit::operator()(Queue::shared_ptr& queue){ + queue->process(msg); +} + diff --git a/Final/cpp/lib/broker/TxPublish.h b/Final/cpp/lib/broker/TxPublish.h new file mode 100644 index 0000000000..75f201257e --- /dev/null +++ b/Final/cpp/lib/broker/TxPublish.h @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TxPublish_ +#define _TxPublish_ + +#include <algorithm> +#include <functional> +#include <list> +#include <Deliverable.h> +#include <BrokerMessage.h> +#include <MessageStore.h> +#include <BrokerQueue.h> +#include <TxOp.h> + +namespace qpid { + namespace broker { + /** + * Defines the behaviour for publish operations on a + * transactional channel. Messages are routed through + * exchanges when received but are not at that stage delivered + * to the matching queues, rather the queues are held in an + * instance of this class. On prepare() the message is marked + * enqueued to the relevant queues in the MessagesStore. On + * commit() the messages will be passed to the queue for + * dispatch or to be added to the in-memory queue. + */ + class TxPublish : public TxOp, public Deliverable{ + class Prepare{ + TransactionContext* ctxt; + Message::shared_ptr& msg; + const std::string* const xid; + public: + Prepare(TransactionContext* ctxt, Message::shared_ptr& msg, const std::string* const xid); + void operator()(Queue::shared_ptr& queue); + }; + + class Commit{ + Message::shared_ptr& msg; + public: + Commit(Message::shared_ptr& msg); + void operator()(Queue::shared_ptr& queue); + }; + + Message::shared_ptr msg; + const std::string* const xid; + std::list<Queue::shared_ptr> queues; + + public: + TxPublish(Message::shared_ptr msg, const std::string* const xid = 0); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + + virtual void deliverTo(Queue::shared_ptr& queue); + + virtual ~TxPublish(){} + }; + } +} + + +#endif |