diff options
Diffstat (limited to 'cpp/lib/broker')
69 files changed, 5802 insertions, 0 deletions
diff --git a/cpp/lib/broker/AccumulatedAck.cpp b/cpp/lib/broker/AccumulatedAck.cpp new file mode 100644 index 0000000000..a9826ba5ea --- /dev/null +++ b/cpp/lib/broker/AccumulatedAck.cpp @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <AccumulatedAck.h> + +using std::less_equal; +using std::bind2nd; +using namespace qpid::broker; + +void AccumulatedAck::update(u_int64_t tag, bool multiple){ + if(multiple){ + if(tag > range) range = tag; + //else don't care, it is already counted + }else if(tag > range){ + individual.push_back(tag); + } +} + +void AccumulatedAck::consolidate(){ + individual.sort(); + //remove any individual tags that are covered by range + individual.remove_if(bind2nd(less_equal<u_int64_t>(), range)); +} + +void AccumulatedAck::clear(){ + range = 0; + individual.clear(); +} + +bool AccumulatedAck::covers(u_int64_t tag) const{ + return tag <= range || find(individual.begin(), individual.end(), tag) != individual.end(); +} diff --git a/cpp/lib/broker/AccumulatedAck.h b/cpp/lib/broker/AccumulatedAck.h new file mode 100644 index 0000000000..055c8ea3e0 --- /dev/null +++ b/cpp/lib/broker/AccumulatedAck.h @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _AccumulatedAck_ +#define _AccumulatedAck_ + +#include <algorithm> +#include <functional> +#include <list> + +namespace qpid { + namespace broker { + /** + * Keeps an accumulated record of acked messages (by delivery + * tag). + */ + struct AccumulatedAck{ + /** + * If not zero, then everything up to this value has been + * acked. + */ + u_int64_t range; + /** + * List of individually acked messages that are not + * included in the range marked by 'range'. + */ + std::list<u_int64_t> individual; + + void update(u_int64_t tag, bool multiple); + void consolidate(); + void clear(); + bool covers(u_int64_t tag) const; + }; + } +} + + +#endif diff --git a/cpp/lib/broker/AutoDelete.cpp b/cpp/lib/broker/AutoDelete.cpp new file mode 100644 index 0000000000..ae48d10505 --- /dev/null +++ b/cpp/lib/broker/AutoDelete.cpp @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <AutoDelete.h> +#include <sys/Time.h> + +using namespace qpid::broker; +using namespace qpid::sys; + +AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) + : registry(_registry), period(_period), stopped(true) { } + +void AutoDelete::add(Queue::shared_ptr const queue){ + Mutex::ScopedLock l(lock); + queues.push(queue); +} + +Queue::shared_ptr const AutoDelete::pop(){ + Queue::shared_ptr next; + Mutex::ScopedLock l(lock); + if(!queues.empty()){ + next = queues.front(); + queues.pop(); + } + return next; +} + +void AutoDelete::process(){ + Queue::shared_ptr seen; + for(Queue::shared_ptr q = pop(); q; q = pop()){ + if(seen == q){ + add(q); + break; + }else if(q->canAutoDelete()){ + std::string name(q->getName()); + registry->destroy(name); + std::cout << "INFO: Auto-deleted queue named " << name << std::endl; + }else{ + add(q); + if(!seen) seen = q; + } + } +} + +void AutoDelete::run(){ + Monitor::ScopedLock l(monitor); + while(!stopped){ + process(); + monitor.wait(period*TIME_MSEC); + } +} + +void AutoDelete::start(){ + Monitor::ScopedLock l(monitor); + if(stopped){ + stopped = false; + runner = Thread(this); + } +} + +void AutoDelete::stop(){ + { + Monitor::ScopedLock l(monitor); + if(stopped) return; + stopped = true; + } + monitor.notify(); + runner.join(); +} diff --git a/cpp/lib/broker/AutoDelete.h b/cpp/lib/broker/AutoDelete.h new file mode 100644 index 0000000000..19a5938df1 --- /dev/null +++ b/cpp/lib/broker/AutoDelete.h @@ -0,0 +1,55 @@ +#ifndef _AutoDelete_ +#define _AutoDelete_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <queue> +#include <sys/Monitor.h> +#include <BrokerQueue.h> +#include <QueueRegistry.h> +#include <sys/Thread.h> + +namespace qpid { + namespace broker{ + class AutoDelete : private qpid::sys::Runnable { + qpid::sys::Mutex lock; + qpid::sys::Monitor monitor; + std::queue<Queue::shared_ptr> queues; + QueueRegistry* const registry; + u_int32_t period; + volatile bool stopped; + qpid::sys::Thread runner; + + Queue::shared_ptr const pop(); + void process(); + virtual void run(); + + public: + AutoDelete(QueueRegistry* const registry, u_int32_t period); + void add(Queue::shared_ptr const); + void start(); + void stop(); + }; + } +} + + +#endif diff --git a/cpp/lib/broker/Binding.h b/cpp/lib/broker/Binding.h new file mode 100644 index 0000000000..16ca223208 --- /dev/null +++ b/cpp/lib/broker/Binding.h @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Binding_ +#define _Binding_ + +#include <FieldTable.h> + +namespace qpid { + namespace broker { + class Binding{ + public: + virtual void cancel() = 0; + virtual ~Binding(){} + }; + } +} + + +#endif + diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp new file mode 100644 index 0000000000..6c0d7a3f3f --- /dev/null +++ b/cpp/lib/broker/Broker.cpp @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <memory> +#include <Broker.h> + + +using namespace qpid::broker; +using namespace qpid::sys; + +Broker::Broker(const Configuration& config) : + acceptor(Acceptor::create(config.getPort(), + config.getConnectionBacklog(), + config.getWorkerThreads(), + config.isTrace())), + factory(config.getStore()) +{ } + + +Broker::shared_ptr Broker::create(int16_t port) +{ + Configuration config; + config.setPort(port); + return create(config); +} + +Broker::shared_ptr Broker::create(const Configuration& config) { + return Broker::shared_ptr(new Broker(config)); +} + +void Broker::run() { + acceptor->run(&factory); +} + +void Broker::shutdown() { + acceptor->shutdown(); +} + +Broker::~Broker() { } + +const int16_t Broker::DEFAULT_PORT(5672); diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h new file mode 100644 index 0000000000..8ea1a57c27 --- /dev/null +++ b/cpp/lib/broker/Broker.h @@ -0,0 +1,82 @@ +#ifndef _Broker_ +#define _Broker_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <Configuration.h> +#include <SessionHandlerFactoryImpl.h> +#include <sys/Runnable.h> +#include <sys/Acceptor.h> +#include <SharedObject.h> + +namespace qpid { +namespace broker { +/** + * A broker instance. + */ +class Broker : public qpid::sys::Runnable, + public qpid::SharedObject<Broker> +{ + public: + static const int16_t DEFAULT_PORT; + + virtual ~Broker(); + + /** + * Create a broker. + * @param port Port to listen on or 0 to pick a port dynamically. + */ + static shared_ptr create(int16_t port = DEFAULT_PORT); + + /** + * Create a broker using a Configuration. + */ + static shared_ptr create(const Configuration& config); + + /** + * Return listening port. If called before bind this is + * the configured port. If called after it is the actual + * port, which will be different if the configured port is + * 0. + */ + virtual int16_t getPort() const { return acceptor->getPort(); } + + /** + * Run the broker. Implements Runnable::run() so the broker + * can be run in a separate thread. + */ + virtual void run(); + + /** Shut down the broker */ + virtual void shutdown(); + + private: + Broker(const Configuration& config); + qpid::sys::Acceptor::shared_ptr acceptor; + SessionHandlerFactoryImpl factory; +}; +} +} + + + +#endif /*!_Broker_*/ diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp new file mode 100644 index 0000000000..42e45dd291 --- /dev/null +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -0,0 +1,257 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <BrokerChannel.h> +#include <QpidError.h> +#include <iostream> +#include <sstream> +#include <assert.h> + +using std::mem_fun_ref; +using std::bind2nd; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + + +Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : + id(_id), + out(_out), + currentDeliveryTag(1), + transactional(false), + prefetchSize(0), + prefetchCount(0), + framesize(_framesize), + tagGenerator("sgen"), + store(0), + messageBuilder(this){ + + outstanding.reset(); +} + +Channel::~Channel(){ +} + +bool Channel::exists(const string& consumerTag){ + return consumers.find(consumerTag) != consumers.end(); +} + +void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){ + if(tag.empty()) tag = tagGenerator.generate(); + ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); + try{ + queue->consume(c, exclusive);//may throw exception + consumers[tag] = c; + }catch(ExclusiveAccessException& e){ + delete c; + throw e; + } +} + +void Channel::cancel(consumer_iterator i){ + ConsumerImpl* c = i->second; + consumers.erase(i); + if(c){ + c->cancel(); + delete c; + } +} + +void Channel::cancel(const string& tag){ + consumer_iterator i = consumers.find(tag); + if(i != consumers.end()){ + cancel(i); + } +} + +void Channel::close(){ + //cancel all consumers + for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ + cancel(i); + } + //requeue: + recover(true); +} + +void Channel::begin(){ + transactional = true; +} + +void Channel::commit(){ + TxAck txAck(accumulatedAck, unacked); + txBuffer.enlist(&txAck); + if(txBuffer.prepare(store)){ + txBuffer.commit(); + } + accumulatedAck.clear(); +} + +void Channel::rollback(){ + txBuffer.rollback(); + accumulatedAck.clear(); +} + +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ + Mutex::ScopedLock locker(deliveryLock); + + u_int64_t deliveryTag = currentDeliveryTag++; + if(ackExpected){ + unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); + outstanding.size += msg->contentSize(); + outstanding.count++; + } + //send deliver method, header and content(s) + msg->deliver(out, id, consumerTag, deliveryTag, framesize); +} + +bool Channel::checkPrefetch(Message::shared_ptr& msg){ + Mutex::ScopedLock locker(deliveryLock); + bool countOk = !prefetchCount || prefetchCount > unacked.size(); + bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); + return countOk && sizeOk; +} + +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, + Queue::shared_ptr _queue, + ConnectionToken* const _connection, bool ack) : parent(_parent), + tag(_tag), + queue(_queue), + connection(_connection), + ackExpected(ack), + blocked(false){ +} + +bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ + if(!connection || connection != msg->getPublisher()){//check for no_local + if(ackExpected && !parent->checkPrefetch(msg)){ + blocked = true; + }else{ + blocked = false; + parent->deliver(msg, tag, queue, ackExpected); + return true; + } + } + return false; +} + +void Channel::ConsumerImpl::cancel(){ + if(queue) queue->cancel(this); +} + +void Channel::ConsumerImpl::requestDispatch(){ + if(blocked) queue->dispatch(); +} + +void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){ + Message::shared_ptr message(_message); + exchange = _exchange; + messageBuilder.initialise(message); +} + +void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ + messageBuilder.setHeader(header); + //at this point, decide based on the size of the message whether we want + //to stage it by saving content directly to disk as it arrives +} + +void Channel::handleContent(AMQContentBody::shared_ptr content){ + messageBuilder.addContent(content); +} + +void Channel::complete(Message::shared_ptr& msg){ + if(exchange){ + if(transactional){ + TxPublish* deliverable = new TxPublish(msg); + exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + txBuffer.enlist(new DeletingTxOp(deliverable)); + }else{ + DeliverableMessage deliverable(msg); + exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + } + exchange.reset(); + }else{ + std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl; + } +} + +void Channel::ack(u_int64_t deliveryTag, bool multiple){ + if(transactional){ + accumulatedAck.update(deliveryTag, multiple); + //TODO: I think the outstanding prefetch size & count should be updated at this point... + //TODO: ...this may then necessitate dispatching to consumers + }else{ + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + + ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag)); + if(i == unacked.end()){ + throw InvalidAckException(); + }else if(multiple){ + ack_iterator end = ++i; + for_each(unacked.begin(), end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0)); + unacked.erase(unacked.begin(), end); + + //recalculate the prefetch: + outstanding.reset(); + for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); + }else{ + i->discard(); + i->subtractFrom(&outstanding); + unacked.erase(i); + } + + //if the prefetch limit had previously been reached, there may + //be messages that can be now be delivered + for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ + j->second->requestDispatch(); + } + } +} + +void Channel::recover(bool requeue){ + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + + if(requeue){ + outstanding.reset(); + std::list<DeliveryRecord> copy = unacked; + unacked.clear(); + for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); + }else{ + for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); + } +} + +bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ + Message::shared_ptr msg = queue->dequeue(); + if(msg){ + Mutex::ScopedLock locker(deliveryLock); + u_int64_t myDeliveryTag = currentDeliveryTag++; + msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); + if(ackExpected){ + unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); + } + return true; + }else{ + return false; + } +} + +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ + msg->deliver(out, id, consumerTag, deliveryTag, framesize); +} diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h new file mode 100644 index 0000000000..804d6866b1 --- /dev/null +++ b/cpp/lib/broker/BrokerChannel.h @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Channel_ +#define _Channel_ + +#include <algorithm> +#include <functional> +#include <list> +#include <map> +#include <AccumulatedAck.h> +#include <Binding.h> +#include <Consumer.h> +#include <DeletingTxOp.h> +#include <DeliverableMessage.h> +#include <DeliveryRecord.h> +#include <BrokerMessage.h> +#include <MessageBuilder.h> +#include <NameGenerator.h> +#include <Prefetch.h> +#include <BrokerQueue.h> +#include <TransactionalStore.h> +#include <TxAck.h> +#include <TxBuffer.h> +#include <TxPublish.h> +#include <sys/Monitor.h> +#include <OutputHandler.h> +#include <AMQContentBody.h> +#include <AMQHeaderBody.h> +#include <BasicPublishBody.h> + +namespace qpid { + namespace broker { + using qpid::framing::string; + + /** + * Maintains state for an AMQP channel. Handles incoming and + * outgoing messages for that channel. + */ + class Channel : private MessageBuilder::CompletionHandler{ + class ConsumerImpl : public virtual Consumer{ + Channel* parent; + const string tag; + Queue::shared_ptr queue; + ConnectionToken* const connection; + const bool ackExpected; + bool blocked; + public: + ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); + virtual bool deliver(Message::shared_ptr& msg); + void cancel(); + void requestDispatch(); + }; + + typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; + const int id; + qpid::framing::OutputHandler* out; + u_int64_t currentDeliveryTag; + Queue::shared_ptr defaultQueue; + bool transactional; + std::map<string, ConsumerImpl*> consumers; + u_int32_t prefetchSize; + u_int16_t prefetchCount; + Prefetch outstanding; + u_int32_t framesize; + NameGenerator tagGenerator; + std::list<DeliveryRecord> unacked; + qpid::sys::Mutex deliveryLock; + TxBuffer txBuffer; + AccumulatedAck accumulatedAck; + TransactionalStore* store; + MessageBuilder messageBuilder;//builder for in-progress message + Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to + + virtual void complete(Message::shared_ptr& msg); + void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); + void cancel(consumer_iterator consumer); + bool checkPrefetch(Message::shared_ptr& msg); + + public: + Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize); + ~Channel(); + inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } + inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; } + inline u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; } + inline u_int16_t setPrefetchCount(u_int16_t count){ return prefetchCount = count; } + bool exists(const string& consumerTag); + void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0); + void cancel(const string& tag); + bool get(Queue::shared_ptr queue, bool ackExpected); + void begin(); + void close(); + void commit(); + void rollback(); + void ack(u_int64_t deliveryTag, bool multiple); + void recover(bool requeue); + void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag); + void handlePublish(Message* msg, Exchange::shared_ptr exchange); + void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void handleContent(qpid::framing::AMQContentBody::shared_ptr content); + }; + + struct InvalidAckException{}; + } +} + + +#endif diff --git a/cpp/lib/broker/BrokerExchange.h b/cpp/lib/broker/BrokerExchange.h new file mode 100644 index 0000000000..f5e4d9cb28 --- /dev/null +++ b/cpp/lib/broker/BrokerExchange.h @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Exchange_ +#define _Exchange_ + +#include <boost/shared_ptr.hpp> +#include <Deliverable.h> +#include <BrokerQueue.h> +#include <FieldTable.h> + +namespace qpid { + namespace broker { + using std::string; + + class Exchange{ + const string name; + public: + typedef boost::shared_ptr<Exchange> shared_ptr; + + explicit Exchange(const string& _name) : name(_name){} + virtual ~Exchange(){} + string getName() { return name; } + virtual string getType() = 0; + virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + }; + } +} + + +#endif diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp new file mode 100644 index 0000000000..7fa1444a9e --- /dev/null +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -0,0 +1,215 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <BrokerMessage.h> +#include <iostream> + +#include <InMemoryContent.h> +#include <LazyLoadedContent.h> +#include <MessageStore.h> +// AMQP version change - kpvdr 2006-11-17 +#include <ProtocolVersion.h> +#include <BasicDeliverBody.h> +#include <BasicGetOkBody.h> + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; + +Message::Message(const ConnectionToken* const _publisher, + const string& _exchange, const string& _routingKey, + bool _mandatory, bool _immediate) : publisher(_publisher), + exchange(_exchange), + routingKey(_routingKey), + mandatory(_mandatory), + immediate(_immediate), + redelivered(false), + size(0), + persistenceId(0) {} + +Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : + publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ + + decode(buffer, headersOnly, contentChunkSize); +} + +Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} + +Message::~Message(){} + +void Message::setHeader(AMQHeaderBody::shared_ptr _header){ + this->header = _header; +} + +void Message::addContent(AMQContentBody::shared_ptr data){ + if (!content.get()) { + content = std::auto_ptr<Content>(new InMemoryContent()); + } + content->add(data); + size += data->size(); +} + +bool Message::isComplete(){ + return header.get() && (header->getContentSize() == contentSize()); +} + +void Message::redeliver(){ + redelivered = true; +} + +void Message::deliver(OutputHandler* out, int channel, + const string& consumerTag, u_int64_t deliveryTag, + u_int32_t framesize){ + + // AMQP version change - kpvdr 2006-11-17 + // TODO: Make this class version-aware and link these hard-wired numbers to that version + out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey))); + sendContent(out, channel, framesize); +} + +void Message::sendGetOk(OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize){ + + // AMQP version change - kpvdr 2006-11-17 + // TODO: Make this class version-aware and link these hard-wired numbers to that version + out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount))); + sendContent(out, channel, framesize); +} + +void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){ + AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); + out->send(new AMQFrame(channel, headerBody)); + + if (content.get()) content->send(out, channel, framesize); +} + +BasicHeaderProperties* Message::getHeaderProperties(){ + return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); +} + +const ConnectionToken* const Message::getPublisher(){ + return publisher; +} + +bool Message::isPersistent() +{ + if(!header) return false; + BasicHeaderProperties* props = getHeaderProperties(); + return props && props->getDeliveryMode() == PERSISTENT; +} + +void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) +{ + decodeHeader(buffer); + if (!headersOnly) decodeContent(buffer, contentChunkSize); +} + +void Message::decodeHeader(Buffer& buffer) +{ + buffer.getShortString(exchange); + buffer.getShortString(routingKey); + + u_int32_t headerSize = buffer.getLong(); + AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody()); + headerBody->decode(buffer, headerSize); + setHeader(headerBody); +} + +void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) +{ + u_int64_t expected = expectedContentSize(); + if (expected != buffer.available()) { + std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl; + } + + if (!chunkSize || chunkSize > expected) { + chunkSize = expected; + } + + u_int64_t total = 0; + while (total < expectedContentSize()) { + u_int64_t remaining = expected - total; + AMQContentBody::shared_ptr contentBody(new AMQContentBody()); + contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize); + addContent(contentBody); + total += chunkSize; + } +} + +void Message::encode(Buffer& buffer) +{ + encodeHeader(buffer); + encodeContent(buffer); +} + +void Message::encodeHeader(Buffer& buffer) +{ + buffer.putShortString(exchange); + buffer.putShortString(routingKey); + buffer.putLong(header->size()); + header->encode(buffer); +} + +void Message::encodeContent(Buffer& buffer) +{ + if (content.get()) content->encode(buffer); +} + +u_int32_t Message::encodedSize() +{ + return encodedHeaderSize() + encodedContentSize(); +} + +u_int32_t Message::encodedContentSize() +{ + return content.get() ? content->size() : 0; +} + +u_int32_t Message::encodedHeaderSize() +{ + return exchange.size() + 1 + + routingKey.size() + 1 + + header->size() + 4;//4 extra bytes for size +} + +u_int64_t Message::expectedContentSize() +{ + return header.get() ? header->getContentSize() : 0; +} + +void Message::releaseContent(MessageStore* store) +{ + if (!content.get() || content->size() > 0) { + //set content to lazy loading mode (but only if there is stored content): + + //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is + // then set as a member of that message so its lifetime is guaranteed to be no longer than + // that of the message itself + content = std::auto_ptr<Content>(new LazyLoadedContent(store, this, expectedContentSize())); + } +} + +void Message::setContent(std::auto_ptr<Content>& _content) +{ + content = _content; +} diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h new file mode 100644 index 0000000000..59e146959d --- /dev/null +++ b/cpp/lib/broker/BrokerMessage.h @@ -0,0 +1,139 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Message_ +#define _Message_ + +#include <memory> +#include <boost/shared_ptr.hpp> +#include <ConnectionToken.h> +#include <Content.h> +#include <TxBuffer.h> +#include <AMQContentBody.h> +#include <AMQHeaderBody.h> +#include <BasicHeaderProperties.h> +#include <OutputHandler.h> + +namespace qpid { + namespace broker { + + class MessageStore; + using qpid::framing::string; + /** + * Represents an AMQP message, i.e. a header body, a list of + * content bodies and some details about the publication + * request. + */ + class Message{ + const ConnectionToken* const publisher; + string exchange; + string routingKey; + const bool mandatory; + const bool immediate; + bool redelivered; + qpid::framing::AMQHeaderBody::shared_ptr header; + std::auto_ptr<Content> content; + u_int64_t size; + u_int64_t persistenceId; + + void sendContent(qpid::framing::OutputHandler* out, + int channel, u_int32_t framesize); + + public: + typedef boost::shared_ptr<Message> shared_ptr; + + Message(const ConnectionToken* const publisher, + const string& exchange, const string& routingKey, + bool mandatory, bool immediate); + Message(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); + Message(); + ~Message(); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void addContent(qpid::framing::AMQContentBody::shared_ptr data); + bool isComplete(); + const ConnectionToken* const getPublisher(); + + void deliver(qpid::framing::OutputHandler* out, + int channel, + const string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize); + void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize); + void redeliver(); + + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + bool isPersistent(); + const string& getRoutingKey() const { return routingKey; } + const string& getExchange() const { return exchange; } + u_int64_t contentSize() const { return size; } + u_int64_t getPersistenceId() const { return persistenceId; } + void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } + + void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); + void decodeHeader(qpid::framing::Buffer& buffer); + void decodeContent(qpid::framing::Buffer& buffer, u_int32_t contentChunkSize = 0); + + void encode(qpid::framing::Buffer& buffer); + void encodeHeader(qpid::framing::Buffer& buffer); + void encodeContent(qpid::framing::Buffer& buffer); + /** + * @returns the size of the buffer needed to encode this + * message in its entirety + */ + u_int32_t encodedSize(); + /** + * @returns the size of the buffer needed to encode the + * 'header' of this message (not just the header frame, + * but other meta data e.g.routing key and exchange) + */ + u_int32_t encodedHeaderSize(); + /** + * @returns the size of the buffer needed to encode the + * (possibly partial) content held by this message + */ + u_int32_t encodedContentSize(); + /** + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. + */ + void releaseContent(MessageStore* store); + /** + * If headers have been received, returns the expected + * content size else returns 0. + */ + u_int64_t expectedContentSize(); + /** + * Sets the 'content' implementation of this message (the + * message controls the lifecycle of the content instance + * it uses). + */ + void setContent(std::auto_ptr<Content>& content); + }; + + } +} + + +#endif diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp new file mode 100644 index 0000000000..051b9d1e0f --- /dev/null +++ b/cpp/lib/broker/BrokerQueue.cpp @@ -0,0 +1,202 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <BrokerQueue.h> +#include <MessageStore.h> +#include <sys/Monitor.h> +#include <sys/Time.h> +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::sys; + +Queue::Queue(const string& _name, u_int32_t _autodelete, + MessageStore* const _store, + const ConnectionToken* const _owner) : + + name(_name), + autodelete(_autodelete), + store(_store), + owner(_owner), + queueing(false), + dispatching(false), + next(0), + lastUsed(0), + exclusive(0), + persistenceId(0) +{ + if(autodelete) lastUsed = now()/TIME_MSEC; +} + +Queue::~Queue(){ + for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){ + b->cancel(); + bindings.pop(); + } +} + +void Queue::bound(Binding* b){ + bindings.push(b); +} + +void Queue::deliver(Message::shared_ptr& msg){ + enqueue(0, msg, 0); + process(msg); +} + +void Queue::recover(Message::shared_ptr& msg){ + queueing = true; + messages.push(msg); +} + +void Queue::process(Message::shared_ptr& msg){ + Mutex::ScopedLock locker(lock); + if(queueing || !dispatch(msg)){ + queueing = true; + messages.push(msg); + } +} + +bool Queue::dispatch(Message::shared_ptr& msg){ + if(consumers.empty()){ + return false; + }else if(exclusive){ + if(!exclusive->deliver(msg)){ + std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl; + } + return true; + }else{ + //deliver to next consumer + next = next % consumers.size(); + Consumer* c = consumers[next]; + int start = next; + while(c){ + next++; + if(c->deliver(msg)) return true; + + next = next % consumers.size(); + c = next == start ? 0 : consumers[next]; + } + return false; + } +} + +bool Queue::startDispatching(){ + Mutex::ScopedLock locker(lock); + if(queueing && !dispatching){ + dispatching = true; + return true; + }else{ + return false; + } +} + +void Queue::dispatch(){ + bool proceed = startDispatching(); + while(proceed){ + Mutex::ScopedLock locker(lock); + if(!messages.empty() && dispatch(messages.front())){ + messages.pop(); + }else{ + dispatching = false; + proceed = false; + queueing = !messages.empty(); + } + } +} + +void Queue::consume(Consumer* c, bool requestExclusive){ + Mutex::ScopedLock locker(lock); + if(exclusive) throw ExclusiveAccessException(); + if(requestExclusive){ + if(!consumers.empty()) throw ExclusiveAccessException(); + exclusive = c; + } + + if(autodelete && consumers.empty()) lastUsed = 0; + consumers.push_back(c); +} + +void Queue::cancel(Consumer* c){ + Mutex::ScopedLock locker(lock); + consumers.erase(find(consumers.begin(), consumers.end(), c)); + if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC; + if(exclusive == c) exclusive = 0; +} + +Message::shared_ptr Queue::dequeue(){ + Mutex::ScopedLock locker(lock); + Message::shared_ptr msg; + if(!messages.empty()){ + msg = messages.front(); + messages.pop(); + } + return msg; +} + +u_int32_t Queue::purge(){ + Mutex::ScopedLock locker(lock); + int count = messages.size(); + while(!messages.empty()) messages.pop(); + return count; +} + +u_int32_t Queue::getMessageCount() const{ + Mutex::ScopedLock locker(lock); + return messages.size(); +} + +u_int32_t Queue::getConsumerCount() const{ + Mutex::ScopedLock locker(lock); + return consumers.size(); +} + +bool Queue::canAutoDelete() const{ + Mutex::ScopedLock locker(lock); + return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete); +} + +void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) +{ + if (msg->isPersistent() && store) { + store->enqueue(ctxt, msg, *this, xid); + } +} + +void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) +{ + if (msg->isPersistent() && store) { + store->dequeue(ctxt, msg, *this, xid); + } +} + +void Queue::create() +{ + if (store) { + store->create(*this); + } +} + +void Queue::destroy() +{ + if (store) { + store->destroy(*this); + } +} diff --git a/cpp/lib/broker/BrokerQueue.h b/cpp/lib/broker/BrokerQueue.h new file mode 100644 index 0000000000..13f4bf2de0 --- /dev/null +++ b/cpp/lib/broker/BrokerQueue.h @@ -0,0 +1,135 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Queue_ +#define _Queue_ + +#include <vector> +#include <queue> +#include <boost/shared_ptr.hpp> +#include <amqp_types.h> +#include <Binding.h> +#include <ConnectionToken.h> +#include <Consumer.h> +#include <BrokerMessage.h> +#include <sys/Monitor.h> + +namespace qpid { + namespace broker { + class MessageStore; + + /** + * Thrown when exclusive access would be violated. + */ + struct ExclusiveAccessException{}; + + using std::string; + /** + * The brokers representation of an amqp queue. Messages are + * delivered to a queue from where they can be dispatched to + * registered consumers or be stored until dequeued or until one + * or more consumers registers. + */ + class Queue{ + const string name; + const u_int32_t autodelete; + MessageStore* const store; + const ConnectionToken* const owner; + std::vector<Consumer*> consumers; + std::queue<Binding*> bindings; + std::queue<Message::shared_ptr> messages; + bool queueing; + bool dispatching; + int next; + mutable qpid::sys::Mutex lock; + int64_t lastUsed; + Consumer* exclusive; + mutable u_int64_t persistenceId; + + bool startDispatching(); + bool dispatch(Message::shared_ptr& msg); + + public: + + typedef boost::shared_ptr<Queue> shared_ptr; + + typedef std::vector<shared_ptr> vector; + + Queue(const string& name, u_int32_t autodelete = 0, + MessageStore* const store = 0, + const ConnectionToken* const owner = 0); + ~Queue(); + + void create(); + void destroy(); + /** + * Informs the queue of a binding that should be cancelled on + * destruction of the queue. + */ + void bound(Binding* b); + /** + * Delivers a message to the queue. Will record it as + * enqueued if persistent then process it. + */ + void deliver(Message::shared_ptr& msg); + /** + * Dispatches the messages immediately to a consumer if + * one is available or stores it for later if not. + */ + void process(Message::shared_ptr& msg); + /** + * Used during recovery to add stored messages back to the queue + */ + void recover(Message::shared_ptr& msg); + /** + * Dispatch any queued messages providing there are + * consumers for them. Only one thread can be dispatching + * at any time, but this method (rather than the caller) + * is responsible for ensuring that. + */ + void dispatch(); + void consume(Consumer* c, bool exclusive = false); + void cancel(Consumer* c); + u_int32_t purge(); + u_int32_t getMessageCount() const; + u_int32_t getConsumerCount() const; + inline const string& getName() const { return name; } + inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } + inline bool hasExclusiveConsumer() const { return exclusive; } + inline u_int64_t getPersistenceId() const { return persistenceId; } + inline void setPersistenceId(u_int64_t _persistenceId) const { persistenceId = _persistenceId; } + + bool canAutoDelete() const; + + void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid); + /** + * dequeue from store (only done once messages is acknowledged) + */ + void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid); + /** + * dequeues from memory only + */ + Message::shared_ptr dequeue(); + }; + } +} + + +#endif diff --git a/cpp/lib/broker/Configuration.cpp b/cpp/lib/broker/Configuration.cpp new file mode 100644 index 0000000000..978900fce5 --- /dev/null +++ b/cpp/lib/broker/Configuration.cpp @@ -0,0 +1,201 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <Configuration.h> +#include <string.h> + +using namespace qpid::broker; +using namespace std; + +Configuration::Configuration() : + trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false), + port('p', "port", "Sets the port to listen on (default=5672)", 5672), + workerThreads("worker-threads", "Sets the number of worker threads to use (default=5).", 5), + maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500).", 500), + connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10), + store('s', "store", "Sets the message store module to use (default='' which implies no store)", ""), + help("help", "Prints usage information", false) +{ + options.push_back(&trace); + options.push_back(&port); + options.push_back(&workerThreads); + options.push_back(&maxConnections); + options.push_back(&connectionBacklog); + options.push_back(&store); + options.push_back(&help); +} + +Configuration::~Configuration(){} + +void Configuration::parse(int argc, char** argv){ + int position = 1; + while(position < argc){ + bool matched(false); + for(op_iterator i = options.begin(); i < options.end() && !matched; i++){ + matched = (*i)->parse(position, argv, argc); + } + if(!matched){ + std::cout<< "Warning: skipping unrecognised option " << argv[position] << std::endl; + position++; + } + } +} + +void Configuration::usage(){ + for(op_iterator i = options.begin(); i < options.end(); i++){ + (*i)->print(std::cout); + } +} + +bool Configuration::isHelp() const { + return help.getValue(); +} + +bool Configuration::isTrace() const { + return trace.getValue(); +} + +int Configuration::getPort() const { + return port.getValue(); +} + +int Configuration::getWorkerThreads() const { + return workerThreads.getValue(); +} + +int Configuration::getMaxConnections() const { + return maxConnections.getValue(); +} + +int Configuration::getConnectionBacklog() const { + return connectionBacklog.getValue(); +} + +const std::string& Configuration::getStore() const { + return store.getValue(); +} + +Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : + flag(string("-") + _flag), name("--" +_name), desc(_desc) {} + +Configuration::Option::Option(const string& _name, const string& _desc) : + flag(""), name("--" + _name), desc(_desc) {} + +Configuration::Option::~Option(){} + +bool Configuration::Option::match(const string& arg){ + return flag == arg || name == arg; +} + +bool Configuration::Option::parse(int& i, char** argv, int argc){ + const string arg(argv[i]); + if(match(arg)){ + if(needsValue()){ + if(++i < argc) setValue(argv[i]); + else throw ParseException("Argument " + arg + " requires a value!"); + }else{ + setValue(""); + } + i++; + return true; + }else{ + return false; + } +} + +void Configuration::Option::print(ostream& out) const { + out << " "; + if(flag.length() > 0){ + out << flag << " or "; + } + out << name; + if(needsValue()) out << "<value>"; + out << std::endl; + out << " " << desc << std::endl; +} + + +// String Option: + +Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) : + Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) : + Option(_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::StringOption::~StringOption(){} + +const string& Configuration::StringOption::getValue() const { + return value; +} + +bool Configuration::StringOption::needsValue() const { + return true; +} + +void Configuration::StringOption::setValue(const std::string& _value){ + value = _value; +} + +// Int Option: + +Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) : + Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) : + Option(_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::IntOption::~IntOption(){} + +int Configuration::IntOption::getValue() const { + return value; +} + +bool Configuration::IntOption::needsValue() const { + return true; +} + +void Configuration::IntOption::setValue(const std::string& _value){ + value = atoi(_value.c_str()); +} + +// Bool Option: + +Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) : + Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) : + Option(_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::BoolOption::~BoolOption(){} + +bool Configuration::BoolOption::getValue() const { + return value; +} + +bool Configuration::BoolOption::needsValue() const { + return false; +} + +void Configuration::BoolOption::setValue(const std::string& /*not required*/){ + //BoolOptions have no value. The fact that the option is specified + //implies the value is true. + value = true; +} diff --git a/cpp/lib/broker/Configuration.h b/cpp/lib/broker/Configuration.h new file mode 100644 index 0000000000..1dfc191018 --- /dev/null +++ b/cpp/lib/broker/Configuration.h @@ -0,0 +1,138 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Configuration_ +#define _Configuration_ + +#include <cstdlib> +#include <iostream> +#include <vector> +#include <Exception.h> + +namespace qpid { + namespace broker { + class Configuration{ + class Option { + const std::string flag; + const std::string name; + const std::string desc; + + bool match(const std::string& arg); + + protected: + virtual bool needsValue() const = 0; + virtual void setValue(const std::string& value) = 0; + + public: + Option(const char flag, const std::string& name, const std::string& desc); + Option(const std::string& name, const std::string& desc); + virtual ~Option(); + + bool parse(int& i, char** argv, int argc); + void print(std::ostream& out) const; + }; + + class IntOption : public Option{ + const int defaultValue; + int value; + public: + IntOption(char flag, const std::string& name, const std::string& desc, const int value = 0); + IntOption(const std::string& name, const std::string& desc, const int value = 0); + virtual ~IntOption(); + + int getValue() const; + virtual bool needsValue() const; + virtual void setValue(const std::string& value); + virtual void setValue(int _value) { value = _value; } + }; + + class StringOption : public Option{ + const std::string defaultValue; + std::string value; + public: + StringOption(char flag, const std::string& name, const std::string& desc, const std::string value = ""); + StringOption(const std::string& name, const std::string& desc, const std::string value = ""); + virtual ~StringOption(); + + const std::string& getValue() const; + virtual bool needsValue() const; + virtual void setValue(const std::string& value); + }; + + class BoolOption : public Option{ + const bool defaultValue; + bool value; + public: + BoolOption(char flag, const std::string& name, const std::string& desc, const bool value = 0); + BoolOption(const std::string& name, const std::string& desc, const bool value = 0); + virtual ~BoolOption(); + + bool getValue() const; + virtual bool needsValue() const; + virtual void setValue(const std::string& value); + virtual void setValue(bool _value) { value = _value; } + }; + + BoolOption trace; + IntOption port; + IntOption workerThreads; + IntOption maxConnections; + IntOption connectionBacklog; + StringOption store; + BoolOption help; + + typedef std::vector<Option*>::iterator op_iterator; + std::vector<Option*> options; + + public: + class ParseException : public Exception { + public: + ParseException(const std::string& msg) : Exception(msg) {} + }; + + + Configuration(); + ~Configuration(); + + void parse(int argc, char** argv); + + bool isHelp() const; + bool isTrace() const; + int getPort() const; + int getWorkerThreads() const; + int getMaxConnections() const; + int getConnectionBacklog() const; + const std::string& getStore() const; + + void setHelp(bool b) { help.setValue(b); } + void setTrace(bool b) { trace.setValue(b); } + void setPort(int i) { port.setValue(i); } + void setWorkerThreads(int i) { workerThreads.setValue(i); } + void setMaxConnections(int i) { maxConnections.setValue(i); } + void setConnectionBacklog(int i) { connectionBacklog.setValue(i); } + void setStore(const std::string& s) { store.setValue(s); } + + void usage(); + }; + } +} + + +#endif diff --git a/cpp/lib/broker/ConnectionToken.h b/cpp/lib/broker/ConnectionToken.h new file mode 100644 index 0000000000..7e7f813d0e --- /dev/null +++ b/cpp/lib/broker/ConnectionToken.h @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ConnectionToken_ +#define _ConnectionToken_ + +namespace qpid { + namespace broker { + /** + * An empty interface allowing opaque implementations of some + * form of token to identify a connection. + */ + class ConnectionToken{ + public: + virtual ~ConnectionToken(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/Consumer.h b/cpp/lib/broker/Consumer.h new file mode 100644 index 0000000000..26deef4a26 --- /dev/null +++ b/cpp/lib/broker/Consumer.h @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Consumer_ +#define _Consumer_ + +#include <BrokerMessage.h> + +namespace qpid { + namespace broker { + class Consumer{ + public: + virtual bool deliver(Message::shared_ptr& msg) = 0; + virtual ~Consumer(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/Content.h b/cpp/lib/broker/Content.h new file mode 100644 index 0000000000..ed425c6735 --- /dev/null +++ b/cpp/lib/broker/Content.h @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Content_ +#define _Content_ + +#include <AMQContentBody.h> +#include <Buffer.h> +#include <OutputHandler.h> + +namespace qpid { + namespace broker { + class Content{ + public: + virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0; + virtual u_int32_t size() = 0; + virtual void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0; + virtual void encode(qpid::framing::Buffer& buffer) = 0; + virtual ~Content(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/DeletingTxOp.cpp b/cpp/lib/broker/DeletingTxOp.cpp new file mode 100644 index 0000000000..25fe9c98db --- /dev/null +++ b/cpp/lib/broker/DeletingTxOp.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <DeletingTxOp.h> + +using namespace qpid::broker; + +DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){} + +bool DeletingTxOp::prepare(TransactionContext* ctxt) throw(){ + return delegate && delegate->prepare(ctxt); +} + +void DeletingTxOp::commit() throw(){ + if(delegate){ + delegate->commit(); + delete delegate; + delegate = 0; + } +} + +void DeletingTxOp::rollback() throw(){ + if(delegate){ + delegate->rollback(); + delete delegate; + delegate = 0; + } +} diff --git a/cpp/lib/broker/DeletingTxOp.h b/cpp/lib/broker/DeletingTxOp.h new file mode 100644 index 0000000000..3e026cd4ca --- /dev/null +++ b/cpp/lib/broker/DeletingTxOp.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DeletingTxOp_ +#define _DeletingTxOp_ + +#include <TxOp.h> + +namespace qpid { + namespace broker { + /** + * TxOp wrapper that will delegate calls & delete the object + * to which it delegates after completion of the transaction. + */ + class DeletingTxOp : public virtual TxOp{ + TxOp* delegate; + public: + DeletingTxOp(TxOp* const delegate); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~DeletingTxOp(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/Deliverable.h b/cpp/lib/broker/Deliverable.h new file mode 100644 index 0000000000..e33443555d --- /dev/null +++ b/cpp/lib/broker/Deliverable.h @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Deliverable_ +#define _Deliverable_ + +#include <BrokerQueue.h> + +namespace qpid { + namespace broker { + class Deliverable{ + public: + virtual void deliverTo(Queue::shared_ptr& queue) = 0; + virtual ~Deliverable(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/DeliverableMessage.cpp b/cpp/lib/broker/DeliverableMessage.cpp new file mode 100644 index 0000000000..b9c89da690 --- /dev/null +++ b/cpp/lib/broker/DeliverableMessage.cpp @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <DeliverableMessage.h> + +using namespace qpid::broker; + +DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg) +{ +} + +void DeliverableMessage::deliverTo(Queue::shared_ptr& queue) +{ + queue->deliver(msg); +} + diff --git a/cpp/lib/broker/DeliverableMessage.h b/cpp/lib/broker/DeliverableMessage.h new file mode 100644 index 0000000000..962f0da640 --- /dev/null +++ b/cpp/lib/broker/DeliverableMessage.h @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DeliverableMessage_ +#define _DeliverableMessage_ + +#include <Deliverable.h> +#include <BrokerMessage.h> +#include <BrokerQueue.h> + +namespace qpid { + namespace broker { + class DeliverableMessage : public Deliverable{ + Message::shared_ptr msg; + public: + DeliverableMessage(Message::shared_ptr& msg); + virtual void deliverTo(Queue::shared_ptr& queue); + virtual ~DeliverableMessage(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/DeliveryRecord.cpp b/cpp/lib/broker/DeliveryRecord.cpp new file mode 100644 index 0000000000..9d02cb615e --- /dev/null +++ b/cpp/lib/broker/DeliveryRecord.cpp @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <DeliveryRecord.h> +#include <BrokerChannel.h> + +using namespace qpid::broker; +using std::string; + +DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, + Queue::shared_ptr _queue, + const string _consumerTag, + const u_int64_t _deliveryTag) : msg(_msg), + queue(_queue), + consumerTag(_consumerTag), + deliveryTag(_deliveryTag), + pull(false){} + +DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, + Queue::shared_ptr _queue, + const u_int64_t _deliveryTag) : msg(_msg), + queue(_queue), + consumerTag(""), + deliveryTag(_deliveryTag), + pull(true){} + + +void DeliveryRecord::discard(TransactionContext* ctxt) const{ + queue->dequeue(ctxt, msg, 0); +} + +bool DeliveryRecord::matches(u_int64_t tag) const{ + return deliveryTag == tag; +} + +bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{ + return range->covers(deliveryTag); +} + +void DeliveryRecord::redeliver(Channel* const channel) const{ + if(pull){ + //if message was originally sent as response to get, we must requeue it + requeue(); + }else{ + channel->deliver(msg, consumerTag, deliveryTag); + } +} + +void DeliveryRecord::requeue() const{ + msg->redeliver(); + queue->process(msg); +} + +void DeliveryRecord::addTo(Prefetch* const prefetch) const{ + if(!pull){ + //ignore 'pulled' messages (i.e. those that were sent in + //response to get) when calculating prefetch + prefetch->size += msg->contentSize(); + prefetch->count++; + } +} + +void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{ + if(!pull){ + //ignore 'pulled' messages (i.e. those that were sent in + //response to get) when calculating prefetch + prefetch->size -= msg->contentSize(); + prefetch->count--; + } +} diff --git a/cpp/lib/broker/DeliveryRecord.h b/cpp/lib/broker/DeliveryRecord.h new file mode 100644 index 0000000000..c1c8d6d13c --- /dev/null +++ b/cpp/lib/broker/DeliveryRecord.h @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DeliveryRecord_ +#define _DeliveryRecord_ + +#include <algorithm> +#include <list> +#include <AccumulatedAck.h> +#include <BrokerMessage.h> +#include <Prefetch.h> +#include <BrokerQueue.h> + +namespace qpid { + namespace broker { + class Channel; + + /** + * Record of a delivery for which an ack is outstanding. + */ + class DeliveryRecord{ + mutable Message::shared_ptr msg; + mutable Queue::shared_ptr queue; + std::string consumerTag; + u_int64_t deliveryTag; + bool pull; + + public: + DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const u_int64_t deliveryTag); + DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const u_int64_t deliveryTag); + + void discard(TransactionContext* ctxt = 0) const; + bool matches(u_int64_t tag) const; + bool coveredBy(const AccumulatedAck* const range) const; + void requeue() const; + void redeliver(Channel* const) const; + void addTo(Prefetch* const prefetch) const; + void subtractFrom(Prefetch* const prefetch) const; + }; + + typedef std::list<DeliveryRecord>::iterator ack_iterator; + } +} + + +#endif diff --git a/cpp/lib/broker/DirectExchange.cpp b/cpp/lib/broker/DirectExchange.cpp new file mode 100644 index 0000000000..c898ae8d7e --- /dev/null +++ b/cpp/lib/broker/DirectExchange.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <DirectExchange.h> +#include <ExchangeBinding.h> +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +DirectExchange::DirectExchange(const string& _name) : Exchange(_name) { + +} + +void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ + Mutex::ScopedLock l(lock); + std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); + if(i == queues.end()){ + bindings[routingKey].push_back(queue); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); + } +} + +void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ + Mutex::ScopedLock l(lock); + std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + + std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); + if(i < queues.end()){ + queues.erase(i); + if(queues.empty()){ + bindings.erase(routingKey); + } + } +} + +void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ + Mutex::ScopedLock l(lock); + std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); + int count(0); + for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){ + msg.deliverTo(*i); + } + if(!count){ + std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl; + } +} + +DirectExchange::~DirectExchange(){ + +} + + +const std::string DirectExchange::typeName("direct"); diff --git a/cpp/lib/broker/DirectExchange.h b/cpp/lib/broker/DirectExchange.h new file mode 100644 index 0000000000..a7ef5aca9e --- /dev/null +++ b/cpp/lib/broker/DirectExchange.h @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DirectExchange_ +#define _DirectExchange_ + +#include <map> +#include <vector> +#include <BrokerExchange.h> +#include <FieldTable.h> +#include <BrokerMessage.h> +#include <sys/Monitor.h> +#include <BrokerQueue.h> + +namespace qpid { +namespace broker { + class DirectExchange : public virtual Exchange{ + std::map<string, std::vector<Queue::shared_ptr> > bindings; + qpid::sys::Mutex lock; + + public: + static const std::string typeName; + + DirectExchange(const std::string& name); + + virtual std::string getType(){ return typeName; } + + virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual ~DirectExchange(); + }; +} +} + + +#endif diff --git a/cpp/lib/broker/ExchangeBinding.cpp b/cpp/lib/broker/ExchangeBinding.cpp new file mode 100644 index 0000000000..bf2102414d --- /dev/null +++ b/cpp/lib/broker/ExchangeBinding.cpp @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <ExchangeBinding.h> +#include <BrokerExchange.h> + +using namespace qpid::broker; +using namespace qpid::framing; + +ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){} + +void ExchangeBinding::cancel(){ + e->unbind(q, key, args); + delete this; +} + +ExchangeBinding::~ExchangeBinding(){ +} diff --git a/cpp/lib/broker/ExchangeBinding.h b/cpp/lib/broker/ExchangeBinding.h new file mode 100644 index 0000000000..2afaa89552 --- /dev/null +++ b/cpp/lib/broker/ExchangeBinding.h @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ExchangeBinding_ +#define _ExchangeBinding_ + +#include <Binding.h> +#include <FieldTable.h> +#include <BrokerQueue.h> + +namespace qpid { + namespace broker { + class Exchange; + class Queue; + + class ExchangeBinding : public virtual Binding{ + Exchange* e; + Queue::shared_ptr q; + const string key; + const qpid::framing::FieldTable* args; + public: + ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const qpid::framing::FieldTable* _args); + virtual void cancel(); + virtual ~ExchangeBinding(); + }; + } +} + + +#endif + diff --git a/cpp/lib/broker/ExchangeRegistry.cpp b/cpp/lib/broker/ExchangeRegistry.cpp new file mode 100644 index 0000000000..7bf96c4544 --- /dev/null +++ b/cpp/lib/broker/ExchangeRegistry.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <ExchangeRegistry.h> +#include <DirectExchange.h> +#include <FanOutExchange.h> +#include <HeadersExchange.h> +#include <TopicExchange.h> + +using namespace qpid::broker; +using namespace qpid::sys; +using std::pair; + +pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){ + Mutex::ScopedLock locker(lock); + ExchangeMap::iterator i = exchanges.find(name); + if (i == exchanges.end()) { + Exchange::shared_ptr exchange; + + if(type == TopicExchange::typeName){ + exchange = Exchange::shared_ptr(new TopicExchange(name)); + }else if(type == DirectExchange::typeName){ + exchange = Exchange::shared_ptr(new DirectExchange(name)); + }else if(type == FanOutExchange::typeName){ + exchange = Exchange::shared_ptr(new FanOutExchange(name)); + }else if (type == HeadersExchange::typeName) { + exchange = Exchange::shared_ptr(new HeadersExchange(name)); + }else{ + throw UnknownExchangeTypeException(); + } + exchanges[name] = exchange; + return std::pair<Exchange::shared_ptr, bool>(exchange, true); + } else { + return std::pair<Exchange::shared_ptr, bool>(i->second, false); + } +} + +void ExchangeRegistry::destroy(const string& name){ + Mutex::ScopedLock locker(lock); + exchanges.erase(name); +} + +Exchange::shared_ptr ExchangeRegistry::get(const string& name){ + Mutex::ScopedLock locker(lock); + return exchanges[name]; +} + +namespace +{ +const std::string empty; +} + +Exchange::shared_ptr ExchangeRegistry::getDefault() +{ + return get(empty); +} diff --git a/cpp/lib/broker/ExchangeRegistry.h b/cpp/lib/broker/ExchangeRegistry.h new file mode 100644 index 0000000000..8dcd0d3623 --- /dev/null +++ b/cpp/lib/broker/ExchangeRegistry.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ExchangeRegistry_ +#define _ExchangeRegistry_ + +#include <map> +#include <BrokerExchange.h> +#include <sys/Monitor.h> + +namespace qpid { +namespace broker { + struct UnknownExchangeTypeException{}; + + class ExchangeRegistry{ + typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap; + ExchangeMap exchanges; + qpid::sys::Mutex lock; + public: + std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException); + void destroy(const std::string& name); + Exchange::shared_ptr get(const std::string& name); + Exchange::shared_ptr getDefault(); + }; +} +} + + +#endif diff --git a/cpp/lib/broker/FanOutExchange.cpp b/cpp/lib/broker/FanOutExchange.cpp new file mode 100644 index 0000000000..48afcc20d5 --- /dev/null +++ b/cpp/lib/broker/FanOutExchange.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <FanOutExchange.h> +#include <ExchangeBinding.h> +#include <algorithm> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} + +void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ + Mutex::ScopedLock locker(lock); + // Add if not already present. + Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); + if (i == bindings.end()) { + bindings.push_back(queue); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); + } +} + +void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ + Mutex::ScopedLock locker(lock); + Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); + if (i != bindings.end()) { + bindings.erase(i); + // TODO aconway 2006-09-14: What about the ExchangeBinding object? + // Don't we have to verify routingKey/args match? + } +} + +void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ + Mutex::ScopedLock locker(lock); + for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ + msg.deliverTo(*i); + } +} + +FanOutExchange::~FanOutExchange() {} + +const std::string FanOutExchange::typeName("fanout"); diff --git a/cpp/lib/broker/FanOutExchange.h b/cpp/lib/broker/FanOutExchange.h new file mode 100644 index 0000000000..6dc70e69bb --- /dev/null +++ b/cpp/lib/broker/FanOutExchange.h @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _FanOutExchange_ +#define _FanOutExchange_ + +#include <map> +#include <vector> +#include <BrokerExchange.h> +#include <FieldTable.h> +#include <BrokerMessage.h> +#include <sys/Monitor.h> +#include <BrokerQueue.h> + +namespace qpid { +namespace broker { + +class FanOutExchange : public virtual Exchange { + std::vector<Queue::shared_ptr> bindings; + qpid::sys::Mutex lock; + + public: + static const std::string typeName; + + FanOutExchange(const std::string& name); + + virtual std::string getType(){ return typeName; } + + virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + + virtual ~FanOutExchange(); +}; + +} +} + + + +#endif diff --git a/cpp/lib/broker/HeadersExchange.cpp b/cpp/lib/broker/HeadersExchange.cpp new file mode 100644 index 0000000000..acd344725a --- /dev/null +++ b/cpp/lib/broker/HeadersExchange.cpp @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <HeadersExchange.h> +#include <ExchangeBinding.h> +#include <Value.h> +#include <QpidError.h> +#include <algorithm> + + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +// TODO aconway 2006-09-20: More efficient matching algorithm. +// The current search algorithm really sucks. +// Fieldtables are heavy, maybe use shared_ptr to do handle-body. + +using namespace qpid::broker; + +namespace { + const std::string all("all"); + const std::string any("any"); + const std::string x_match("x-match"); +} + +HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { } + +void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ + Mutex::ScopedLock locker(lock); + std::string what = args->getString("x-match"); + if (what != all && what != any) { + THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange."); + } + bindings.push_back(Binding(*args, queue)); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); +} + +void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ + Mutex::ScopedLock locker(lock); + Bindings::iterator i = + std::find(bindings.begin(),bindings.end(), Binding(*args, queue)); + if (i != bindings.end()) bindings.erase(i); +} + + +void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ + Mutex::ScopedLock locker(lock);; + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (match(i->first, *args)) msg.deliverTo(i->second); + } +} + +HeadersExchange::~HeadersExchange() {} + +const std::string HeadersExchange::typeName("headers"); + +namespace +{ + + bool match_values(const Value& bind, const Value& msg) { + return dynamic_cast<const EmptyValue*>(&bind) || bind == msg; + } + +} + + +bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) { + typedef FieldTable::ValueMap Map; + std::string what = bind.getString(x_match); + if (what == all) { + for (Map::const_iterator i = bind.getMap().begin(); + i != bind.getMap().end(); + ++i) + { + if (i->first != x_match) + { + Map::const_iterator j = msg.getMap().find(i->first); + if (j == msg.getMap().end()) return false; + if (!match_values(*(i->second), *(j->second))) return false; + } + } + return true; + } else if (what == any) { + for (Map::const_iterator i = bind.getMap().begin(); + i != bind.getMap().end(); + ++i) + { + if (i->first != x_match) + { + Map::const_iterator j = msg.getMap().find(i->first); + if (j != msg.getMap().end()) { + if (match_values(*(i->second), *(j->second))) return true; + } + } + } + return false; + } else { + return false; + } +} + + + diff --git a/cpp/lib/broker/HeadersExchange.h b/cpp/lib/broker/HeadersExchange.h new file mode 100644 index 0000000000..5e8da5ad85 --- /dev/null +++ b/cpp/lib/broker/HeadersExchange.h @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _HeadersExchange_ +#define _HeadersExchange_ + +#include <vector> +#include <BrokerExchange.h> +#include <FieldTable.h> +#include <BrokerMessage.h> +#include <sys/Monitor.h> +#include <BrokerQueue.h> + +namespace qpid { +namespace broker { + + +class HeadersExchange : public virtual Exchange { + typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding; + typedef std::vector<Binding> Bindings; + + Bindings bindings; + qpid::sys::Mutex lock; + + public: + static const std::string typeName; + + HeadersExchange(const string& name); + + virtual std::string getType(){ return typeName; } + + virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + + virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); + + virtual ~HeadersExchange(); + + static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs); +}; + + + +} +} + +#endif diff --git a/cpp/lib/broker/InMemoryContent.cpp b/cpp/lib/broker/InMemoryContent.cpp new file mode 100644 index 0000000000..9d40877c86 --- /dev/null +++ b/cpp/lib/broker/InMemoryContent.cpp @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <InMemoryContent.h> + +using namespace qpid::broker; +using namespace qpid::framing; +using boost::static_pointer_cast; + +void InMemoryContent::add(AMQContentBody::shared_ptr data) +{ + content.push_back(data); +} + +u_int32_t InMemoryContent::size() +{ + int sum(0); + for (content_iterator i = content.begin(); i != content.end(); i++) { + sum += (*i)->size() + 8;//8 extra bytes for the frame + //TODO: have to get rid of the frame stuff from encoded data + } + return sum; +} + +void InMemoryContent::send(OutputHandler* out, int channel, u_int32_t framesize) +{ + for (content_iterator i = content.begin(); i != content.end(); i++) { + if ((*i)->size() > framesize) { + u_int32_t offset = 0; + for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) { + string data = (*i)->getData().substr(offset, framesize); + out->send(new AMQFrame(channel, new AMQContentBody(data))); + offset += framesize; + } + u_int32_t remainder = (*i)->size() % framesize; + if (remainder) { + string data = (*i)->getData().substr(offset, remainder); + out->send(new AMQFrame(channel, new AMQContentBody(data))); + } + } else { + AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i); + out->send(new AMQFrame(channel, contentBody)); + } + } +} + +void InMemoryContent::encode(Buffer& buffer) +{ + for (content_iterator i = content.begin(); i != content.end(); i++) { + (*i)->encode(buffer); + } +} diff --git a/cpp/lib/broker/InMemoryContent.h b/cpp/lib/broker/InMemoryContent.h new file mode 100644 index 0000000000..c54d15447d --- /dev/null +++ b/cpp/lib/broker/InMemoryContent.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _InMemoryContent_ +#define _InMemoryContent_ + +#include <Content.h> +#include <vector> + +namespace qpid { + namespace broker { + class InMemoryContent : public Content{ + typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list; + typedef content_list::iterator content_iterator; + + content_list content; + public: + void add(qpid::framing::AMQContentBody::shared_ptr data); + u_int32_t size(); + void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); + void encode(qpid::framing::Buffer& buffer); + ~InMemoryContent(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/LazyLoadedContent.cpp b/cpp/lib/broker/LazyLoadedContent.cpp new file mode 100644 index 0000000000..c0da48efda --- /dev/null +++ b/cpp/lib/broker/LazyLoadedContent.cpp @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <LazyLoadedContent.h> + +using namespace qpid::broker; +using namespace qpid::framing; + +LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, u_int64_t _expectedSize) : + store(_store), msg(_msg), expectedSize(_expectedSize) {} + +void LazyLoadedContent::add(AMQContentBody::shared_ptr data) +{ + store->appendContent(msg, data->getData()); +} + +u_int32_t LazyLoadedContent::size() +{ + return 0;//all content is written as soon as it is added +} + +void LazyLoadedContent::send(OutputHandler* out, int channel, u_int32_t framesize) +{ + if (expectedSize > framesize) { + for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) { + u_int64_t remaining = expectedSize - offset; + string data; + store->loadContent(msg, data, offset, remaining > framesize ? framesize : remaining); + out->send(new AMQFrame(channel, new AMQContentBody(data))); + } + } else { + string data; + store->loadContent(msg, data, 0, expectedSize); + out->send(new AMQFrame(channel, new AMQContentBody(data))); + } +} + +void LazyLoadedContent::encode(Buffer&) +{ + //do nothing as all content is written as soon as it is added +} diff --git a/cpp/lib/broker/LazyLoadedContent.h b/cpp/lib/broker/LazyLoadedContent.h new file mode 100644 index 0000000000..fdb752f117 --- /dev/null +++ b/cpp/lib/broker/LazyLoadedContent.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _LazyLoadedContent_ +#define _LazyLoadedContent_ + +#include <Content.h> +#include <MessageStore.h> + +namespace qpid { + namespace broker { + class LazyLoadedContent : public Content{ + MessageStore* const store; + Message* const msg; + const u_int64_t expectedSize; + public: + LazyLoadedContent(MessageStore* const store, Message* const msg, u_int64_t expectedSize); + void add(qpid::framing::AMQContentBody::shared_ptr data); + u_int32_t size(); + void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); + void encode(qpid::framing::Buffer& buffer); + ~LazyLoadedContent(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am new file mode 100644 index 0000000000..e99318087f --- /dev/null +++ b/cpp/lib/broker/Makefile.am @@ -0,0 +1,78 @@ +AM_CXXFLAGS = $(WARNING_CFLAGS) +INCLUDES = \ + -I$(top_srcdir)/gen \ + -I$(top_srcdir)/lib/common \ + -I$(top_srcdir)/lib/common/sys \ + -I$(top_srcdir)/lib/common/framing + +lib_LTLIBRARIES = libbroker.la +libbroker_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG) +libbroker_la_SOURCES = \ + AccumulatedAck.cpp \ + AccumulatedAck.h \ + AutoDelete.cpp \ + AutoDelete.h \ + Binding.h \ + Broker.cpp \ + Broker.h \ + BrokerChannel.cpp \ + BrokerChannel.h \ + BrokerExchange.h \ + BrokerMessage.cpp \ + BrokerMessage.h \ + BrokerQueue.cpp \ + BrokerQueue.h \ + Configuration.cpp \ + Configuration.h \ + ConnectionToken.h \ + Consumer.h \ + Content.h \ + DeletingTxOp.cpp \ + DeletingTxOp.h \ + Deliverable.h \ + DeliverableMessage.cpp \ + DeliverableMessage.h \ + DeliveryRecord.cpp \ + DeliveryRecord.h \ + DirectExchange.cpp \ + DirectExchange.h \ + ExchangeBinding.cpp \ + ExchangeBinding.h \ + ExchangeRegistry.cpp \ + ExchangeRegistry.h \ + FanOutExchange.cpp \ + FanOutExchange.h \ + HeadersExchange.cpp \ + HeadersExchange.h \ + InMemoryContent.cpp \ + InMemoryContent.h \ + LazyLoadedContent.cpp \ + LazyLoadedContent.h \ + MessageBuilder.cpp \ + MessageBuilder.h \ + MessageStore.h \ + MessageStoreModule.cpp \ + MessageStoreModule.h \ + NameGenerator.cpp \ + NameGenerator.h \ + NullMessageStore.cpp \ + NullMessageStore.h \ + Prefetch.h \ + QueueRegistry.cpp \ + QueueRegistry.h \ + RecoveryManager.cpp \ + RecoveryManager.h \ + SessionHandlerFactoryImpl.cpp \ + SessionHandlerFactoryImpl.h \ + SessionHandlerImpl.cpp \ + SessionHandlerImpl.h \ + TopicExchange.cpp \ + TopicExchange.h \ + TransactionalStore.h \ + TxAck.cpp \ + TxAck.h \ + TxBuffer.cpp \ + TxBuffer.h \ + TxOp.h \ + TxPublish.cpp \ + TxPublish.h diff --git a/cpp/lib/broker/MessageBuilder.cpp b/cpp/lib/broker/MessageBuilder.cpp new file mode 100644 index 0000000000..7f009d5cdf --- /dev/null +++ b/cpp/lib/broker/MessageBuilder.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <MessageBuilder.h> + +#include <InMemoryContent.h> +#include <LazyLoadedContent.h> + +using namespace qpid::broker; +using namespace qpid::framing; +using std::auto_ptr; + +MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) : + handler(_handler), + store(_store), + stagingThreshold(_stagingThreshold) +{} + +void MessageBuilder::route(){ + if (message->isComplete()) { + if (handler) handler->complete(message); + message.reset(); + } +} + +void MessageBuilder::initialise(Message::shared_ptr& msg){ + if(message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); + } + message = msg; +} + +void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){ + if(!message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish."); + } + message->setHeader(header); + if (stagingThreshold && header->getContentSize() >= stagingThreshold) { + store->stage(message); + message->releaseContent(store); + } else { + auto_ptr<Content> content(new InMemoryContent()); + message->setContent(content); + } + route(); +} + +void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){ + if(!message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish."); + } + message->addContent(content); + route(); +} diff --git a/cpp/lib/broker/MessageBuilder.h b/cpp/lib/broker/MessageBuilder.h new file mode 100644 index 0000000000..4e51f223f0 --- /dev/null +++ b/cpp/lib/broker/MessageBuilder.h @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _MessageBuilder_ +#define _MessageBuilder_ + +#include <memory> +#include <QpidError.h> +#include <BrokerExchange.h> +#include <BrokerMessage.h> +#include <MessageStore.h> +#include <AMQContentBody.h> +#include <AMQHeaderBody.h> +#include <BasicPublishBody.h> + +namespace qpid { + namespace broker { + class MessageBuilder{ + public: + class CompletionHandler{ + public: + virtual void complete(Message::shared_ptr&) = 0; + virtual ~CompletionHandler(){} + }; + MessageBuilder(CompletionHandler* _handler, MessageStore* const store = 0, u_int64_t stagingThreshold = 0); + void initialise(Message::shared_ptr& msg); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header); + void addContent(qpid::framing::AMQContentBody::shared_ptr& content); + private: + Message::shared_ptr message; + CompletionHandler* handler; + MessageStore* const store; + const u_int64_t stagingThreshold; + + void route(); + }; + } +} + + +#endif diff --git a/cpp/lib/broker/MessageStore.h b/cpp/lib/broker/MessageStore.h new file mode 100644 index 0000000000..ac74155e64 --- /dev/null +++ b/cpp/lib/broker/MessageStore.h @@ -0,0 +1,132 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _MessageStore_ +#define _MessageStore_ + +#include <BrokerMessage.h> +#include <RecoveryManager.h> +#include <TransactionalStore.h> + +namespace qpid { + namespace broker { + struct MessageStoreSettings + { + /** + * Messages whose content length is larger than this value + * will be staged (i.e. will have thier data written to + * disk as it arrives) and will load their data lazily. On + * recovery therefore, only the headers should be loaded. + */ + u_int64_t stagingThreshold; + }; + /** + * An abstraction of the persistent storage for messages. + */ + class MessageStore : public TransactionalStore{ + public: + /** + * Record the existance of a durable queue + */ + virtual void create(const Queue& queue) = 0; + /** + * Destroy a durable queue + */ + virtual void destroy(const Queue& queue) = 0; + + /** + * Request recovery of queue and message state from store + */ + virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0; + + /** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ + virtual void stage(Message::shared_ptr& msg) = 0; + + /** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ + virtual void destroy(Message::shared_ptr& msg) = 0; + + /** + * Appends content to a previously staged message + */ + virtual void appendContent(Message* const msg, const std::string& data) = 0; + + /** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ + virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length) = 0; + + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid) = 0; + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * @param msg the message to dequeue + * @param queue the name of th queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid) = 0; + /** + * Treat all enqueue/dequeues where this xid was specified as being committed. + */ + virtual void committed(const std::string * const xid) = 0; + /** + * Treat all enqueue/dequeues where this xid was specified as being aborted. + */ + virtual void aborted(const std::string * const xid) = 0; + + virtual ~MessageStore(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/MessageStoreModule.cpp b/cpp/lib/broker/MessageStoreModule.cpp new file mode 100644 index 0000000000..7b0335df68 --- /dev/null +++ b/cpp/lib/broker/MessageStoreModule.cpp @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <MessageStoreModule.h> +#include <iostream> + +using namespace qpid::broker; + +MessageStoreModule::MessageStoreModule(const std::string& name) : store(name) +{ +} + +void MessageStoreModule::create(const Queue& queue) +{ + store->create(queue); +} + +void MessageStoreModule::destroy(const Queue& queue) +{ + store->destroy(queue); +} + +void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSettings* const settings) +{ + store->recover(registry, settings); +} + +void MessageStoreModule::stage(Message::shared_ptr& msg) +{ + store->stage(msg); +} + +void MessageStoreModule::destroy(Message::shared_ptr& msg) +{ + store->destroy(msg); +} + +void MessageStoreModule::appendContent(Message* const msg, const std::string& data) +{ + store->appendContent(msg, data); +} + +void MessageStoreModule::loadContent(Message* const msg, string& data, u_int64_t offset, u_int32_t length) +{ + store->loadContent(msg, data, offset, length); +} + +void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) +{ + store->enqueue(ctxt, msg, queue, xid); +} + +void MessageStoreModule::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) +{ + store->dequeue(ctxt, msg, queue, xid); +} + +void MessageStoreModule::committed(const string * const xid) +{ + store->committed(xid); +} + +void MessageStoreModule::aborted(const string * const xid) +{ + store->aborted(xid); +} + +std::auto_ptr<TransactionContext> MessageStoreModule::begin() +{ + return store->begin(); +} + +void MessageStoreModule::commit(TransactionContext* ctxt) +{ + store->commit(ctxt); +} + +void MessageStoreModule::abort(TransactionContext* ctxt) +{ + store->abort(ctxt); +} diff --git a/cpp/lib/broker/MessageStoreModule.h b/cpp/lib/broker/MessageStoreModule.h new file mode 100644 index 0000000000..045abc3a1a --- /dev/null +++ b/cpp/lib/broker/MessageStoreModule.h @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _MessageStoreModule_ +#define _MessageStoreModule_ + +#include <BrokerMessage.h> +#include <MessageStore.h> +#include <BrokerQueue.h> +#include <RecoveryManager.h> +#include <sys/Module.h> + +namespace qpid { + namespace broker { + /** + * A null implementation of the MessageStore interface + */ + class MessageStoreModule : public MessageStore{ + qpid::sys::Module<MessageStore> store; + public: + MessageStoreModule(const std::string& name); + void create(const Queue& queue); + void destroy(const Queue& queue); + void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); + void stage(Message::shared_ptr& msg); + void destroy(Message::shared_ptr& msg); + void appendContent(Message* const msg, const std::string& data); + void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length); + void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + void committed(const string * const xid); + void aborted(const string * const xid); + std::auto_ptr<TransactionContext> begin(); + void commit(TransactionContext* ctxt); + void abort(TransactionContext* ctxt); + ~MessageStoreModule(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/NameGenerator.cpp b/cpp/lib/broker/NameGenerator.cpp new file mode 100644 index 0000000000..3f281859fa --- /dev/null +++ b/cpp/lib/broker/NameGenerator.cpp @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <NameGenerator.h> +#include <sstream> + +using namespace qpid::broker; + +NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {} + +std::string NameGenerator::generate(){ + std::stringstream ss; + ss << base << counter++; + return ss.str(); +} diff --git a/cpp/lib/broker/NameGenerator.h b/cpp/lib/broker/NameGenerator.h new file mode 100644 index 0000000000..b2dbbdfb69 --- /dev/null +++ b/cpp/lib/broker/NameGenerator.h @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _NameGenerator_ +#define _NameGenerator_ + +#include <BrokerMessage.h> + +namespace qpid { + namespace broker { + class NameGenerator{ + const std::string base; + unsigned int counter; + public: + NameGenerator(const std::string& base); + std::string generate(); + }; + } +} + + +#endif diff --git a/cpp/lib/broker/NullMessageStore.cpp b/cpp/lib/broker/NullMessageStore.cpp new file mode 100644 index 0000000000..57c297c063 --- /dev/null +++ b/cpp/lib/broker/NullMessageStore.cpp @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <NullMessageStore.h> + +#include <BrokerQueue.h> +#include <RecoveryManager.h> + +#include <iostream> + +using namespace qpid::broker; + +NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){} + +void NullMessageStore::create(const Queue& queue) +{ + if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; +} + +void NullMessageStore::destroy(const Queue& queue) +{ + if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; +} + +void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* const) +{ + if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl; +} + +void NullMessageStore::stage(Message::shared_ptr&) +{ + if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl; +} + +void NullMessageStore::destroy(Message::shared_ptr&) +{ + if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl; +} + +void NullMessageStore::appendContent(Message* const, const string&) +{ + if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl; +} + +void NullMessageStore::loadContent(Message* const, string&, u_int64_t, u_int32_t) +{ + if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl; +} + +void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const) +{ + if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl; +} + +void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const) +{ + if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl; +} + +void NullMessageStore::committed(const string * const) +{ + if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; +} + +void NullMessageStore::aborted(const string * const) +{ + if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; +} + +std::auto_ptr<TransactionContext> NullMessageStore::begin() +{ + return std::auto_ptr<TransactionContext>(); +} + +void NullMessageStore::commit(TransactionContext*) +{ +} + +void NullMessageStore::abort(TransactionContext*) +{ +} diff --git a/cpp/lib/broker/NullMessageStore.h b/cpp/lib/broker/NullMessageStore.h new file mode 100644 index 0000000000..e427cc723f --- /dev/null +++ b/cpp/lib/broker/NullMessageStore.h @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _NullMessageStore_ +#define _NullMessageStore_ + +#include <BrokerMessage.h> +#include <MessageStore.h> +#include <BrokerQueue.h> + +namespace qpid { + namespace broker { + + /** + * A null implementation of the MessageStore interface + */ + class NullMessageStore : public MessageStore{ + const bool warn; + public: + NullMessageStore(bool warn = true); + virtual void create(const Queue& queue); + virtual void destroy(const Queue& queue); + virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); + virtual void stage(Message::shared_ptr& msg); + virtual void destroy(Message::shared_ptr& msg); + virtual void appendContent(Message* const msg, const std::string& data); + virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length); + virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + virtual void committed(const string * const xid); + virtual void aborted(const string * const xid); + virtual std::auto_ptr<TransactionContext> begin(); + virtual void commit(TransactionContext* ctxt); + virtual void abort(TransactionContext* ctxt); + ~NullMessageStore(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/Prefetch.h b/cpp/lib/broker/Prefetch.h new file mode 100644 index 0000000000..a1adccaee7 --- /dev/null +++ b/cpp/lib/broker/Prefetch.h @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Prefetch_ +#define _Prefetch_ + +#include <amqp_types.h> + +namespace qpid { + namespace broker { + /** + * Count and total size of asynchronously delivered + * (i.e. pushed) messages that have acks outstanding. + */ + struct Prefetch{ + u_int32_t size; + u_int16_t count; + + void reset() { size = 0; count = 0; } + }; + } +} + + +#endif diff --git a/cpp/lib/broker/QueueRegistry.cpp b/cpp/lib/broker/QueueRegistry.cpp new file mode 100644 index 0000000000..304f696a7f --- /dev/null +++ b/cpp/lib/broker/QueueRegistry.cpp @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <QueueRegistry.h> +#include <SessionHandlerImpl.h> +#include <sstream> +#include <assert.h> + +using namespace qpid::broker; +using namespace qpid::sys; + +QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){} + +QueueRegistry::~QueueRegistry(){} + +std::pair<Queue::shared_ptr, bool> +QueueRegistry::declare(const string& declareName, bool durable, + u_int32_t autoDelete, const ConnectionToken* owner) +{ + Mutex::ScopedLock locker(lock); + string name = declareName.empty() ? generateName() : declareName; + assert(!name.empty()); + QueueMap::iterator i = queues.find(name); + if (i == queues.end()) { + Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner)); + queues[name] = queue; + return std::pair<Queue::shared_ptr, bool>(queue, true); + } else { + return std::pair<Queue::shared_ptr, bool>(i->second, false); + } +} + +void QueueRegistry::destroy(const string& name){ + Mutex::ScopedLock locker(lock); + queues.erase(name); +} + +Queue::shared_ptr QueueRegistry::find(const string& name){ + Mutex::ScopedLock locker(lock); + QueueMap::iterator i = queues.find(name); + if (i == queues.end()) { + return Queue::shared_ptr(); + } else { + return i->second; + } +} + +string QueueRegistry::generateName(){ + string name; + do { + std::stringstream ss; + ss << "tmp_" << counter++; + name = ss.str(); + // Thread safety: Private function, only called with lock held + // so this is OK. + } while(queues.find(name) != queues.end()); + return name; +} diff --git a/cpp/lib/broker/QueueRegistry.h b/cpp/lib/broker/QueueRegistry.h new file mode 100644 index 0000000000..bb9f2f4f26 --- /dev/null +++ b/cpp/lib/broker/QueueRegistry.h @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QueueRegistry_ +#define _QueueRegistry_ + +#include <map> +#include <sys/Monitor.h> +#include <BrokerQueue.h> + +namespace qpid { +namespace broker { + +/** + * A registry of queues indexed by queue name. + * + * Queues are reference counted using shared_ptr to ensure that they + * are deleted when and only when they are no longer in use. + * + */ +class QueueRegistry{ + + public: + QueueRegistry(MessageStore* const store = 0); + ~QueueRegistry(); + + /** + * Declare a queue. + * + * @return The queue and a boolean flag which is true if the queue + * was created by this declare call false if it already existed. + */ + std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, + const ConnectionToken* const owner = 0); + + /** + * Destroy the named queue. + * + * Note: if the queue is in use it is not actually destroyed until + * all shared_ptrs to it are destroyed. During that time it is + * possible that a new queue with the same name may be + * created. This should not create any problems as the new and + * old queues exist independently. The registry has + * forgotten the old queue so there can be no confusion for + * subsequent calls to find or declare with the same name. + * + */ + void destroy(const string& name); + + /** + * Find the named queue. Return 0 if not found. + */ + Queue::shared_ptr find(const string& name); + + /** + * Generate unique queue name. + */ + string generateName(); + + private: + typedef std::map<string, Queue::shared_ptr> QueueMap; + QueueMap queues; + qpid::sys::Mutex lock; + int counter; + MessageStore* const store; +}; + + +} +} + + +#endif diff --git a/cpp/lib/broker/RecoveryManager.cpp b/cpp/lib/broker/RecoveryManager.cpp new file mode 100644 index 0000000000..6ea4c00c65 --- /dev/null +++ b/cpp/lib/broker/RecoveryManager.cpp @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <RecoveryManager.h> + +using namespace qpid::broker; + +RecoveryManager::RecoveryManager(QueueRegistry& _queues, ExchangeRegistry& _exchanges) : queues(_queues), exchanges(_exchanges) {} + +RecoveryManager::~RecoveryManager() {} + +Queue::shared_ptr RecoveryManager::recoverQueue(const string& name) +{ + std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true); + Exchange::shared_ptr exchange = exchanges.getDefault(); + if (exchange) { + exchange->bind(result.first, result.first->getName(), 0); + } + return result.first; +} + +Exchange::shared_ptr RecoveryManager::recoverExchange(const string& name, const string& type) +{ + return exchanges.declare(name, type).first; +} diff --git a/cpp/lib/broker/RecoveryManager.h b/cpp/lib/broker/RecoveryManager.h new file mode 100644 index 0000000000..d4e4cff3fd --- /dev/null +++ b/cpp/lib/broker/RecoveryManager.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _RecoveryManager_ +#define _RecoveryManager_ + +#include <ExchangeRegistry.h> +#include <QueueRegistry.h> + +namespace qpid { +namespace broker { + + class RecoveryManager{ + QueueRegistry& queues; + ExchangeRegistry& exchanges; + public: + RecoveryManager(QueueRegistry& queues, ExchangeRegistry& exchanges); + ~RecoveryManager(); + Queue::shared_ptr recoverQueue(const std::string& name); + Exchange::shared_ptr recoverExchange(const std::string& name, const std::string& type); + }; + + +} +} + + +#endif diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp new file mode 100644 index 0000000000..2cc09a67e0 --- /dev/null +++ b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <SessionHandlerFactoryImpl.h> + +#include <DirectExchange.h> +#include <FanOutExchange.h> +#include <HeadersExchange.h> +#include <MessageStoreModule.h> +#include <NullMessageStore.h> +#include <SessionHandlerImpl.h> + +using namespace qpid::broker; +using namespace qpid::sys; + +namespace +{ +const std::string empty; +const std::string amq_direct("amq.direct"); +const std::string amq_topic("amq.topic"); +const std::string amq_fanout("amq.fanout"); +const std::string amq_match("amq.match"); +} + +SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int32_t _timeout) : + store(_store.empty() ? (MessageStore*) new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)), + queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10) +{ + exchanges.declare(empty, DirectExchange::typeName); // Default exchange. + exchanges.declare(amq_direct, DirectExchange::typeName); + exchanges.declare(amq_topic, TopicExchange::typeName); + exchanges.declare(amq_fanout, FanOutExchange::typeName); + exchanges.declare(amq_match, HeadersExchange::typeName); + + if(store.get()) { + RecoveryManager recoverer(queues, exchanges); + store->recover(recoverer); + } + + cleaner.start(); +} + +SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt) +{ + return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout); +} + +SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl() +{ + cleaner.stop(); +} diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.h b/cpp/lib/broker/SessionHandlerFactoryImpl.h new file mode 100644 index 0000000000..73ae879a58 --- /dev/null +++ b/cpp/lib/broker/SessionHandlerFactoryImpl.h @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SessionHandlerFactoryImpl_ +#define _SessionHandlerFactoryImpl_ + +#include <AutoDelete.h> +#include <ExchangeRegistry.h> +#include <MessageStore.h> +#include <QueueRegistry.h> +#include <AMQFrame.h> +#include <ProtocolInitiation.h> +#include <sys/SessionContext.h> +#include <sys/SessionHandler.h> +#include <sys/SessionHandlerFactory.h> +#include <sys/TimeoutHandler.h> +#include <memory> + +namespace qpid { + namespace broker { + + class SessionHandlerFactoryImpl : public virtual qpid::sys::SessionHandlerFactory + { + std::auto_ptr<MessageStore> store; + QueueRegistry queues; + ExchangeRegistry exchanges; + const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + AutoDelete cleaner; + public: + SessionHandlerFactoryImpl(const std::string& store = "", u_int32_t timeout = 30000); + virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt); + virtual ~SessionHandlerFactoryImpl(); + }; + + } +} + + +#endif diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp new file mode 100644 index 0000000000..0dddd957fd --- /dev/null +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -0,0 +1,429 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <SessionHandlerImpl.h> +#include <FanOutExchange.h> +#include <HeadersExchange.h> +#include <TopicExchange.h> +#include "assert.h" + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; +using namespace qpid::sys; + +SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, + QueueRegistry* _queues, + ExchangeRegistry* _exchanges, + AutoDelete* _cleaner, + const u_int32_t _timeout) : + context(_context), +// AMQP version management change - kpvdr 2006-11-17 +// TODO: Make this class version-aware and link these hard-wired numbers to that version + client(context, 8, 0), + queues(_queues), + exchanges(_exchanges), + cleaner(_cleaner), + timeout(_timeout), + basicHandler(new BasicHandlerImpl(this)), + channelHandler(new ChannelHandlerImpl(this)), + connectionHandler(new ConnectionHandlerImpl(this)), + exchangeHandler(new ExchangeHandlerImpl(this)), + queueHandler(new QueueHandlerImpl(this)), + txHandler(new TxHandlerImpl(this)), + framemax(65536), + heartbeat(0) {} + +SessionHandlerImpl::~SessionHandlerImpl(){} + +Channel* SessionHandlerImpl::getChannel(u_int16_t channel){ + channel_iterator i = channels.find(channel); + if(i == channels.end()){ + throw ConnectionException(504, "Unknown channel: " + channel); + } + return i->second; +} + +Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){ + Queue::shared_ptr queue; + if (name.empty()) { + queue = getChannel(channel)->getDefaultQueue(); + if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); + } else { + queue = queues->find(name); + if (queue == 0) { + throw ChannelException( 404, "Queue not found: " + name); + } + } + return queue; +} + + +Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){ + return exchanges->get(name); +} + +void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ + u_int16_t channel = frame->getChannel(); + AMQBody::shared_ptr body = frame->getBody(); + AMQMethodBody::shared_ptr method; + + switch(body->type()) + { + case METHOD_BODY: + method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body); + try{ + method->invoke(*this, channel); + }catch(ChannelException& e){ + channels[channel]->close(); + channels.erase(channel); + client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); + }catch(ConnectionException& e){ + client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + string error(e.what()); + client.getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId()); + } + break; + + case HEADER_BODY: + this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body)); + break; + + case CONTENT_BODY: + this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body)); + break; + + case HEARTBEAT_BODY: + //channel must be 0 + this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body)); + break; + } +} + +void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* /*header*/){ + //send connection start + FieldTable properties; + string mechanisms("PLAIN"); + string locales("en_US"); + client.getConnection().start(0, 8, 0, properties, mechanisms, locales); +} + +void SessionHandlerImpl::idleOut(){ + +} + +void SessionHandlerImpl::idleIn(){ + +} + +void SessionHandlerImpl::closed(){ + try { + for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){ + Channel* c = i->second; + channels.erase(i); + c->close(); + delete c; + } + for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ + string name = (*i)->getName(); + queues->destroy(name); + exclusiveQueues.erase(i); + } + } catch(std::exception& e) { + std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl; + } +} + +void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ + getChannel(channel)->handleHeader(body); +} + +void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ + getChannel(channel)->handleContent(body); +} + +void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ + std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl; +} + +void SessionHandlerImpl::ConnectionHandlerImpl::startOk( + u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, + const string& /*response*/, const string& /*locale*/){ + + parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat); +} + +void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} + +void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ + parent->framemax = framemax; + parent->heartbeat = heartbeat; +} + +void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ + string knownhosts; + parent->client.getConnection().openOk(0, knownhosts); +} + +void SessionHandlerImpl::ConnectionHandlerImpl::close( + u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, + u_int16_t /*classId*/, u_int16_t /*methodId*/) +{ + parent->client.getConnection().closeOk(0); + parent->context->close(); +} + +void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ + parent->context->close(); +} + + + +void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ + parent->channels[channel] = new Channel(parent->context, channel, parent->framemax); + parent->client.getChannel().openOk(channel); +} + +void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} +void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} + +void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, + u_int16_t /*classId*/, u_int16_t /*methodId*/){ + Channel* c = parent->getChannel(channel); + if(c){ + parent->channels.erase(channel); + c->close(); + delete c; + parent->client.getChannel().closeOk(channel); + } +} + +void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} + + + +void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, + bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& /*arguments*/){ + + if(passive){ + if(!parent->exchanges->get(exchange)){ + throw ChannelException(404, "Exchange not found: " + exchange); + } + }else{ + try{ + std::pair<Exchange::shared_ptr, bool> response = parent->exchanges->declare(exchange, type); + if(!response.second && response.first->getType() != type){ + throw ConnectionException(507, "Exchange already declared to be of type " + + response.first->getType() + ", requested " + type); + } + }catch(UnknownExchangeTypeException& e){ + throw ConnectionException(503, "Exchange type not implemented: " + type); + } + } + + if(!nowait){ + parent->client.getExchange().declareOk(channel); + } +} + +void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, + const string& exchange, bool /*ifUnused*/, bool nowait){ + //TODO: implement unused + parent->exchanges->destroy(exchange); + if(!nowait) parent->client.getExchange().deleteOk(channel); +} + +void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& /*arguments*/){ + Queue::shared_ptr queue; + if (passive && !name.empty()) { + queue = parent->getQueue(name, channel); + } else { + std::pair<Queue::shared_ptr, bool> queue_created = + parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0); + queue = queue_created.first; + assert(queue); + if (queue_created.second) { // This is a new queue + parent->getChannel(channel)->setDefaultQueue(queue); + + //create persistent record if required + queue_created.first->create(); + + //add default binding: + parent->exchanges->getDefault()->bind(queue, name, 0); + if (exclusive) { + parent->exclusiveQueues.push_back(queue); + } else if(autoDelete){ + parent->cleaner->add(queue); + } + } + } + if (exclusive && !queue->isExclusiveOwner(parent)) { + throw ChannelException(405, "Cannot grant exclusive access to queue"); + } + if (!nowait) { + string queueName = queue->getName(); + parent->client.getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); + } +} + +void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ + + Queue::shared_ptr queue = parent->getQueue(queueName, channel); + Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName); + if(exchange){ +// kpvdr - cannot use this any longer as routingKey is now const +// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); +// exchange->bind(queue, routingKey, &arguments); + string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; + exchange->bind(queue, exchangeRoutingKey, &arguments); + if(!nowait) parent->client.getQueue().bindOk(channel); + }else{ + throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); + } +} + +void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ + + Queue::shared_ptr queue = parent->getQueue(queueName, channel); + int count = queue->purge(); + if(!nowait) parent->client.getQueue().purgeOk(channel, count); +} + +void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ + ChannelException error(0, ""); + int count(0); + Queue::shared_ptr q = parent->getQueue(queue, channel); + if(ifEmpty && q->getMessageCount() > 0){ + throw ChannelException(406, "Queue not empty."); + }else if(ifUnused && q->getConsumerCount() > 0){ + throw ChannelException(406, "Queue in use."); + }else{ + //remove the queue from the list of exclusive queues if necessary + if(q->isExclusiveOwner(parent)){ + queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q); + if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); + } + count = q->getMessageCount(); + q->destroy(); + parent->queues->destroy(queue); + } + if(!nowait) parent->client.getQueue().deleteOk(channel, count); +} + + + + +void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ + //TODO: handle global + parent->getChannel(channel)->setPrefetchSize(prefetchSize); + parent->getChannel(channel)->setPrefetchCount(prefetchCount); + parent->client.getBasic().qosOk(channel); +} + +void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/, + const string& queueName, const string& consumerTag, + bool noLocal, bool noAck, bool exclusive, + bool nowait){ + + Queue::shared_ptr queue = parent->getQueue(queueName, channelId); + Channel* channel = parent->channels[channelId]; + if(!consumerTag.empty() && channel->exists(consumerTag)){ + throw ConnectionException(530, "Consumer tags must be unique"); + } + + try{ + string newTag = consumerTag; + channel->consume(newTag, queue, !noAck, exclusive, noLocal ? parent : 0); + if(!nowait) parent->client.getBasic().consumeOk(channelId, newTag); + + //allow messages to be dispatched if required as there is now a consumer: + queue->dispatch(); + }catch(ExclusiveAccessException& e){ + if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); + else throw ChannelException(403, "Access would violate previously granted exclusivity"); + } + +} + +void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ + parent->getChannel(channel)->cancel(consumerTag); + if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag); +} + +void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, + const string& exchangeName, const string& routingKey, + bool mandatory, bool immediate){ + + Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName); + if(exchange){ + Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); + parent->getChannel(channel)->handlePublish(msg, exchange); + }else{ + throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + } +} + +void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ + Queue::shared_ptr queue = parent->getQueue(queueName, channelId); + if(!parent->getChannel(channelId)->get(queue, !noAck)){ + string clusterId;//not used, part of an imatix hack + parent->client.getBasic().getEmpty(channelId, clusterId); + } +} + +void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ + try{ + parent->getChannel(channel)->ack(deliveryTag, multiple); + }catch(InvalidAckException& e){ + throw ConnectionException(530, "Received ack for unrecognised delivery tag"); + } +} + +void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} + +void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ + parent->getChannel(channel)->recover(requeue); +} + +void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){ + parent->getChannel(channel)->begin(); + parent->client.getTx().selectOk(channel); +} + +void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){ + parent->getChannel(channel)->commit(); + parent->client.getTx().commitOk(channel); +} + +void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){ + parent->getChannel(channel)->rollback(); + parent->client.getTx().rollbackOk(channel); + parent->getChannel(channel)->recover(false); +} + diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h new file mode 100644 index 0000000000..4b89dbeaa1 --- /dev/null +++ b/cpp/lib/broker/SessionHandlerImpl.h @@ -0,0 +1,268 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SessionHandlerImpl_ +#define _SessionHandlerImpl_ + +#include <map> +#include <sstream> +#include <vector> +#include <exception> +#include <AMQFrame.h> +#include <AMQP_ClientProxy.h> +#include <AMQP_ServerOperations.h> +#include <AutoDelete.h> +#include <ExchangeRegistry.h> +#include <BrokerChannel.h> +#include <ConnectionToken.h> +#include <DirectExchange.h> +#include <OutputHandler.h> +#include <ProtocolInitiation.h> +#include <QueueRegistry.h> +#include <sys/SessionContext.h> +#include <sys/SessionHandler.h> +#include <sys/TimeoutHandler.h> +#include <TopicExchange.h> + +namespace qpid { +namespace broker { + +struct ChannelException : public std::exception { + u_int16_t code; + string text; + ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {} + ~ChannelException() throw() {} + const char* what() const throw() { return text.c_str(); } +}; + +struct ConnectionException : public std::exception { + u_int16_t code; + string text; + ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {} + ~ConnectionException() throw() {} + const char* what() const throw() { return text.c_str(); } +}; + +class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, + public virtual qpid::framing::AMQP_ServerOperations, + public virtual ConnectionToken +{ + typedef std::map<u_int16_t, Channel*>::iterator channel_iterator; + typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + + qpid::sys::SessionContext* context; + qpid::framing::AMQP_ClientProxy client; + QueueRegistry* queues; + ExchangeRegistry* const exchanges; + AutoDelete* const cleaner; + const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + + std::auto_ptr<BasicHandler> basicHandler; + std::auto_ptr<ChannelHandler> channelHandler; + std::auto_ptr<ConnectionHandler> connectionHandler; + std::auto_ptr<ExchangeHandler> exchangeHandler; + std::auto_ptr<QueueHandler> queueHandler; + std::auto_ptr<TxHandler> txHandler; + + std::map<u_int16_t, Channel*> channels; + std::vector<Queue::shared_ptr> exclusiveQueues; + + u_int32_t framemax; + u_int16_t heartbeat; + + void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body); + void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body); + void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + + Channel* getChannel(u_int16_t channel); + /** + * Get named queue, never returns 0. + * @return: named queue or default queue for channel if name="" + * @exception: ChannelException if no queue of that name is found. + * @exception: ConnectionException if no queue specified and channel has not declared one. + */ + Queue::shared_ptr getQueue(const string& name, u_int16_t channel); + + Exchange::shared_ptr findExchange(const string& name); + + public: + SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues, + ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout); + virtual void received(qpid::framing::AMQFrame* frame); + virtual void initiated(qpid::framing::ProtocolInitiation* header); + virtual void idleOut(); + virtual void idleIn(); + virtual void closed(); + virtual ~SessionHandlerImpl(); + + class ConnectionHandlerImpl : public virtual ConnectionHandler{ + SessionHandlerImpl* parent; + public: + inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20 + virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism, + const string& response, const string& locale); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void secureOk(u_int16_t channel, const string& response); + + virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId, + u_int16_t methodId); + + virtual void closeOk(u_int16_t channel); + + virtual ~ConnectionHandlerImpl(){} + }; + + class ChannelHandlerImpl : public virtual ChannelHandler{ + SessionHandlerImpl* parent; + public: + inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void open(u_int16_t channel, const string& outOfBand); + + virtual void flow(u_int16_t channel, bool active); + + virtual void flowOk(u_int16_t channel, bool active); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, + u_int16_t classId, u_int16_t methodId); + + virtual void closeOk(u_int16_t channel); + + virtual ~ChannelHandlerImpl(){} + }; + + class ExchangeHandlerImpl : public virtual ExchangeHandler{ + SessionHandlerImpl* parent; + public: + inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20 + virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type, + bool passive, bool durable, bool autoDelete, bool internal, bool nowait, + const qpid::framing::FieldTable& arguments); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait); + + virtual ~ExchangeHandlerImpl(){} + }; + + + class QueueHandlerImpl : public virtual QueueHandler{ + SessionHandlerImpl* parent; + public: + inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20 + virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); + + // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20 + virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue, + const string& exchange, const string& routingKey, bool nowait, + const qpid::framing::FieldTable& arguments); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue, + bool nowait); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty, + bool nowait); + + virtual ~QueueHandlerImpl(){} + }; + + class BasicHandlerImpl : public virtual BasicHandler{ + SessionHandlerImpl* parent; + public: + inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void consume(u_int16_t channel, u_int16_t ticket, const string& queue, const string& consumerTag, + bool noLocal, bool noAck, bool exclusive, bool nowait); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey, + bool mandatory, bool immediate); + + // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 + virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck); + + virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); + + virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); + + virtual void recover(u_int16_t channel, bool requeue); + + virtual ~BasicHandlerImpl(){} + }; + + class TxHandlerImpl : public virtual TxHandler{ + SessionHandlerImpl* parent; + public: + TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + virtual ~TxHandlerImpl() {} + virtual void select(u_int16_t channel); + virtual void commit(u_int16_t channel); + virtual void rollback(u_int16_t channel); + }; + + + inline virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); } + inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); } + inline virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); } + inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); } + inline virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); } + inline virtual TxHandler* getTxHandler(){ return txHandler.get(); } + + inline virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); } + inline virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); } + inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); } + inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); } + inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } + + // Temporary add-in to resolve version conflicts: AMQP v8.0 still defines class Test; + // however v0.9 will not - kpvdr 2006-11-17 + inline virtual TestHandler* getTestHandler(){ throw ConnectionException(540, "Test class not implemented"); } +}; + +} +} + + +#endif diff --git a/cpp/lib/broker/TopicExchange.cpp b/cpp/lib/broker/TopicExchange.cpp new file mode 100644 index 0000000000..3ebb3c8c56 --- /dev/null +++ b/cpp/lib/broker/TopicExchange.cpp @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <TopicExchange.h> +#include <ExchangeBinding.h> +#include <algorithm> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +// TODO aconway 2006-09-20: More efficient matching algorithm. +// Areas for improvement: +// - excessive string copying: should be 0 copy, match from original buffer. +// - match/lookup: use descision tree or other more efficient structure. + +Tokens& Tokens::operator=(const std::string& s) { + clear(); + if (s.empty()) return *this; + std::string::const_iterator i = s.begin(); + while (true) { + // Invariant: i is at the beginning of the next untokenized word. + std::string::const_iterator j = find(i, s.end(), '.'); + push_back(std::string(i, j)); + if (j == s.end()) return *this; + i = j + 1; + } + return *this; +} + +TopicPattern& TopicPattern::operator=(const Tokens& tokens) { + Tokens::operator=(tokens); + normalize(); + return *this; +} + +namespace { +const std::string hashmark("#"); +const std::string star("*"); +} + +void TopicPattern::normalize() { + std::string word; + Tokens::iterator i = begin(); + while (i != end()) { + if (*i == hashmark) { + ++i; + while (i != end()) { + // Invariant: *(i-1)==#, [begin()..i-1] is normalized. + if (*i == star) { // Move * before #. + std::swap(*i, *(i-1)); + ++i; + } else if (*i == hashmark) { + erase(i); // Remove extra # + } else { + break; + } + } + } else { + i ++; + } + } +} + + +namespace { +// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. +// Need StringRef class that operates on a string in place witout copy. +// Should be applied everywhere strings are extracted from frames. +// +bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) +{ + // Invariant: [pattern_begin..p) matches [target_begin..t) + Tokens::const_iterator p = pattern_begin; + Tokens::const_iterator t = target_begin; + while (p != pattern_end && t != target_end) + { + if (*p == star || *p == *t) { + ++p, ++t; + } else if (*p == hashmark) { + ++p; + if (do_match(p, pattern_end, t, target_end)) return true; + while (t != target_end) { + ++t; + if (do_match(p, pattern_end, t, target_end)) return true; + } + return false; + } else { + return false; + } + } + while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing # + return t == target_end && p == pattern_end; +} +} + +bool TopicPattern::match(const Tokens& target) const +{ + return do_match(begin(), end(), target.begin(), target.end()); +} + +TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { } + +void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ + Monitor::ScopedLock l(lock); + TopicPattern routingPattern(routingKey); + bindings[routingPattern].push_back(queue); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); +} + +void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ + Monitor::ScopedLock l(lock); + BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); + Queue::vector& qv(bi->second); + if (bi == bindings.end()) return; + Queue::vector::iterator q = find(qv.begin(), qv.end(), queue); + if(q == qv.end()) return; + qv.erase(q); + if(qv.empty()) bindings.erase(bi); +} + + +void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ + Monitor::ScopedLock l(lock); + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (i->first.match(routingKey)) { + Queue::vector& qv(i->second); + for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){ + msg.deliverTo(*j); + } + } + } +} + +TopicExchange::~TopicExchange() {} + +const std::string TopicExchange::typeName("topic"); + + diff --git a/cpp/lib/broker/TopicExchange.h b/cpp/lib/broker/TopicExchange.h new file mode 100644 index 0000000000..fa0c86863a --- /dev/null +++ b/cpp/lib/broker/TopicExchange.h @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TopicExchange_ +#define _TopicExchange_ + +#include <map> +#include <vector> +#include <BrokerExchange.h> +#include <FieldTable.h> +#include <BrokerMessage.h> +#include <sys/Monitor.h> +#include <BrokerQueue.h> + +namespace qpid { +namespace broker { + +/** A vector of string tokens */ +class Tokens : public std::vector<std::string> { + public: + Tokens() {}; + // Default copy, assign, dtor are sufficient. + + /** Tokenize s, provides automatic conversion of string to Tokens */ + Tokens(const std::string& s) { operator=(s); } + /** Tokenizing assignment operator s */ + Tokens & operator=(const std::string& s); + + private: + size_t hash; +}; + + +/** + * Tokens that have been normalized as a pattern and can be matched + * with topic Tokens. Normalized meands all sequences of mixed * and + * # are reduced to a series of * followed by at most one #. + */ +class TopicPattern : public Tokens +{ + public: + TopicPattern() {} + // Default copy, assign, dtor are sufficient. + TopicPattern(const Tokens& tokens) { operator=(tokens); } + TopicPattern(const std::string& str) { operator=(str); } + TopicPattern& operator=(const Tokens&); + TopicPattern& operator=(const std::string& str) { return operator=(Tokens(str)); } + + /** Match a topic */ + bool match(const std::string& topic) { return match(Tokens(topic)); } + bool match(const Tokens& topic) const; + + private: + void normalize(); +}; + +class TopicExchange : public virtual Exchange{ + typedef std::map<TopicPattern, Queue::vector> BindingMap; + BindingMap bindings; + qpid::sys::Mutex lock; + + public: + static const std::string typeName; + + TopicExchange(const string& name); + + virtual std::string getType(){ return typeName; } + + virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + + virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); + + virtual ~TopicExchange(); +}; + + + +} +} + +#endif diff --git a/cpp/lib/broker/TransactionalStore.h b/cpp/lib/broker/TransactionalStore.h new file mode 100644 index 0000000000..17bca3878a --- /dev/null +++ b/cpp/lib/broker/TransactionalStore.h @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TransactionalStore_ +#define _TransactionalStore_ + +#include <memory> + +namespace qpid { + namespace broker { + struct InvalidTransactionContextException : public std::exception {}; + + class TransactionContext{ + public: + virtual ~TransactionContext(){} + }; + + class TransactionalStore{ + public: + virtual std::auto_ptr<TransactionContext> begin() = 0; + virtual void commit(TransactionContext*) = 0; + virtual void abort(TransactionContext*) = 0; + + virtual ~TransactionalStore(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/TxAck.cpp b/cpp/lib/broker/TxAck.cpp new file mode 100644 index 0000000000..2b55b81c58 --- /dev/null +++ b/cpp/lib/broker/TxAck.cpp @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <TxAck.h> + +using std::bind1st; +using std::bind2nd; +using std::mem_fun_ref; +using namespace qpid::broker; + +TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : acked(_acked), unacked(_unacked){ + +} + +bool TxAck::prepare(TransactionContext* ctxt) throw(){ + try{ + //dequeue all acked messages from their queues + for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { + if (i->coveredBy(&acked)) { + i->discard(ctxt); + } + } + //for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked)); + return true; + }catch(...){ + std::cout << "TxAck::prepare() - Failed to prepare" << std::endl; + return false; + } +} + +void TxAck::commit() throw(){ + //remove all acked records from the list + unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)); +} + +void TxAck::rollback() throw(){ +} diff --git a/cpp/lib/broker/TxAck.h b/cpp/lib/broker/TxAck.h new file mode 100644 index 0000000000..d6ff8fea9c --- /dev/null +++ b/cpp/lib/broker/TxAck.h @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TxAck_ +#define _TxAck_ + +#include <algorithm> +#include <functional> +#include <list> +#include <AccumulatedAck.h> +#include <DeliveryRecord.h> +#include <TxOp.h> + +namespace qpid { + namespace broker { + /** + * Defines the transactional behaviour for acks received by a + * transactional channel. + */ + class TxAck : public TxOp{ + AccumulatedAck& acked; + std::list<DeliveryRecord>& unacked; + public: + /** + * @param acked a representation of the accumulation of + * acks received + * @param unacked the record of delivered messages + */ + TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~TxAck(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/TxBuffer.cpp b/cpp/lib/broker/TxBuffer.cpp new file mode 100644 index 0000000000..acd3283bb7 --- /dev/null +++ b/cpp/lib/broker/TxBuffer.cpp @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <TxBuffer.h> + +using std::mem_fun; +using namespace qpid::broker; + +bool TxBuffer::prepare(TransactionalStore* const store) +{ + std::auto_ptr<TransactionContext> ctxt; + if(store) ctxt = store->begin(); + for(op_iterator i = ops.begin(); i < ops.end(); i++){ + if(!(*i)->prepare(ctxt.get())){ + if(store) store->abort(ctxt.get()); + return false; + } + } + if(store) store->commit(ctxt.get()); + return true; +} + +void TxBuffer::commit() +{ + for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit)); + ops.clear(); +} + +void TxBuffer::rollback() +{ + for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback)); + ops.clear(); +} + +void TxBuffer::enlist(TxOp* const op) +{ + ops.push_back(op); +} diff --git a/cpp/lib/broker/TxBuffer.h b/cpp/lib/broker/TxBuffer.h new file mode 100644 index 0000000000..2d9a2a3679 --- /dev/null +++ b/cpp/lib/broker/TxBuffer.h @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TxBuffer_ +#define _TxBuffer_ + +#include <algorithm> +#include <functional> +#include <vector> +#include <TransactionalStore.h> +#include <TxOp.h> + +/** + * Represents a single transaction. As such, an instance of this class + * will hold a list of operations representing the workload of the + * transaction. This work can be committed or rolled back. Committing + * is a two-stage process: first all the operations should be + * prepared, then if that succeeds they can be committed. + * + * In the 2pc case, a successful prepare may be followed by either a + * commit or a rollback. + * + * Atomicity of prepare is ensured by using a lower level + * transactional facility. This saves explicitly rolling back all the + * successfully prepared ops when one of them fails. i.e. we do not + * use 2pc internally, we instead ensure that prepare is atomic at a + * lower level. This makes individual prepare operations easier to + * code. + * + * Transactions on a messaging broker effect three types of 'action': + * (1) updates to persistent storage (2) updates to transient storage + * or cached data (3) network writes. + * + * Of these, (1) should always occur atomically during prepare to + * ensure that if the broker crashes while a transaction is being + * completed the persistent state (which is all that then remains) is + * consistent. (3) can only be done on commit, after a successful + * prepare. There is a little more flexibility with (2) but any + * changes made during prepare should be subject to the control of the + * TransactionalStore in use. + */ +namespace qpid { + namespace broker { + class TxBuffer{ + typedef std::vector<TxOp*>::iterator op_iterator; + std::vector<TxOp*> ops; + public: + /** + * Requests that all ops are prepared. This should + * primarily involve making sure that a persistent record + * of the operations is stored where necessary. + * + * All ops will be prepared under a transaction on the + * specified store. If any operation fails on prepare, + * this transaction will be rolled back. + * + * Once prepared, a transaction can be committed (or in + * the 2pc case, rolled back). + * + * @returns true if all the operations prepared + * successfully, false if not. + */ + bool prepare(TransactionalStore* const store); + /** + * Signals that the ops all prepared all completed + * successfully and can now commit, i.e. the operation can + * now be fully carried out. + * + * Should only be called after a call to prepare() returns + * true. + */ + void commit(); + /** + * Rolls back all the operations. + * + * Should only be called either after a call to prepare() + * returns true (2pc) or instead of a prepare call + * ('server-local') + */ + void rollback(); + /** + * Adds an operation to the transaction. + */ + void enlist(TxOp* const op); + }; + } +} + + +#endif diff --git a/cpp/lib/broker/TxOp.h b/cpp/lib/broker/TxOp.h new file mode 100644 index 0000000000..abba84a8e8 --- /dev/null +++ b/cpp/lib/broker/TxOp.h @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TxOp_ +#define _TxOp_ + +#include <TransactionalStore.h> + +namespace qpid { + namespace broker { + class TxOp{ + public: + virtual bool prepare(TransactionContext*) throw() = 0; + virtual void commit() throw() = 0; + virtual void rollback() throw() = 0; + virtual ~TxOp(){} + }; + } +} + + +#endif diff --git a/cpp/lib/broker/TxPublish.cpp b/cpp/lib/broker/TxPublish.cpp new file mode 100644 index 0000000000..0de5fbb200 --- /dev/null +++ b/cpp/lib/broker/TxPublish.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <TxPublish.h> + +using namespace qpid::broker; + +TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {} + +bool TxPublish::prepare(TransactionContext* ctxt) throw(){ + try{ + for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, 0)); + return true; + }catch(...){ + std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl; + return false; + } +} + +void TxPublish::commit() throw(){ + for_each(queues.begin(), queues.end(), Commit(msg)); +} + +void TxPublish::rollback() throw(){ +} + +void TxPublish::deliverTo(Queue::shared_ptr& queue){ + queues.push_back(queue); +} + +TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg, const string* const _xid) + : ctxt(_ctxt), msg(_msg), xid(_xid){} + +void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){ + queue->enqueue(ctxt, msg, xid); +} + +TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){} + +void TxPublish::Commit::operator()(Queue::shared_ptr& queue){ + queue->process(msg); +} + diff --git a/cpp/lib/broker/TxPublish.h b/cpp/lib/broker/TxPublish.h new file mode 100644 index 0000000000..2756addab7 --- /dev/null +++ b/cpp/lib/broker/TxPublish.h @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TxPublish_ +#define _TxPublish_ + +#include <algorithm> +#include <functional> +#include <list> +#include <Deliverable.h> +#include <BrokerMessage.h> +#include <MessageStore.h> +#include <BrokerQueue.h> +#include <TxOp.h> + +namespace qpid { + namespace broker { + /** + * Defines the behaviour for publish operations on a + * transactional channel. Messages are routed through + * exchanges when received but are not at that stage delivered + * to the matching queues, rather the queues are held in an + * instance of this class. On prepare() the message is marked + * enqueued to the relevant queues in the MessagesStore. On + * commit() the messages will be passed to the queue for + * dispatch or to be added to the in-memory queue. + */ + class TxPublish : public TxOp, public Deliverable{ + class Prepare{ + TransactionContext* ctxt; + Message::shared_ptr& msg; + const std::string* const xid; + public: + Prepare(TransactionContext* ctxt, Message::shared_ptr& msg, const std::string* const xid); + void operator()(Queue::shared_ptr& queue); + }; + + class Commit{ + Message::shared_ptr& msg; + public: + Commit(Message::shared_ptr& msg); + void operator()(Queue::shared_ptr& queue); + }; + + Message::shared_ptr msg; + std::list<Queue::shared_ptr> queues; + + public: + TxPublish(Message::shared_ptr msg); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + + virtual void deliverTo(Queue::shared_ptr& queue); + + virtual ~TxPublish(){} + }; + } +} + + +#endif |