diff options
author | Gordon Sim <gsim@apache.org> | 2006-10-30 19:27:54 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-10-30 19:27:54 +0000 |
commit | b0a120b4edfdb49a08bd7c8c2479e7b1cadc5233 (patch) | |
tree | d2b4ca0e774100285e116e5442bff9e55b4a3f92 | |
parent | f491af49008a2ed219ad4507cd507b4317afa4cb (diff) | |
download | qpid-python-b0a120b4edfdb49a08bd7c8c2479e7b1cadc5233.tar.gz |
Initial implementation for tx class.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469242 13f79535-47bb-0310-9956-ffa450edef68
47 files changed, 1448 insertions, 384 deletions
diff --git a/cpp/src/qpid/broker/AccumulatedAck.cpp b/cpp/src/qpid/broker/AccumulatedAck.cpp new file mode 100644 index 0000000000..86e1a5e786 --- /dev/null +++ b/cpp/src/qpid/broker/AccumulatedAck.cpp @@ -0,0 +1,46 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/AccumulatedAck.h" + +using std::less_equal; +using std::bind2nd; +using namespace qpid::broker; + +void AccumulatedAck::update(u_int64_t tag, bool multiple){ + if(multiple){ + if(tag > range) range = tag; + //else don't care, it is already counted + }else if(tag < range){ + individual.push_back(tag); + } +} + +void AccumulatedAck::consolidate(){ + individual.sort(); + //remove any individual tags that are covered by range + individual.remove_if(bind2nd(less_equal<u_int64_t>(), range)); +} + +void AccumulatedAck::clear(){ + range = 0; + individual.clear(); +} + +bool AccumulatedAck::covers(u_int64_t tag) const{ + return tag < range || find(individual.begin(), individual.end(), tag) != individual.end(); +} diff --git a/cpp/src/qpid/broker/AccumulatedAck.h b/cpp/src/qpid/broker/AccumulatedAck.h new file mode 100644 index 0000000000..54562b7af5 --- /dev/null +++ b/cpp/src/qpid/broker/AccumulatedAck.h @@ -0,0 +1,52 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _AccumulatedAck_ +#define _AccumulatedAck_ + +#include <algorithm> +#include <functional> +#include <list> + +namespace qpid { + namespace broker { + /** + * Keeps an accumulated record of acked messages (by delivery + * tag). + */ + struct AccumulatedAck{ + /** + * If not zero, then everything up to this value has been + * acked. + */ + u_int64_t range; + /** + * List of individually acked messages that are not + * included in the range marked by 'range'. + */ + std::list<u_int64_t> individual; + + void update(u_int64_t tag, bool multiple); + void consolidate(); + void clear(); + bool covers(u_int64_t tag) const; + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp index 5497eda842..c40811e921 100644 --- a/cpp/src/qpid/broker/Channel.cpp +++ b/cpp/src/qpid/broker/Channel.cpp @@ -21,6 +21,8 @@ #include <sstream> #include <assert.h> +using std::mem_fun_ref; +using std::bind2nd; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::concurrent; @@ -29,14 +31,17 @@ using namespace qpid::concurrent; Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : id(_id), out(_out), - deliveryTag(1), + currentDeliveryTag(1), transactional(false), prefetchSize(0), prefetchCount(0), - outstandingSize(0), - outstandingCount(0), framesize(_framesize), - tagGenerator("sgen"){} + tagGenerator("sgen"), + store(0), + messageBuilder(this){ + + outstanding.reset(); +} Channel::~Channel(){ } @@ -86,30 +91,36 @@ void Channel::begin(){ } void Channel::commit(){ - + TxAck txAck(accumulatedAck, unacked); + txBuffer.enlist(&txAck); + if(txBuffer.prepare(store)){ + txBuffer.commit(); + } + accumulatedAck.clear(); } void Channel::rollback(){ - + txBuffer.rollback(); + accumulatedAck.clear(); } void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ Locker locker(deliveryLock); - u_int64_t myDeliveryTag = deliveryTag++; + u_int64_t deliveryTag = currentDeliveryTag++; if(ackExpected){ - unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag)); - outstandingSize += msg->contentSize(); - outstandingCount++; + unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); + outstanding.size += msg->contentSize(); + outstanding.count++; } //send deliver method, header and content(s) - msg->deliver(out, id, consumerTag, myDeliveryTag, framesize); + msg->deliver(out, id, consumerTag, deliveryTag, framesize); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ Locker locker(deliveryLock); - bool countOk = !prefetchCount || prefetchCount > unacknowledged.size(); - bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty(); + bool countOk = !prefetchCount || prefetchCount > unacked.size(); + bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; } @@ -144,43 +155,66 @@ void Channel::ConsumerImpl::requestDispatch(){ if(blocked) queue->dispatch(); } -void Channel::checkMessage(const std::string& text){ - if(!message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text); - } +void Channel::handlePublish(Message* _message, Exchange* _exchange){ + Message::shared_ptr message(_message); + exchange = _exchange; + messageBuilder.initialise(message); +} + +void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ + messageBuilder.setHeader(header); +} + +void Channel::handleContent(AMQContentBody::shared_ptr content){ + messageBuilder.addContent(content); } -void Channel::handlePublish(Message* msg){ - if(message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); +void Channel::complete(Message::shared_ptr& msg){ + if(exchange){ + if(transactional){ + TxPublish* deliverable = new TxPublish(msg); + exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + txBuffer.enlist(new DeletingTxOp(deliverable)); + }else{ + DeliverableMessage deliverable(msg); + exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + } + exchange = 0; + }else{ + std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl; } - message = Message::shared_ptr(msg); } -void Channel::ack(u_int64_t _deliveryTag, bool multiple){ - Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery - - ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(_deliveryTag)); - if(i == unacknowledged.end()){ - throw InvalidAckException(); - }else if(multiple){ - unacknowledged.erase(unacknowledged.begin(), ++i); - //recompute prefetch outstanding (note: messages delivered through get are ignored) - CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch())); - outstandingSize = calc.getSize(); - outstandingCount = calc.getCount(); +void Channel::ack(u_int64_t deliveryTag, bool multiple){ + if(transactional){ + accumulatedAck.update(deliveryTag, multiple); + //TODO: I think the outstanding prefetch size & count should be updated at this point... + //TODO: ...this may then necessitate dispatching to consumers }else{ - if(!i->pull){ - outstandingSize -= i->msg->contentSize(); - outstandingCount--; + Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery + + ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag)); + if(i == unacked.end()){ + throw InvalidAckException(); + }else if(multiple){ + ack_iterator end = ++i; + for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard)); + unacked.erase(unacked.begin(), end); + + //recalculate the prefetch: + outstanding.reset(); + for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); + }else{ + i->discard(); + i->subtractFrom(&outstanding); + unacked.erase(i); } - unacknowledged.erase(i); - } - //if the prefetch limit had previously been reached, there may - //be messages that can be now be delivered - for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ - j->second->requestDispatch(); + //if the prefetch limit had previously been reached, there may + //be messages that can be now be delivered + for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ + j->second->requestDispatch(); + } } } @@ -188,14 +222,12 @@ void Channel::recover(bool requeue){ Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery if(requeue){ - outstandingSize = 0; - outstandingCount = 0; - ack_iterator start(unacknowledged.begin()); - ack_iterator end(unacknowledged.end()); - for_each(start, end, Requeue()); - unacknowledged.erase(start, end); + outstanding.reset(); + std::list<DeliveryRecord> copy = unacked; + unacked.clear(); + for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ - for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this)); + for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); } } @@ -203,10 +235,10 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ Message::shared_ptr msg = queue->dequeue(); if(msg){ Locker locker(deliveryLock); - u_int64_t myDeliveryTag = deliveryTag++; + u_int64_t myDeliveryTag = currentDeliveryTag++; msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); if(ackExpected){ - unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag)); + unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } return true; }else{ @@ -214,43 +246,6 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ } } -Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {} - -bool Channel::MatchAck::operator()(AckRecord& record) const{ - return tag == record.deliveryTag; -} - -void Channel::Requeue::operator()(AckRecord& record) const{ - record.msg->redeliver(); - record.queue->deliver(record.msg); -} - -Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {} - -void Channel::Redeliver::operator()(AckRecord& record) const{ - if(record.pull){ - //if message was originally sent as response to get, we must requeue it - record.msg->redeliver(); - record.queue->deliver(record.msg); - }else{ - record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); - } -} - -Channel::CalculatePrefetch::CalculatePrefetch() : size(0){} - -void Channel::CalculatePrefetch::operator()(AckRecord& record){ - if(!record.pull){ - //ignore messages that were sent in response to get when calculating prefetch - size += record.msg->contentSize(); - count++; - } -} - -u_int32_t Channel::CalculatePrefetch::getSize(){ - return size; -} - -u_int16_t Channel::CalculatePrefetch::getCount(){ - return count; +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ + msg->deliver(out, id, consumerTag, deliveryTag, framesize); } diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h index e742f45279..ef6700ff80 100644 --- a/cpp/src/qpid/broker/Channel.h +++ b/cpp/src/qpid/broker/Channel.h @@ -19,17 +19,29 @@ #define _Channel_ #include <algorithm> +#include <functional> +#include <list> #include <map> -#include "qpid/framing/AMQContentBody.h" -#include "qpid/framing/AMQHeaderBody.h" -#include "qpid/framing/BasicPublishBody.h" +#include "qpid/broker/AccumulatedAck.h" #include "qpid/broker/Binding.h" #include "qpid/broker/Consumer.h" +#include "qpid/broker/DeletingTxOp.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/broker/MessageBuilder.h" #include "qpid/broker/NameGenerator.h" -#include "qpid/framing/OutputHandler.h" +#include "qpid/broker/Prefetch.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/TransactionalStore.h" +#include "qpid/broker/TxAck.h" +#include "qpid/broker/TxBuffer.h" +#include "qpid/broker/TxPublish.h" +#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/framing/OutputHandler.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/BasicPublishBody.h" namespace qpid { namespace broker { @@ -37,8 +49,7 @@ namespace qpid { * Maintains state for an AMQP channel. Handles incoming and * outgoing messages for that channel. */ - class Channel{ - private: + class Channel : private MessageBuilder::CompletionHandler{ class ConsumerImpl : public virtual Consumer{ Channel* parent; string tag; @@ -54,92 +65,29 @@ namespace qpid { }; typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; - - struct AckRecord{ - Message::shared_ptr msg; - Queue::shared_ptr queue; - string consumerTag; - u_int64_t deliveryTag; - bool pull; - - AckRecord(Message::shared_ptr _msg, - Queue::shared_ptr _queue, - const string _consumerTag, - const u_int64_t _deliveryTag) : msg(_msg), - queue(_queue), - consumerTag(_consumerTag), - deliveryTag(_deliveryTag), - pull(false){} - - AckRecord(Message::shared_ptr _msg, - Queue::shared_ptr _queue, - const u_int64_t _deliveryTag) : msg(_msg), - queue(_queue), - consumerTag(""), - deliveryTag(_deliveryTag), - pull(true){} - }; - - typedef std::vector<AckRecord>::iterator ack_iterator; - - class MatchAck{ - const u_int64_t tag; - public: - MatchAck(u_int64_t tag); - bool operator()(AckRecord& record) const; - }; - - class Requeue{ - public: - void operator()(AckRecord& record) const; - }; - - class Redeliver{ - Channel* const channel; - public: - Redeliver(Channel* const channel); - void operator()(AckRecord& record) const; - }; - - class CalculatePrefetch{ - u_int32_t size; - u_int16_t count; - public: - CalculatePrefetch(); - void operator()(AckRecord& record); - u_int32_t getSize(); - u_int16_t getCount(); - }; - const int id; qpid::framing::OutputHandler* out; - u_int64_t deliveryTag; + u_int64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; bool transactional; std::map<string, ConsumerImpl*> consumers; u_int32_t prefetchSize; u_int16_t prefetchCount; - u_int32_t outstandingSize; - u_int16_t outstandingCount; + Prefetch outstanding; u_int32_t framesize; - Message::shared_ptr message; NameGenerator tagGenerator; - std::vector<AckRecord> unacknowledged; + std::list<DeliveryRecord> unacked; qpid::concurrent::MonitorImpl deliveryLock; + TxBuffer txBuffer; + AccumulatedAck accumulatedAck; + TransactionalStore* store; + MessageBuilder messageBuilder;//builder for in-progress message + Exchange* exchange;//exchange to which any in-progress message was published to + virtual void complete(Message::shared_ptr& msg); void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected); - void checkMessage(const std::string& text); - bool checkPrefetch(Message::shared_ptr& msg); void cancel(consumer_iterator consumer); - - template<class Operation> Operation processMessage(Operation route){ - if(message->isComplete()){ - route(message); - message.reset(); - } - return route; - } - + bool checkPrefetch(Message::shared_ptr& msg); public: Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize); @@ -158,37 +106,10 @@ namespace qpid { void rollback(); void ack(u_int64_t deliveryTag, bool multiple); void recover(bool requeue); - - /** - * Handles the initial publish request though a - * channel. The header and (if applicable) content will be - * accumulated through calls to handleHeader() and - * handleContent() - */ - void handlePublish(Message* msg); - - /** - * A template method that handles a received header and if - * there is no content routes it using the functor passed - * in. - */ - template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){ - checkMessage("Invalid message sequence: got header before publish."); - message->setHeader(header); - return processMessage(route); - } - - /** - * A template method that handles a received content and - * if this completes the message, routes it using the - * functor passed in. - */ - template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){ - checkMessage("Invalid message sequence: got content before publish."); - message->addContent(content); - return processMessage(route); - } - + void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag); + void handlePublish(Message* msg, Exchange* exchange); + void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void handleContent(qpid::framing::AMQContentBody::shared_ptr content); }; struct InvalidAckException{}; diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp index 7aefe19b2b..2dcefd878d 100644 --- a/cpp/src/qpid/broker/Configuration.cpp +++ b/cpp/src/qpid/broker/Configuration.cpp @@ -191,6 +191,8 @@ bool Configuration::BoolOption::needsValue() const { return false; } -void Configuration::BoolOption::setValue(const std::string& _value){ - value = strcasecmp(_value.c_str(), "true") == 0; +void Configuration::BoolOption::setValue(const std::string& /*not required*/){ + //BoolOptions have no value. The fact that the option is specified + //implies the value is true. + value = true; } diff --git a/cpp/src/qpid/broker/DeletingTxOp.cpp b/cpp/src/qpid/broker/DeletingTxOp.cpp new file mode 100644 index 0000000000..e9b9f30326 --- /dev/null +++ b/cpp/src/qpid/broker/DeletingTxOp.cpp @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/DeletingTxOp.h" + +using namespace qpid::broker; + +DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){} + +bool DeletingTxOp::prepare() throw(){ + return delegate && delegate->prepare(); +} + +void DeletingTxOp::commit() throw(){ + if(delegate){ + delegate->commit(); + delete delegate; + delegate = 0; + } +} + +void DeletingTxOp::rollback() throw(){ + if(delegate){ + delegate->rollback(); + delete delegate; + delegate = 0; + } +} diff --git a/cpp/src/qpid/broker/DeletingTxOp.h b/cpp/src/qpid/broker/DeletingTxOp.h new file mode 100644 index 0000000000..7e43717f17 --- /dev/null +++ b/cpp/src/qpid/broker/DeletingTxOp.h @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _DeletingTxOp_ +#define _DeletingTxOp_ + +#include "qpid/broker/TxOp.h" + +namespace qpid { + namespace broker { + /** + * TxOp wrapper that will delegate calls & delete the object + * to which it delegates after completion of the transaction. + */ + class DeletingTxOp : public virtual TxOp{ + TxOp* delegate; + public: + DeletingTxOp(TxOp* const delegate); + virtual bool prepare() throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~DeletingTxOp(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/Router.h b/cpp/src/qpid/broker/Deliverable.h index 3b4a3a0a4c..5aded061b7 100644 --- a/cpp/src/qpid/broker/Router.h +++ b/cpp/src/qpid/broker/Deliverable.h @@ -15,22 +15,17 @@ * limitations under the License. * */ -#ifndef _Router_ -#define _Router_ +#ifndef _Deliverable_ +#define _Deliverable_ -#include "qpid/broker/ExchangeRegistry.h" -#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" -/** - * A routing functor - */ namespace qpid { namespace broker { - class Router{ - ExchangeRegistry& registry; + class Deliverable{ public: - Router(ExchangeRegistry& registry); - void operator()(Message::shared_ptr& msg); + virtual void deliverTo(Queue::shared_ptr& queue) = 0; + virtual ~Deliverable(){} }; } } diff --git a/cpp/src/qpid/broker/Router.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp index d5853473af..aff0012bf4 100644 --- a/cpp/src/qpid/broker/Router.cpp +++ b/cpp/src/qpid/broker/DeliverableMessage.cpp @@ -15,18 +15,16 @@ * limitations under the License. * */ -#include "qpid/broker/Router.h" +#include "qpid/broker/DeliverableMessage.h" using namespace qpid::broker; -Router::Router(ExchangeRegistry& _registry) : registry(_registry){} - -void Router::operator()(Message::shared_ptr& msg){ - Exchange* exchange = registry.get(msg->getExchange()); - if(exchange){ - exchange->route(msg, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); - }else{ - std::cout << "WARNING: Could not route message, unknown exchange: " << msg->getExchange() << std::endl; - } +DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg) +{ +} +void DeliverableMessage::deliverTo(Queue::shared_ptr& queue) +{ + queue->deliver(msg); } + diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h new file mode 100644 index 0000000000..9c65cf7103 --- /dev/null +++ b/cpp/src/qpid/broker/DeliverableMessage.h @@ -0,0 +1,38 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _DeliverableMessage_ +#define _DeliverableMessage_ + +#include "qpid/broker/Deliverable.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" + +namespace qpid { + namespace broker { + class DeliverableMessage : public Deliverable{ + Message::shared_ptr msg; + public: + DeliverableMessage(Message::shared_ptr& msg); + virtual void deliverTo(Queue::shared_ptr& queue); + virtual ~DeliverableMessage(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp new file mode 100644 index 0000000000..19b786a8d3 --- /dev/null +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -0,0 +1,87 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/DeliveryRecord.h" +#include "qpid/broker/Channel.h" + +using namespace qpid::broker; + +DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, + Queue::shared_ptr _queue, + const string _consumerTag, + const u_int64_t _deliveryTag) : msg(_msg), + queue(_queue), + consumerTag(_consumerTag), + deliveryTag(_deliveryTag), + pull(false){} + +DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, + Queue::shared_ptr _queue, + const u_int64_t _deliveryTag) : msg(_msg), + queue(_queue), + consumerTag(""), + deliveryTag(_deliveryTag), + pull(true){} + + +void DeliveryRecord::discard() const{ + queue->dequeue(msg, 0); +} + +bool DeliveryRecord::matches(u_int64_t tag) const{ + return deliveryTag == tag; +} + +bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{ + return range->covers(deliveryTag); +} + +void DeliveryRecord::discardIfCoveredBy(const AccumulatedAck* const range) const{ + if(coveredBy(range)) discard(); +} + +void DeliveryRecord::redeliver(Channel* const channel) const{ + if(pull){ + //if message was originally sent as response to get, we must requeue it + requeue(); + }else{ + channel->deliver(msg, consumerTag, deliveryTag); + } +} + +void DeliveryRecord::requeue() const{ + msg->redeliver(); + queue->deliver(msg); +} + +void DeliveryRecord::addTo(Prefetch* const prefetch) const{ + if(!pull){ + //ignore 'pulled' messages (i.e. those that were sent in + //response to get) when calculating prefetch + prefetch->size += msg->contentSize(); + prefetch->count++; + } +} + +void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{ + if(!pull){ + //ignore 'pulled' messages (i.e. those that were sent in + //response to get) when calculating prefetch + prefetch->size -= msg->contentSize(); + prefetch->count--; + } +} diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h new file mode 100644 index 0000000000..da74156000 --- /dev/null +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -0,0 +1,61 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _DeliveryRecord_ +#define _DeliveryRecord_ + +#include <algorithm> +#include <list> +#include "qpid/broker/AccumulatedAck.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Prefetch.h" +#include "qpid/broker/Queue.h" + +namespace qpid { + namespace broker { + class Channel; + + /** + * Record of a delivery for which an ack is outstanding. + */ + class DeliveryRecord{ + mutable Message::shared_ptr msg; + mutable Queue::shared_ptr queue; + string consumerTag; + u_int64_t deliveryTag; + bool pull; + + public: + DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const string consumerTag, const u_int64_t deliveryTag); + DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const u_int64_t deliveryTag); + + void discard() const; + bool matches(u_int64_t tag) const; + bool coveredBy(const AccumulatedAck* const range) const; + void discardIfCoveredBy(const AccumulatedAck* const range) const; + void requeue() const; + void redeliver(Channel* const) const; + void addTo(Prefetch* const prefetch) const; + void subtractFrom(Prefetch* const prefetch) const; + }; + + typedef std::list<DeliveryRecord>::iterator ack_iterator; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 63cfda8f51..46693f6f3c 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -51,12 +51,12 @@ void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, F lock.release(); } -void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){ +void DirectExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){ lock.acquire(); std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); int count(0); for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){ - (*i)->deliver(msg); + msg.deliverTo(*i); } if(!count){ std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl; diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index e998d3caa6..fbbad8109e 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -41,7 +41,7 @@ namespace broker { virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); - virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg, const std::string& routingKey, qpid::framing::FieldTable* args); virtual ~DirectExchange(); }; diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index f84f0d969e..dfa7559683 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -18,9 +18,9 @@ #ifndef _Exchange_ #define _Exchange_ -#include "qpid/framing/FieldTable.h" -#include "qpid/broker/Message.h" +#include "qpid/broker/Deliverable.h" #include "qpid/broker/Queue.h" +#include "qpid/framing/FieldTable.h" namespace qpid { namespace broker { @@ -32,7 +32,7 @@ namespace broker { std::string getName() { return name; } virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0; virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0; - virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0; + virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0; }; } } diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 0a184d5993..c519771132 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -44,10 +44,10 @@ void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* } } -void FanOutExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* /*args*/){ +void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* /*args*/){ Locker locker(lock); for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ - (*i)->deliver(msg); + msg.deliverTo(*i); } } diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index b3255e3b0f..209d964bf6 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -42,7 +42,7 @@ class FanOutExchange : public virtual Exchange { virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); - virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg, const std::string& routingKey, qpid::framing::FieldTable* args); virtual ~FanOutExchange(); }; diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 96365e2130..aece22a413 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -58,10 +58,10 @@ void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey } -void HeadersExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* args){ +void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* args){ Locker locker(lock);; for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, *args)) i->second->deliver(msg); + if (match(i->first, *args)) msg.deliverTo(i->second); } } diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 5c7525ee7c..f4261916d9 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -45,7 +45,7 @@ class HeadersExchange : public virtual Exchange { virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); - virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args); virtual ~HeadersExchange(); diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index b5292ef043..962c74864e 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -17,7 +17,6 @@ */ #include "qpid/concurrent/MonitorImpl.h" #include "qpid/broker/Message.h" -#include "qpid/broker/ExchangeRegistry.h" #include <iostream> using namespace boost; diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index b6f41e817a..cfe29bdfcf 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -19,16 +19,16 @@ #define _Message_ #include <boost/shared_ptr.hpp> +#include "qpid/broker/ConnectionToken.h" +#include "qpid/broker/TxBuffer.h" #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" #include "qpid/framing/BasicHeaderProperties.h" #include "qpid/framing/BasicPublishBody.h" -#include "qpid/broker/ConnectionToken.h" #include "qpid/framing/OutputHandler.h" namespace qpid { namespace broker { - class ExchangeRegistry; /** * Represents an AMQP message, i.e. a header body, a list of @@ -48,6 +48,7 @@ namespace qpid { qpid::framing::AMQHeaderBody::shared_ptr header; content_list content; u_int64_t size; + TxBuffer* tx; void sendContent(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); @@ -79,8 +80,9 @@ namespace qpid { qpid::framing::BasicHeaderProperties* getHeaderProperties(); const string& getRoutingKey() const { return routingKey; } const string& getExchange() const { return exchange; } - u_int64_t contentSize() const{ return size; } - + u_int64_t contentSize() const { return size; } + TxBuffer* getTx() const { return tx; } + void setTx(TxBuffer* _tx) { tx = _tx; } }; } } diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp new file mode 100644 index 0000000000..c8488292b7 --- /dev/null +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/MessageBuilder.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +MessageBuilder::MessageBuilder(CompletionHandler* _handler) : handler(_handler) {} + +void MessageBuilder::route(){ + if(message->isComplete()){ + if(handler) handler->complete(message); + message.reset(); + } +} + +void MessageBuilder::initialise(Message::shared_ptr& msg){ + if(message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); + } + message = msg; +} + +void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){ + if(!message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish."); + } + message->setHeader(header); + route(); +} + +void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){ + if(!message.get()){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish."); + } + message->addContent(content); + route(); +} diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h new file mode 100644 index 0000000000..a5966574ad --- /dev/null +++ b/cpp/src/qpid/broker/MessageBuilder.h @@ -0,0 +1,51 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _MessageBuilder_ +#define _MessageBuilder_ + +#include "qpid/QpidError.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Message.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/BasicPublishBody.h" + +namespace qpid { + namespace broker { + class MessageBuilder{ + public: + class CompletionHandler{ + public: + virtual void complete(Message::shared_ptr&) = 0; + virtual ~CompletionHandler(){} + }; + MessageBuilder(CompletionHandler* _handler); + void initialise(Message::shared_ptr& msg); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header); + void addContent(qpid::framing::AMQContentBody::shared_ptr& content); + private: + Message::shared_ptr message; + CompletionHandler* handler; + + void route(); + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h new file mode 100644 index 0000000000..af9dd20079 --- /dev/null +++ b/cpp/src/qpid/broker/MessageStore.h @@ -0,0 +1,71 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _MessageStore_ +#define _MessageStore_ + +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/TransactionalStore.h" + +namespace qpid { + namespace broker { + /** + * An abstraction of the persistent storage for messages. + */ + class MessageStore : public TransactionalStore{ + public: + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0; + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * @param msg the message to dequeue + * @param queue the name of th queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0; + /** + * Treat all enqueue/dequeues where this xid was specified as being committed. + */ + virtual void committed(const string * const xid) = 0; + /** + * Treat all enqueue/dequeues where this xid was specified as being aborted. + */ + virtual void aborted(const string * const xid) = 0; + + virtual ~MessageStore(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/Prefetch.cpp b/cpp/src/qpid/broker/Prefetch.cpp new file mode 100644 index 0000000000..6d9dbda13c --- /dev/null +++ b/cpp/src/qpid/broker/Prefetch.cpp @@ -0,0 +1,26 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/broker/Prefetch.h" + +using namespace qpid::broker; + +void Prefetch::reset(){ + size = 0; + count = 0; +} diff --git a/cpp/src/qpid/broker/Prefetch.h b/cpp/src/qpid/broker/Prefetch.h new file mode 100644 index 0000000000..97abb4102d --- /dev/null +++ b/cpp/src/qpid/broker/Prefetch.h @@ -0,0 +1,39 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Prefetch_ +#define _Prefetch_ + +#include "qpid/framing/amqp_types.h" + +namespace qpid { + namespace broker { + /** + * Count and total size of asynchronously delivered + * (i.e. pushed) messages that have acks outstanding. + */ + struct Prefetch{ + u_int32_t size; + u_int16_t count; + + void reset(); + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index ee9bff4513..14a89f7a66 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -16,16 +16,21 @@ * */ #include "qpid/broker/Queue.h" +#include "qpid/broker/MessageStore.h" #include "qpid/concurrent/MonitorImpl.h" #include <iostream> using namespace qpid::broker; using namespace qpid::concurrent; -Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) : +Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, + MessageStore* const _store, + const ConnectionToken* const _owner) : + name(_name), autodelete(_autodelete), durable(_durable), + store(_store), owner(_owner), queueing(false), dispatching(false), @@ -48,6 +53,11 @@ void Queue::bound(Binding* b){ } void Queue::deliver(Message::shared_ptr& msg){ + enqueue(msg, 0); + process(msg); +} + +void Queue::process(Message::shared_ptr& msg){ Locker locker(lock); if(queueing || !dispatch(msg)){ queueing = true; @@ -153,3 +163,17 @@ bool Queue::canAutoDelete() const{ Locker locker(lock); return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete); } + +void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){ + bool persistent(false);//TODO: pull this from headers + if(persistent){ + store->enqueue(msg, name, xid); + } +} + +void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){ + bool persistent(false);//TODO: pull this from headers + if(persistent){ + store->dequeue(msg, name, xid); + } +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 0f20400daa..93570f59cc 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -31,6 +31,7 @@ namespace qpid { namespace broker { + class MessageStore; /** * Thrown when exclusive access would be violated. @@ -47,6 +48,7 @@ namespace qpid { const string name; const u_int32_t autodelete; const bool durable; + MessageStore* const store; const ConnectionToken* const owner; std::vector<Consumer*> consumers; std::queue<Binding*> bindings; @@ -67,7 +69,9 @@ namespace qpid { typedef std::vector<shared_ptr> vector; - Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0); + Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, + MessageStore* const store = 0, + const ConnectionToken* const owner = 0); ~Queue(); /** * Informs the queue of a binding that should be cancelled on @@ -75,13 +79,16 @@ namespace qpid { */ void bound(Binding* b); /** - * Delivers a message to the queue from where it will be - * dispatched to immediately to a consumer if one is - * available or stored for dequeue or later dispatch if - * not. + * Delivers a message to the queue. Will record it as + * enqueued if persistent then process it. */ void deliver(Message::shared_ptr& msg); /** + * Dispatches the messages immediately to a consumer if + * one is available or stores it for later if not. + */ + void process(Message::shared_ptr& msg); + /** * Dispatch any queued messages providing there are * consumers for them. Only one thread can be dispatching * at any time, but this method (rather than the caller) @@ -98,6 +105,9 @@ namespace qpid { inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } inline bool hasExclusiveConsumer() const { return exclusive; } bool canAutoDelete() const; + + void enqueue(Message::shared_ptr& msg, const string * const xid); + void dequeue(Message::shared_ptr& msg, const string * const xid); }; } } diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 1f7684e608..973201fe64 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -29,14 +29,15 @@ QueueRegistry::QueueRegistry() : counter(1){} QueueRegistry::~QueueRegistry(){} std::pair<Queue::shared_ptr, bool> -QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner) +QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, + MessageStore* const store, const ConnectionToken* owner) { Locker locker(lock); string name = declareName.empty() ? generateName() : declareName; assert(!name.empty()); QueueMap::iterator i = queues.find(name); if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner)); + Queue::shared_ptr queue(new Queue(name, durable, autoDelete, store, owner)); queues[name] = queue; return std::pair<Queue::shared_ptr, bool>(queue, true); } else { diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 42d75fc3e0..6f80291192 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -46,7 +46,9 @@ class QueueRegistry{ * @return The queue and a boolean flag which is true if the queue * was created by this declare call false if it already existed. */ - std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0); + std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, + MessageStore* const _store = 0, + const ConnectionToken* const owner = 0); /** * Destroy the named queue. diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp index 35f5b20854..a472cd27b0 100644 --- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp @@ -19,7 +19,6 @@ #include "qpid/broker/SessionHandlerImpl.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" -#include "qpid/broker/Router.h" #include "qpid/broker/TopicExchange.h" #include "assert.h" @@ -40,11 +39,12 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, exchanges(_exchanges), cleaner(_cleaner), timeout(_timeout), - connectionHandler(new ConnectionHandlerImpl(this)), - channelHandler(new ChannelHandlerImpl(this)), basicHandler(new BasicHandlerImpl(this)), + channelHandler(new ChannelHandlerImpl(this)), + connectionHandler(new ConnectionHandlerImpl(this)), exchangeHandler(new ExchangeHandlerImpl(this)), queueHandler(new QueueHandlerImpl(this)), + txHandler(new TxHandlerImpl(this)), framemax(65536), heartbeat(0) {} @@ -146,11 +146,11 @@ void SessionHandlerImpl::closed(){ } void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - getChannel(channel)->handleHeader(body, Router(*exchanges)); + getChannel(channel)->handleHeader(body); } void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - getChannel(channel)->handleContent(body, Router(*exchanges)); + getChannel(channel)->handleContent(body); } void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ @@ -261,7 +261,8 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t if (passive && !name.empty()) { queue = parent->getQueue(name, channel); } else { - std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0); + std::pair<Queue::shared_ptr, bool> queue_created = + parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, 0, exclusive ? parent : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue @@ -367,11 +368,16 @@ void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& con } void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, - string& exchange, string& routingKey, + string& exchangeName, string& routingKey, bool mandatory, bool immediate){ - Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate); - parent->getChannel(channel)->handlePublish(msg); + Exchange* exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName); + if(exchange){ + Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); + parent->getChannel(channel)->handlePublish(msg, exchange); + }else{ + throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + } } void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){ @@ -395,4 +401,20 @@ void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64 void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ parent->getChannel(channel)->recover(requeue); } + +void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){ + parent->getChannel(channel)->begin(); + parent->client.getTx().selectOk(channel); +} + +void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){ + parent->getChannel(channel)->commit(); + parent->client.getTx().commitOk(channel); +} + +void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){ + parent->getChannel(channel)->rollback(); + parent->client.getTx().rollbackOk(channel); + parent->getChannel(channel)->recover(false); +} diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.h b/cpp/src/qpid/broker/SessionHandlerImpl.h index afaae74d97..6b9b5cca6b 100644 --- a/cpp/src/qpid/broker/SessionHandlerImpl.h +++ b/cpp/src/qpid/broker/SessionHandlerImpl.h @@ -71,11 +71,12 @@ class SessionHandlerImpl : public virtual qpid::io::SessionHandler, AutoDelete* const cleaner; const u_int32_t timeout;//timeout for auto-deleted queues (in ms) - std::auto_ptr<ConnectionHandler> connectionHandler; - std::auto_ptr<ChannelHandler> channelHandler; std::auto_ptr<BasicHandler> basicHandler; + std::auto_ptr<ChannelHandler> channelHandler; + std::auto_ptr<ConnectionHandler> connectionHandler; std::auto_ptr<ExchangeHandler> exchangeHandler; std::auto_ptr<QueueHandler> queueHandler; + std::auto_ptr<TxHandler> txHandler; std::map<u_int16_t, Channel*> channels; std::vector<Queue::shared_ptr> exclusiveQueues; @@ -212,18 +213,29 @@ class SessionHandlerImpl : public virtual qpid::io::SessionHandler, virtual ~BasicHandlerImpl(){} }; + class TxHandlerImpl : public virtual TxHandler{ + SessionHandlerImpl* parent; + public: + TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + virtual ~TxHandlerImpl() {} + virtual void select(u_int16_t channel); + virtual void commit(u_int16_t channel); + virtual void rollback(u_int16_t channel); + }; + + inline virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); } inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); } inline virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); } inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); } inline virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); } + inline virtual TxHandler* getTxHandler(){ return txHandler.get(); } - inline virtual AccessHandler* getAccessHandler(){ return 0; } - inline virtual FileHandler* getFileHandler(){ return 0; } - inline virtual StreamHandler* getStreamHandler(){ return 0; } - inline virtual TxHandler* getTxHandler(){ return 0; } - inline virtual DtxHandler* getDtxHandler(){ return 0; } - inline virtual TunnelHandler* getTunnelHandler(){ return 0; } + inline virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); } + inline virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); } + inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); } + inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); } + inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } }; } diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 9ab779777c..dc252d208f 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -135,13 +135,13 @@ void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, Fi } -void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){ +void TopicExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){ lock.acquire(); for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (i->first.match(routingKey)) { Queue::vector& qv(i->second); for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){ - (*j)->deliver(msg); + msg.deliverTo(*j); } } } diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index d6c1946e71..9f08153a2e 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -82,7 +82,7 @@ class TopicExchange : public virtual Exchange{ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); - virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args); virtual ~TopicExchange(); }; diff --git a/cpp/src/qpid/broker/TransactionalStore.h b/cpp/src/qpid/broker/TransactionalStore.h new file mode 100644 index 0000000000..3976edd7b9 --- /dev/null +++ b/cpp/src/qpid/broker/TransactionalStore.h @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _TransactionalStore_ +#define _TransactionalStore_ + +namespace qpid { + namespace broker { + class TransactionalStore{ + public: + virtual void begin() = 0; + virtual void commit() = 0; + virtual void abort() = 0; + + virtual ~TransactionalStore(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/TxAck.cpp b/cpp/src/qpid/broker/TxAck.cpp new file mode 100644 index 0000000000..7e787a463e --- /dev/null +++ b/cpp/src/qpid/broker/TxAck.cpp @@ -0,0 +1,46 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/TxAck.h" + +using std::bind1st; +using std::bind2nd; +using std::mem_fun_ref; +using namespace qpid::broker; + +TxAck::TxAck(AccumulatedAck _acked, std::list<DeliveryRecord>& _unacked) : acked(_acked), unacked(_unacked){ + +} + +bool TxAck::prepare() throw(){ + try{ + //dequeue all acked messages from their queues + for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked)); + return true; + }catch(...){ + std::cout << "TxAck::prepare() - Failed to prepare" << std::endl; + return false; + } +} + +void TxAck::commit() throw(){ + //remove all acked records from the list + unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)); +} + +void TxAck::rollback() throw(){ +} diff --git a/cpp/src/qpid/broker/TxAck.h b/cpp/src/qpid/broker/TxAck.h new file mode 100644 index 0000000000..645bf1b1b0 --- /dev/null +++ b/cpp/src/qpid/broker/TxAck.h @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _TxAck_ +#define _TxAck_ + +#include <algorithm> +#include <functional> +#include <list> +#include "qpid/broker/AccumulatedAck.h" +#include "qpid/broker/DeliveryRecord.h" +#include "qpid/broker/TxOp.h" + +namespace qpid { + namespace broker { + class TxAck : public TxOp{ + AccumulatedAck acked; + std::list<DeliveryRecord>& unacked; + public: + TxAck(AccumulatedAck acked, std::list<DeliveryRecord>& unacked); + virtual bool prepare() throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~TxAck(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp new file mode 100644 index 0000000000..0529892930 --- /dev/null +++ b/cpp/src/qpid/broker/TxBuffer.cpp @@ -0,0 +1,48 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/TxBuffer.h" + +using namespace qpid::broker; + +bool TxBuffer::prepare(TransactionalStore* const store){ + if(store) store->begin(); + for(op_iterator i = ops.begin(); i < ops.end(); i++){ + if(!(*i)->prepare()){ + if(store) store->abort(); + return false; + } + } + if(store) store->commit(); + return true; +} + +void TxBuffer::commit(){ + for(op_iterator i = ops.begin(); i < ops.end(); i++){ + (*i)->commit(); + } +} + +void TxBuffer::rollback(){ + for(op_iterator i = ops.begin(); i < ops.end(); i++){ + (*i)->rollback(); + } +} + +void TxBuffer::enlist(TxOp* const op){ + ops.push_back(op); +} diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h new file mode 100644 index 0000000000..0963c7472a --- /dev/null +++ b/cpp/src/qpid/broker/TxBuffer.h @@ -0,0 +1,102 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _TxBuffer_ +#define _TxBuffer_ + +#include <vector> +#include "qpid/broker/TransactionalStore.h" +#include "qpid/broker/TxOp.h" + +/** + * Represents a single transaction. As such, an instance of this class + * will hold a list of operations representing the workload of the + * transaction. This work can be committed or rolled back. Committing + * is a two-stage process: first all the operations should be + * prepared, then if that succeeds they can be committed. + * + * In the 2pc case, a successful prepare may be followed by either a + * commit or a rollback. + * + * Atomicity of prepare is ensured by using a lower level + * transactional facility. This saves explicitly rolling back all the + * successfully prepared ops when one of them fails. i.e. we do not + * use 2pc internally, we instead ensure that prepare is atomic at a + * lower level. This makes individual prepare operations easier to + * code. + * + * Transactions on a messaging broker effect three types of 'action': + * (1) updates to persistent storage (2) updates to transient storage + * or cached data (3) network writes. + * + * Of these, (1) should always occur atomically during prepare to + * ensure that if the broker crashes while a transaction is being + * completed the persistent state (which is all that then remains) is + * consistent. (3) can only be done on commit, after a successful + * prepare. There is a little more flexibility with (2) but any + * changes made during prepare should be subject to the control of the + * TransactionalStore in use. + */ +namespace qpid { + namespace broker { + class TxBuffer{ + typedef std::vector<TxOp*>::iterator op_iterator; + std::vector<TxOp*> ops; + public: + /** + * Requests that all ops are prepared. This should + * primarily involve making sure that a persistent record + * of the operations is stored where necessary. + * + * All ops will be prepared under a transaction on the + * specified store. If any operation fails on prepare, + * this transaction will be rolled back. + * + * Once prepared, a transaction can be committed (or in + * the 2pc case, rolled back). + * + * @returns true if all the operations prepared + * successfully, false if not. + */ + bool prepare(TransactionalStore* const store); + /** + * Signals that the ops all prepared all completed + * successfully and can now commit, i.e. the operation can + * now be fully carried out. + * + * Should only be called after a call to prepare() returns + * true. + */ + void commit(); + /** + * Rolls back all the operations. + * + * Should only be called either after a call to prepare() + * returns true (2pc) or instead of a prepare call + * ('server-local') + */ + void rollback(); + /** + * Adds an operation to the transaction. + */ + void enlist(TxOp* const op); + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/TxOp.h b/cpp/src/qpid/broker/TxOp.h new file mode 100644 index 0000000000..37934dbec6 --- /dev/null +++ b/cpp/src/qpid/broker/TxOp.h @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _TxOp_ +#define _TxOp_ + +namespace qpid { + namespace broker { + class TxOp{ + public: + virtual bool prepare() throw() = 0; + virtual void commit() throw() = 0; + virtual void rollback() throw() = 0; + virtual ~TxOp(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp new file mode 100644 index 0000000000..93250dbb20 --- /dev/null +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -0,0 +1,56 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/TxPublish.h" + +using namespace qpid::broker; + +TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {} + +bool TxPublish::prepare() throw(){ + try{ + for_each(queues.begin(), queues.end(), Prepare(msg, 0)); + return true; + }catch(...){ + std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl; + return false; + } +} + +void TxPublish::commit() throw(){ + for_each(queues.begin(), queues.end(), Commit(msg)); +} + +void TxPublish::rollback() throw(){ +} + +void TxPublish::deliverTo(Queue::shared_ptr& queue){ + queues.push_back(queue); +} + +TxPublish::Prepare::Prepare(Message::shared_ptr& _msg, const string* const _xid) : msg(_msg), xid(_xid){} + +void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){ + queue->enqueue(msg, xid); +} + +TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){} + +void TxPublish::Commit::operator()(Queue::shared_ptr& queue){ + queue->process(msg); +} + diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h new file mode 100644 index 0000000000..01bb573fe2 --- /dev/null +++ b/cpp/src/qpid/broker/TxPublish.h @@ -0,0 +1,65 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _TxPublish_ +#define _TxPublish_ + +#include <algorithm> +#include <functional> +#include <list> +#include "qpid/broker/Deliverable.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/MessageStore.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/TxOp.h" + +namespace qpid { + namespace broker { + class TxPublish : public TxOp, public Deliverable{ + class Prepare{ + Message::shared_ptr& msg; + const string* const xid; + public: + Prepare(Message::shared_ptr& msg, const string* const xid); + void operator()(Queue::shared_ptr& queue); + }; + + class Commit{ + Message::shared_ptr& msg; + public: + Commit(Message::shared_ptr& msg); + void operator()(Queue::shared_ptr& queue); + }; + + Message::shared_ptr msg; + std::list<Queue::shared_ptr> queues; + + public: + TxPublish(Message::shared_ptr msg); + virtual bool prepare() throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + + virtual void deliverTo(Queue::shared_ptr& queue); + + virtual ~TxPublish(){} + }; + } +} + + +#endif diff --git a/cpp/test/unit/qpid/broker/ChannelTest.cpp b/cpp/test/unit/qpid/broker/ChannelTest.cpp index b0907a40f3..5052d4127d 100644 --- a/cpp/test/unit/qpid/broker/ChannelTest.cpp +++ b/cpp/test/unit/qpid/broker/ChannelTest.cpp @@ -26,14 +26,6 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::concurrent; -struct DummyRouter{ - Message::shared_ptr last; - - void operator()(Message::shared_ptr& msg){ - last = msg; - } -}; - struct DummyHandler : OutputHandler{ std::vector<AMQFrame*> frames; @@ -46,31 +38,12 @@ struct DummyHandler : OutputHandler{ class ChannelTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(ChannelTest); - CPPUNIT_TEST(testIncoming); CPPUNIT_TEST(testConsumerMgmt); CPPUNIT_TEST(testDeliveryNoAck); CPPUNIT_TEST_SUITE_END(); public: - void testIncoming(){ - Channel channel(0, 0, 10000); - string routingKey("my_routing_key"); - channel.handlePublish(new Message(0, "test", routingKey, false, false)); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(14); - string data1("abcdefg"); - string data2("hijklmn"); - AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); - AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); - - CPPUNIT_ASSERT(!channel.handleHeader(header, DummyRouter()).last); - CPPUNIT_ASSERT(!channel.handleContent(part1, DummyRouter()).last); - DummyRouter router = channel.handleContent(part2, DummyRouter()); - CPPUNIT_ASSERT(router.last); - CPPUNIT_ASSERT_EQUAL(routingKey, router.last->getRoutingKey()); - } - void testConsumerMgmt(){ Queue::shared_ptr queue(new Queue("my_queue")); Channel channel(0, 0, 0); diff --git a/cpp/test/unit/qpid/broker/ExchangeTest.cpp b/cpp/test/unit/qpid/broker/ExchangeTest.cpp index 40fa9cb032..2fb525312b 100644 --- a/cpp/test/unit/qpid/broker/ExchangeTest.cpp +++ b/cpp/test/unit/qpid/broker/ExchangeTest.cpp @@ -16,6 +16,7 @@ * */ +#include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Queue.h" @@ -50,7 +51,8 @@ class ExchangeTest : public CppUnit::TestCase queue.reset(); queue2.reset(); - Message::shared_ptr msg = Message::shared_ptr(new Message(0, "e", "A", true, true)); + Message::shared_ptr msgPtr(new Message(0, "e", "A", true, true)); + DeliverableMessage msg(msgPtr); topic.route(msg, "abc", 0); direct.route(msg, "abc", 0); diff --git a/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp b/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp new file mode 100644 index 0000000000..c432de7785 --- /dev/null +++ b/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp @@ -0,0 +1,110 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/Message.h" +#include "qpid/broker/MessageBuilder.h" +#include <qpid_test_plugin.h> +#include <iostream> +#include <memory> + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::concurrent; + +class MessageBuilderTest : public CppUnit::TestCase +{ + struct DummyHandler : MessageBuilder::CompletionHandler{ + Message::shared_ptr msg; + + virtual void complete(Message::shared_ptr& _msg){ + msg = _msg; + } + }; + + + CPPUNIT_TEST_SUITE(MessageBuilderTest); + CPPUNIT_TEST(testHeaderOnly); + CPPUNIT_TEST(test1ContentFrame); + CPPUNIT_TEST(test2ContentFrames); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testHeaderOnly(){ + DummyHandler handler; + MessageBuilder builder(&handler); + + Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(0); + + builder.initialise(message); + CPPUNIT_ASSERT(!handler.msg); + builder.setHeader(header); + CPPUNIT_ASSERT(handler.msg); + CPPUNIT_ASSERT_EQUAL(message, handler.msg); + } + + void test1ContentFrame(){ + DummyHandler handler; + MessageBuilder builder(&handler); + + string data1("abcdefg"); + + Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(7); + AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); + + builder.initialise(message); + CPPUNIT_ASSERT(!handler.msg); + builder.setHeader(header); + CPPUNIT_ASSERT(!handler.msg); + builder.addContent(part1); + CPPUNIT_ASSERT(handler.msg); + CPPUNIT_ASSERT_EQUAL(message, handler.msg); + } + + void test2ContentFrames(){ + DummyHandler handler; + MessageBuilder builder(&handler); + + string data1("abcdefg"); + string data2("hijklmn"); + + Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); + AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); + + builder.initialise(message); + CPPUNIT_ASSERT(!handler.msg); + builder.setHeader(header); + CPPUNIT_ASSERT(!handler.msg); + builder.addContent(part1); + CPPUNIT_ASSERT(!handler.msg); + builder.addContent(part2); + CPPUNIT_ASSERT(handler.msg); + CPPUNIT_ASSERT_EQUAL(message, handler.msg); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(MessageBuilderTest); diff --git a/cpp/test/unit/qpid/broker/RouterTest.cpp b/cpp/test/unit/qpid/broker/RouterTest.cpp deleted file mode 100644 index f2c9f27abd..0000000000 --- a/cpp/test/unit/qpid/broker/RouterTest.cpp +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/broker/Channel.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/ExchangeRegistry.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/Router.h" -#include <qpid_test_plugin.h> -#include <iostream> -#include <memory> - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::concurrent; - -struct TestExchange : public Exchange{ - Message::shared_ptr msg; - string routingKey; - FieldTable* args; - - TestExchange() : Exchange("test"), args(0) {} - - void bind(Queue::shared_ptr /*queue*/, const string& /*routingKey*/, FieldTable* /*args*/){} - - void unbind(Queue::shared_ptr /*queue*/, const string& /*routingKey*/, FieldTable* /*args*/){ } - - void route(Message::shared_ptr& _msg, const string& _routingKey, FieldTable* _args){ - msg = _msg; - routingKey = _routingKey; - args = _args; - } -}; - -class RouterTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(RouterTest); - CPPUNIT_TEST(test); - CPPUNIT_TEST_SUITE_END(); - - public: - - void test() - { - ExchangeRegistry registry; - TestExchange* exchange = new TestExchange(); - registry.declare(exchange); - - string routingKey("my_routing_key"); - string name("name"); - string value("value"); - Message::shared_ptr msg(new Message(0, "test", routingKey, false, false)); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - - dynamic_cast<BasicHeaderProperties*>(header->getProperties())->getHeaders().setString(name, value); - msg->setHeader(header); - - Router router(registry); - router(msg); - - CPPUNIT_ASSERT(exchange->msg); - CPPUNIT_ASSERT_EQUAL(msg, exchange->msg); - CPPUNIT_ASSERT_EQUAL(routingKey, exchange->msg->getRoutingKey()); - CPPUNIT_ASSERT_EQUAL(routingKey, exchange->routingKey); - CPPUNIT_ASSERT_EQUAL(value, exchange->args->getString(name)); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(RouterTest); - diff --git a/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp b/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp new file mode 100644 index 0000000000..0b4fd94e10 --- /dev/null +++ b/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/concurrent/APRBase.h" +#include <qpid_test_plugin.h> +#include <iostream> + +using namespace qpid::concurrent; + +class APRBaseTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(APRBaseTest); + CPPUNIT_TEST(testMe); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testMe() + { + APRBase::increment(); + APRBase::increment(); + APRBase::decrement(); + APRBase::decrement(); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(APRBaseTest); + |