summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-10-30 19:27:54 +0000
committerGordon Sim <gsim@apache.org>2006-10-30 19:27:54 +0000
commitb0a120b4edfdb49a08bd7c8c2479e7b1cadc5233 (patch)
treed2b4ca0e774100285e116e5442bff9e55b4a3f92
parentf491af49008a2ed219ad4507cd507b4317afa4cb (diff)
downloadqpid-python-b0a120b4edfdb49a08bd7c8c2479e7b1cadc5233.tar.gz
Initial implementation for tx class.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469242 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/AccumulatedAck.cpp46
-rw-r--r--cpp/src/qpid/broker/AccumulatedAck.h52
-rw-r--r--cpp/src/qpid/broker/Channel.cpp175
-rw-r--r--cpp/src/qpid/broker/Channel.h143
-rw-r--r--cpp/src/qpid/broker/Configuration.cpp6
-rw-r--r--cpp/src/qpid/broker/DeletingTxOp.cpp42
-rw-r--r--cpp/src/qpid/broker/DeletingTxOp.h42
-rw-r--r--cpp/src/qpid/broker/Deliverable.h (renamed from cpp/src/qpid/broker/Router.h)17
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.cpp (renamed from cpp/src/qpid/broker/Router.cpp)18
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.h38
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp87
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h61
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp4
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h2
-rw-r--r--cpp/src/qpid/broker/Exchange.h6
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp4
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h2
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp4
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h2
-rw-r--r--cpp/src/qpid/broker/Message.cpp1
-rw-r--r--cpp/src/qpid/broker/Message.h10
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp53
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.h51
-rw-r--r--cpp/src/qpid/broker/MessageStore.h71
-rw-r--r--cpp/src/qpid/broker/Prefetch.cpp26
-rw-r--r--cpp/src/qpid/broker/Prefetch.h39
-rw-r--r--cpp/src/qpid/broker/Queue.cpp26
-rw-r--r--cpp/src/qpid/broker/Queue.h20
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp5
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h4
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.cpp40
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.h28
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp4
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h2
-rw-r--r--cpp/src/qpid/broker/TransactionalStore.h35
-rw-r--r--cpp/src/qpid/broker/TxAck.cpp46
-rw-r--r--cpp/src/qpid/broker/TxAck.h44
-rw-r--r--cpp/src/qpid/broker/TxBuffer.cpp48
-rw-r--r--cpp/src/qpid/broker/TxBuffer.h102
-rw-r--r--cpp/src/qpid/broker/TxOp.h34
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp56
-rw-r--r--cpp/src/qpid/broker/TxPublish.h65
-rw-r--r--cpp/test/unit/qpid/broker/ChannelTest.cpp27
-rw-r--r--cpp/test/unit/qpid/broker/ExchangeTest.cpp4
-rw-r--r--cpp/test/unit/qpid/broker/MessageBuilderTest.cpp110
-rw-r--r--cpp/test/unit/qpid/broker/RouterTest.cpp86
-rw-r--r--cpp/test/unit/qpid/concurrent/APRBaseTest.cpp44
47 files changed, 1448 insertions, 384 deletions
diff --git a/cpp/src/qpid/broker/AccumulatedAck.cpp b/cpp/src/qpid/broker/AccumulatedAck.cpp
new file mode 100644
index 0000000000..86e1a5e786
--- /dev/null
+++ b/cpp/src/qpid/broker/AccumulatedAck.cpp
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/AccumulatedAck.h"
+
+using std::less_equal;
+using std::bind2nd;
+using namespace qpid::broker;
+
+void AccumulatedAck::update(u_int64_t tag, bool multiple){
+ if(multiple){
+ if(tag > range) range = tag;
+ //else don't care, it is already counted
+ }else if(tag < range){
+ individual.push_back(tag);
+ }
+}
+
+void AccumulatedAck::consolidate(){
+ individual.sort();
+ //remove any individual tags that are covered by range
+ individual.remove_if(bind2nd(less_equal<u_int64_t>(), range));
+}
+
+void AccumulatedAck::clear(){
+ range = 0;
+ individual.clear();
+}
+
+bool AccumulatedAck::covers(u_int64_t tag) const{
+ return tag < range || find(individual.begin(), individual.end(), tag) != individual.end();
+}
diff --git a/cpp/src/qpid/broker/AccumulatedAck.h b/cpp/src/qpid/broker/AccumulatedAck.h
new file mode 100644
index 0000000000..54562b7af5
--- /dev/null
+++ b/cpp/src/qpid/broker/AccumulatedAck.h
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _AccumulatedAck_
+#define _AccumulatedAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Keeps an accumulated record of acked messages (by delivery
+ * tag).
+ */
+ struct AccumulatedAck{
+ /**
+ * If not zero, then everything up to this value has been
+ * acked.
+ */
+ u_int64_t range;
+ /**
+ * List of individually acked messages that are not
+ * included in the range marked by 'range'.
+ */
+ std::list<u_int64_t> individual;
+
+ void update(u_int64_t tag, bool multiple);
+ void consolidate();
+ void clear();
+ bool covers(u_int64_t tag) const;
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp
index 5497eda842..c40811e921 100644
--- a/cpp/src/qpid/broker/Channel.cpp
+++ b/cpp/src/qpid/broker/Channel.cpp
@@ -21,6 +21,8 @@
#include <sstream>
#include <assert.h>
+using std::mem_fun_ref;
+using std::bind2nd;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::concurrent;
@@ -29,14 +31,17 @@ using namespace qpid::concurrent;
Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
id(_id),
out(_out),
- deliveryTag(1),
+ currentDeliveryTag(1),
transactional(false),
prefetchSize(0),
prefetchCount(0),
- outstandingSize(0),
- outstandingCount(0),
framesize(_framesize),
- tagGenerator("sgen"){}
+ tagGenerator("sgen"),
+ store(0),
+ messageBuilder(this){
+
+ outstanding.reset();
+}
Channel::~Channel(){
}
@@ -86,30 +91,36 @@ void Channel::begin(){
}
void Channel::commit(){
-
+ TxAck txAck(accumulatedAck, unacked);
+ txBuffer.enlist(&txAck);
+ if(txBuffer.prepare(store)){
+ txBuffer.commit();
+ }
+ accumulatedAck.clear();
}
void Channel::rollback(){
-
+ txBuffer.rollback();
+ accumulatedAck.clear();
}
void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
Locker locker(deliveryLock);
- u_int64_t myDeliveryTag = deliveryTag++;
+ u_int64_t deliveryTag = currentDeliveryTag++;
if(ackExpected){
- unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
- outstandingSize += msg->contentSize();
- outstandingCount++;
+ unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
+ outstanding.size += msg->contentSize();
+ outstanding.count++;
}
//send deliver method, header and content(s)
- msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
Locker locker(deliveryLock);
- bool countOk = !prefetchCount || prefetchCount > unacknowledged.size();
- bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty();
+ bool countOk = !prefetchCount || prefetchCount > unacked.size();
+ bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
return countOk && sizeOk;
}
@@ -144,43 +155,66 @@ void Channel::ConsumerImpl::requestDispatch(){
if(blocked) queue->dispatch();
}
-void Channel::checkMessage(const std::string& text){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text);
- }
+void Channel::handlePublish(Message* _message, Exchange* _exchange){
+ Message::shared_ptr message(_message);
+ exchange = _exchange;
+ messageBuilder.initialise(message);
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
+ messageBuilder.setHeader(header);
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr content){
+ messageBuilder.addContent(content);
}
-void Channel::handlePublish(Message* msg){
- if(message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+void Channel::complete(Message::shared_ptr& msg){
+ if(exchange){
+ if(transactional){
+ TxPublish* deliverable = new TxPublish(msg);
+ exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ txBuffer.enlist(new DeletingTxOp(deliverable));
+ }else{
+ DeliverableMessage deliverable(msg);
+ exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ }
+ exchange = 0;
+ }else{
+ std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl;
}
- message = Message::shared_ptr(msg);
}
-void Channel::ack(u_int64_t _deliveryTag, bool multiple){
- Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(_deliveryTag));
- if(i == unacknowledged.end()){
- throw InvalidAckException();
- }else if(multiple){
- unacknowledged.erase(unacknowledged.begin(), ++i);
- //recompute prefetch outstanding (note: messages delivered through get are ignored)
- CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch()));
- outstandingSize = calc.getSize();
- outstandingCount = calc.getCount();
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ if(transactional){
+ accumulatedAck.update(deliveryTag, multiple);
+ //TODO: I think the outstanding prefetch size & count should be updated at this point...
+ //TODO: ...this may then necessitate dispatching to consumers
}else{
- if(!i->pull){
- outstandingSize -= i->msg->contentSize();
- outstandingCount--;
+ Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
+ if(i == unacked.end()){
+ throw InvalidAckException();
+ }else if(multiple){
+ ack_iterator end = ++i;
+ for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard));
+ unacked.erase(unacked.begin(), end);
+
+ //recalculate the prefetch:
+ outstanding.reset();
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
+ }else{
+ i->discard();
+ i->subtractFrom(&outstanding);
+ unacked.erase(i);
}
- unacknowledged.erase(i);
- }
- //if the prefetch limit had previously been reached, there may
- //be messages that can be now be delivered
- for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
- j->second->requestDispatch();
+ //if the prefetch limit had previously been reached, there may
+ //be messages that can be now be delivered
+ for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
+ j->second->requestDispatch();
+ }
}
}
@@ -188,14 +222,12 @@ void Channel::recover(bool requeue){
Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
if(requeue){
- outstandingSize = 0;
- outstandingCount = 0;
- ack_iterator start(unacknowledged.begin());
- ack_iterator end(unacknowledged.end());
- for_each(start, end, Requeue());
- unacknowledged.erase(start, end);
+ outstanding.reset();
+ std::list<DeliveryRecord> copy = unacked;
+ unacked.clear();
+ for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
- for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
}
}
@@ -203,10 +235,10 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
Message::shared_ptr msg = queue->dequeue();
if(msg){
Locker locker(deliveryLock);
- u_int64_t myDeliveryTag = deliveryTag++;
+ u_int64_t myDeliveryTag = currentDeliveryTag++;
msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
if(ackExpected){
- unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag));
+ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
return true;
}else{
@@ -214,43 +246,6 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
}
}
-Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
-
-bool Channel::MatchAck::operator()(AckRecord& record) const{
- return tag == record.deliveryTag;
-}
-
-void Channel::Requeue::operator()(AckRecord& record) const{
- record.msg->redeliver();
- record.queue->deliver(record.msg);
-}
-
-Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
-
-void Channel::Redeliver::operator()(AckRecord& record) const{
- if(record.pull){
- //if message was originally sent as response to get, we must requeue it
- record.msg->redeliver();
- record.queue->deliver(record.msg);
- }else{
- record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
- }
-}
-
-Channel::CalculatePrefetch::CalculatePrefetch() : size(0){}
-
-void Channel::CalculatePrefetch::operator()(AckRecord& record){
- if(!record.pull){
- //ignore messages that were sent in response to get when calculating prefetch
- size += record.msg->contentSize();
- count++;
- }
-}
-
-u_int32_t Channel::CalculatePrefetch::getSize(){
- return size;
-}
-
-u_int16_t Channel::CalculatePrefetch::getCount(){
- return count;
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize);
}
diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h
index e742f45279..ef6700ff80 100644
--- a/cpp/src/qpid/broker/Channel.h
+++ b/cpp/src/qpid/broker/Channel.h
@@ -19,17 +19,29 @@
#define _Channel_
#include <algorithm>
+#include <functional>
+#include <list>
#include <map>
-#include "qpid/framing/AMQContentBody.h"
-#include "qpid/framing/AMQHeaderBody.h"
-#include "qpid/framing/BasicPublishBody.h"
+#include "qpid/broker/AccumulatedAck.h"
#include "qpid/broker/Binding.h"
#include "qpid/broker/Consumer.h"
+#include "qpid/broker/DeletingTxOp.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/NameGenerator.h"
-#include "qpid/framing/OutputHandler.h"
+#include "qpid/broker/Prefetch.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/broker/TxAck.h"
+#include "qpid/broker/TxBuffer.h"
+#include "qpid/broker/TxPublish.h"
+#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/BasicPublishBody.h"
namespace qpid {
namespace broker {
@@ -37,8 +49,7 @@ namespace qpid {
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
- class Channel{
- private:
+ class Channel : private MessageBuilder::CompletionHandler{
class ConsumerImpl : public virtual Consumer{
Channel* parent;
string tag;
@@ -54,92 +65,29 @@ namespace qpid {
};
typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
-
- struct AckRecord{
- Message::shared_ptr msg;
- Queue::shared_ptr queue;
- string consumerTag;
- u_int64_t deliveryTag;
- bool pull;
-
- AckRecord(Message::shared_ptr _msg,
- Queue::shared_ptr _queue,
- const string _consumerTag,
- const u_int64_t _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(_consumerTag),
- deliveryTag(_deliveryTag),
- pull(false){}
-
- AckRecord(Message::shared_ptr _msg,
- Queue::shared_ptr _queue,
- const u_int64_t _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(""),
- deliveryTag(_deliveryTag),
- pull(true){}
- };
-
- typedef std::vector<AckRecord>::iterator ack_iterator;
-
- class MatchAck{
- const u_int64_t tag;
- public:
- MatchAck(u_int64_t tag);
- bool operator()(AckRecord& record) const;
- };
-
- class Requeue{
- public:
- void operator()(AckRecord& record) const;
- };
-
- class Redeliver{
- Channel* const channel;
- public:
- Redeliver(Channel* const channel);
- void operator()(AckRecord& record) const;
- };
-
- class CalculatePrefetch{
- u_int32_t size;
- u_int16_t count;
- public:
- CalculatePrefetch();
- void operator()(AckRecord& record);
- u_int32_t getSize();
- u_int16_t getCount();
- };
-
const int id;
qpid::framing::OutputHandler* out;
- u_int64_t deliveryTag;
+ u_int64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
bool transactional;
std::map<string, ConsumerImpl*> consumers;
u_int32_t prefetchSize;
u_int16_t prefetchCount;
- u_int32_t outstandingSize;
- u_int16_t outstandingCount;
+ Prefetch outstanding;
u_int32_t framesize;
- Message::shared_ptr message;
NameGenerator tagGenerator;
- std::vector<AckRecord> unacknowledged;
+ std::list<DeliveryRecord> unacked;
qpid::concurrent::MonitorImpl deliveryLock;
+ TxBuffer txBuffer;
+ AccumulatedAck accumulatedAck;
+ TransactionalStore* store;
+ MessageBuilder messageBuilder;//builder for in-progress message
+ Exchange* exchange;//exchange to which any in-progress message was published to
+ virtual void complete(Message::shared_ptr& msg);
void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
- void checkMessage(const std::string& text);
- bool checkPrefetch(Message::shared_ptr& msg);
void cancel(consumer_iterator consumer);
-
- template<class Operation> Operation processMessage(Operation route){
- if(message->isComplete()){
- route(message);
- message.reset();
- }
- return route;
- }
-
+ bool checkPrefetch(Message::shared_ptr& msg);
public:
Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
@@ -158,37 +106,10 @@ namespace qpid {
void rollback();
void ack(u_int64_t deliveryTag, bool multiple);
void recover(bool requeue);
-
- /**
- * Handles the initial publish request though a
- * channel. The header and (if applicable) content will be
- * accumulated through calls to handleHeader() and
- * handleContent()
- */
- void handlePublish(Message* msg);
-
- /**
- * A template method that handles a received header and if
- * there is no content routes it using the functor passed
- * in.
- */
- template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
- checkMessage("Invalid message sequence: got header before publish.");
- message->setHeader(header);
- return processMessage(route);
- }
-
- /**
- * A template method that handles a received content and
- * if this completes the message, routes it using the
- * functor passed in.
- */
- template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){
- checkMessage("Invalid message sequence: got content before publish.");
- message->addContent(content);
- return processMessage(route);
- }
-
+ void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);
+ void handlePublish(Message* msg, Exchange* exchange);
+ void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void handleContent(qpid::framing::AMQContentBody::shared_ptr content);
};
struct InvalidAckException{};
diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp
index 7aefe19b2b..2dcefd878d 100644
--- a/cpp/src/qpid/broker/Configuration.cpp
+++ b/cpp/src/qpid/broker/Configuration.cpp
@@ -191,6 +191,8 @@ bool Configuration::BoolOption::needsValue() const {
return false;
}
-void Configuration::BoolOption::setValue(const std::string& _value){
- value = strcasecmp(_value.c_str(), "true") == 0;
+void Configuration::BoolOption::setValue(const std::string& /*not required*/){
+ //BoolOptions have no value. The fact that the option is specified
+ //implies the value is true.
+ value = true;
}
diff --git a/cpp/src/qpid/broker/DeletingTxOp.cpp b/cpp/src/qpid/broker/DeletingTxOp.cpp
new file mode 100644
index 0000000000..e9b9f30326
--- /dev/null
+++ b/cpp/src/qpid/broker/DeletingTxOp.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/DeletingTxOp.h"
+
+using namespace qpid::broker;
+
+DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){}
+
+bool DeletingTxOp::prepare() throw(){
+ return delegate && delegate->prepare();
+}
+
+void DeletingTxOp::commit() throw(){
+ if(delegate){
+ delegate->commit();
+ delete delegate;
+ delegate = 0;
+ }
+}
+
+void DeletingTxOp::rollback() throw(){
+ if(delegate){
+ delegate->rollback();
+ delete delegate;
+ delegate = 0;
+ }
+}
diff --git a/cpp/src/qpid/broker/DeletingTxOp.h b/cpp/src/qpid/broker/DeletingTxOp.h
new file mode 100644
index 0000000000..7e43717f17
--- /dev/null
+++ b/cpp/src/qpid/broker/DeletingTxOp.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DeletingTxOp_
+#define _DeletingTxOp_
+
+#include "qpid/broker/TxOp.h"
+
+namespace qpid {
+ namespace broker {
+ /**
+ * TxOp wrapper that will delegate calls & delete the object
+ * to which it delegates after completion of the transaction.
+ */
+ class DeletingTxOp : public virtual TxOp{
+ TxOp* delegate;
+ public:
+ DeletingTxOp(TxOp* const delegate);
+ virtual bool prepare() throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~DeletingTxOp(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Router.h b/cpp/src/qpid/broker/Deliverable.h
index 3b4a3a0a4c..5aded061b7 100644
--- a/cpp/src/qpid/broker/Router.h
+++ b/cpp/src/qpid/broker/Deliverable.h
@@ -15,22 +15,17 @@
* limitations under the License.
*
*/
-#ifndef _Router_
-#define _Router_
+#ifndef _Deliverable_
+#define _Deliverable_
-#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
-/**
- * A routing functor
- */
namespace qpid {
namespace broker {
- class Router{
- ExchangeRegistry& registry;
+ class Deliverable{
public:
- Router(ExchangeRegistry& registry);
- void operator()(Message::shared_ptr& msg);
+ virtual void deliverTo(Queue::shared_ptr& queue) = 0;
+ virtual ~Deliverable(){}
};
}
}
diff --git a/cpp/src/qpid/broker/Router.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp
index d5853473af..aff0012bf4 100644
--- a/cpp/src/qpid/broker/Router.cpp
+++ b/cpp/src/qpid/broker/DeliverableMessage.cpp
@@ -15,18 +15,16 @@
* limitations under the License.
*
*/
-#include "qpid/broker/Router.h"
+#include "qpid/broker/DeliverableMessage.h"
using namespace qpid::broker;
-Router::Router(ExchangeRegistry& _registry) : registry(_registry){}
-
-void Router::operator()(Message::shared_ptr& msg){
- Exchange* exchange = registry.get(msg->getExchange());
- if(exchange){
- exchange->route(msg, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
- }else{
- std::cout << "WARNING: Could not route message, unknown exchange: " << msg->getExchange() << std::endl;
- }
+DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg)
+{
+}
+void DeliverableMessage::deliverTo(Queue::shared_ptr& queue)
+{
+ queue->deliver(msg);
}
+
diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h
new file mode 100644
index 0000000000..9c65cf7103
--- /dev/null
+++ b/cpp/src/qpid/broker/DeliverableMessage.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DeliverableMessage_
+#define _DeliverableMessage_
+
+#include "qpid/broker/Deliverable.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+
+namespace qpid {
+ namespace broker {
+ class DeliverableMessage : public Deliverable{
+ Message::shared_ptr msg;
+ public:
+ DeliverableMessage(Message::shared_ptr& msg);
+ virtual void deliverTo(Queue::shared_ptr& queue);
+ virtual ~DeliverableMessage(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
new file mode 100644
index 0000000000..19b786a8d3
--- /dev/null
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid/broker/Channel.h"
+
+using namespace qpid::broker;
+
+DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
+ Queue::shared_ptr _queue,
+ const string _consumerTag,
+ const u_int64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(_consumerTag),
+ deliveryTag(_deliveryTag),
+ pull(false){}
+
+DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
+ Queue::shared_ptr _queue,
+ const u_int64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(""),
+ deliveryTag(_deliveryTag),
+ pull(true){}
+
+
+void DeliveryRecord::discard() const{
+ queue->dequeue(msg, 0);
+}
+
+bool DeliveryRecord::matches(u_int64_t tag) const{
+ return deliveryTag == tag;
+}
+
+bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
+ return range->covers(deliveryTag);
+}
+
+void DeliveryRecord::discardIfCoveredBy(const AccumulatedAck* const range) const{
+ if(coveredBy(range)) discard();
+}
+
+void DeliveryRecord::redeliver(Channel* const channel) const{
+ if(pull){
+ //if message was originally sent as response to get, we must requeue it
+ requeue();
+ }else{
+ channel->deliver(msg, consumerTag, deliveryTag);
+ }
+}
+
+void DeliveryRecord::requeue() const{
+ msg->redeliver();
+ queue->deliver(msg);
+}
+
+void DeliveryRecord::addTo(Prefetch* const prefetch) const{
+ if(!pull){
+ //ignore 'pulled' messages (i.e. those that were sent in
+ //response to get) when calculating prefetch
+ prefetch->size += msg->contentSize();
+ prefetch->count++;
+ }
+}
+
+void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{
+ if(!pull){
+ //ignore 'pulled' messages (i.e. those that were sent in
+ //response to get) when calculating prefetch
+ prefetch->size -= msg->contentSize();
+ prefetch->count--;
+ }
+}
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
new file mode 100644
index 0000000000..da74156000
--- /dev/null
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DeliveryRecord_
+#define _DeliveryRecord_
+
+#include <algorithm>
+#include <list>
+#include "qpid/broker/AccumulatedAck.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Prefetch.h"
+#include "qpid/broker/Queue.h"
+
+namespace qpid {
+ namespace broker {
+ class Channel;
+
+ /**
+ * Record of a delivery for which an ack is outstanding.
+ */
+ class DeliveryRecord{
+ mutable Message::shared_ptr msg;
+ mutable Queue::shared_ptr queue;
+ string consumerTag;
+ u_int64_t deliveryTag;
+ bool pull;
+
+ public:
+ DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const string consumerTag, const u_int64_t deliveryTag);
+ DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const u_int64_t deliveryTag);
+
+ void discard() const;
+ bool matches(u_int64_t tag) const;
+ bool coveredBy(const AccumulatedAck* const range) const;
+ void discardIfCoveredBy(const AccumulatedAck* const range) const;
+ void requeue() const;
+ void redeliver(Channel* const) const;
+ void addTo(Prefetch* const prefetch) const;
+ void subtractFrom(Prefetch* const prefetch) const;
+ };
+
+ typedef std::list<DeliveryRecord>::iterator ack_iterator;
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 63cfda8f51..46693f6f3c 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -51,12 +51,12 @@ void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, F
lock.release();
}
-void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){
+void DirectExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){
lock.acquire();
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
int count(0);
for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
- (*i)->deliver(msg);
+ msg.deliverTo(*i);
}
if(!count){
std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl;
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index e998d3caa6..fbbad8109e 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -41,7 +41,7 @@ namespace broker {
virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
- virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
virtual ~DirectExchange();
};
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index f84f0d969e..dfa7559683 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -18,9 +18,9 @@
#ifndef _Exchange_
#define _Exchange_
-#include "qpid/framing/FieldTable.h"
-#include "qpid/broker/Message.h"
+#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Queue.h"
+#include "qpid/framing/FieldTable.h"
namespace qpid {
namespace broker {
@@ -32,7 +32,7 @@ namespace broker {
std::string getName() { return name; }
virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
- virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
};
}
}
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 0a184d5993..c519771132 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -44,10 +44,10 @@ void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*
}
}
-void FanOutExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* /*args*/){
+void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* /*args*/){
Locker locker(lock);
for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
- (*i)->deliver(msg);
+ msg.deliverTo(*i);
}
}
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index b3255e3b0f..209d964bf6 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -42,7 +42,7 @@ class FanOutExchange : public virtual Exchange {
virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
- virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
virtual ~FanOutExchange();
};
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 96365e2130..aece22a413 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -58,10 +58,10 @@ void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey
}
-void HeadersExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* args){
+void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* args){
Locker locker(lock);;
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, *args)) i->second->deliver(msg);
+ if (match(i->first, *args)) msg.deliverTo(i->second);
}
}
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index 5c7525ee7c..f4261916d9 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -45,7 +45,7 @@ class HeadersExchange : public virtual Exchange {
virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
- virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args);
virtual ~HeadersExchange();
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index b5292ef043..962c74864e 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -17,7 +17,6 @@
*/
#include "qpid/concurrent/MonitorImpl.h"
#include "qpid/broker/Message.h"
-#include "qpid/broker/ExchangeRegistry.h"
#include <iostream>
using namespace boost;
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index b6f41e817a..cfe29bdfcf 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -19,16 +19,16 @@
#define _Message_
#include <boost/shared_ptr.hpp>
+#include "qpid/broker/ConnectionToken.h"
+#include "qpid/broker/TxBuffer.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/BasicHeaderProperties.h"
#include "qpid/framing/BasicPublishBody.h"
-#include "qpid/broker/ConnectionToken.h"
#include "qpid/framing/OutputHandler.h"
namespace qpid {
namespace broker {
- class ExchangeRegistry;
/**
* Represents an AMQP message, i.e. a header body, a list of
@@ -48,6 +48,7 @@ namespace qpid {
qpid::framing::AMQHeaderBody::shared_ptr header;
content_list content;
u_int64_t size;
+ TxBuffer* tx;
void sendContent(qpid::framing::OutputHandler* out,
int channel, u_int32_t framesize);
@@ -79,8 +80,9 @@ namespace qpid {
qpid::framing::BasicHeaderProperties* getHeaderProperties();
const string& getRoutingKey() const { return routingKey; }
const string& getExchange() const { return exchange; }
- u_int64_t contentSize() const{ return size; }
-
+ u_int64_t contentSize() const { return size; }
+ TxBuffer* getTx() const { return tx; }
+ void setTx(TxBuffer* _tx) { tx = _tx; }
};
}
}
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
new file mode 100644
index 0000000000..c8488292b7
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/MessageBuilder.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+MessageBuilder::MessageBuilder(CompletionHandler* _handler) : handler(_handler) {}
+
+void MessageBuilder::route(){
+ if(message->isComplete()){
+ if(handler) handler->complete(message);
+ message.reset();
+ }
+}
+
+void MessageBuilder::initialise(Message::shared_ptr& msg){
+ if(message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+ }
+ message = msg;
+}
+
+void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
+ if(!message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
+ }
+ message->setHeader(header);
+ route();
+}
+
+void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){
+ if(!message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
+ }
+ message->addContent(content);
+ route();
+}
diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h
new file mode 100644
index 0000000000..a5966574ad
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageBuilder.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _MessageBuilder_
+#define _MessageBuilder_
+
+#include "qpid/QpidError.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/BasicPublishBody.h"
+
+namespace qpid {
+ namespace broker {
+ class MessageBuilder{
+ public:
+ class CompletionHandler{
+ public:
+ virtual void complete(Message::shared_ptr&) = 0;
+ virtual ~CompletionHandler(){}
+ };
+ MessageBuilder(CompletionHandler* _handler);
+ void initialise(Message::shared_ptr& msg);
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr& content);
+ private:
+ Message::shared_ptr message;
+ CompletionHandler* handler;
+
+ void route();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
new file mode 100644
index 0000000000..af9dd20079
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -0,0 +1,71 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _MessageStore_
+#define _MessageStore_
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/TransactionalStore.h"
+
+namespace qpid {
+ namespace broker {
+ /**
+ * An abstraction of the persistent storage for messages.
+ */
+ class MessageStore : public TransactionalStore{
+ public:
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0;
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of th queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0;
+ /**
+ * Treat all enqueue/dequeues where this xid was specified as being committed.
+ */
+ virtual void committed(const string * const xid) = 0;
+ /**
+ * Treat all enqueue/dequeues where this xid was specified as being aborted.
+ */
+ virtual void aborted(const string * const xid) = 0;
+
+ virtual ~MessageStore(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Prefetch.cpp b/cpp/src/qpid/broker/Prefetch.cpp
new file mode 100644
index 0000000000..6d9dbda13c
--- /dev/null
+++ b/cpp/src/qpid/broker/Prefetch.cpp
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/broker/Prefetch.h"
+
+using namespace qpid::broker;
+
+void Prefetch::reset(){
+ size = 0;
+ count = 0;
+}
diff --git a/cpp/src/qpid/broker/Prefetch.h b/cpp/src/qpid/broker/Prefetch.h
new file mode 100644
index 0000000000..97abb4102d
--- /dev/null
+++ b/cpp/src/qpid/broker/Prefetch.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Prefetch_
+#define _Prefetch_
+
+#include "qpid/framing/amqp_types.h"
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Count and total size of asynchronously delivered
+ * (i.e. pushed) messages that have acks outstanding.
+ */
+ struct Prefetch{
+ u_int32_t size;
+ u_int16_t count;
+
+ void reset();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index ee9bff4513..14a89f7a66 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -16,16 +16,21 @@
*
*/
#include "qpid/broker/Queue.h"
+#include "qpid/broker/MessageStore.h"
#include "qpid/concurrent/MonitorImpl.h"
#include <iostream>
using namespace qpid::broker;
using namespace qpid::concurrent;
-Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) :
+Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete,
+ MessageStore* const _store,
+ const ConnectionToken* const _owner) :
+
name(_name),
autodelete(_autodelete),
durable(_durable),
+ store(_store),
owner(_owner),
queueing(false),
dispatching(false),
@@ -48,6 +53,11 @@ void Queue::bound(Binding* b){
}
void Queue::deliver(Message::shared_ptr& msg){
+ enqueue(msg, 0);
+ process(msg);
+}
+
+void Queue::process(Message::shared_ptr& msg){
Locker locker(lock);
if(queueing || !dispatch(msg)){
queueing = true;
@@ -153,3 +163,17 @@ bool Queue::canAutoDelete() const{
Locker locker(lock);
return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete);
}
+
+void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){
+ bool persistent(false);//TODO: pull this from headers
+ if(persistent){
+ store->enqueue(msg, name, xid);
+ }
+}
+
+void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){
+ bool persistent(false);//TODO: pull this from headers
+ if(persistent){
+ store->dequeue(msg, name, xid);
+ }
+}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 0f20400daa..93570f59cc 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -31,6 +31,7 @@
namespace qpid {
namespace broker {
+ class MessageStore;
/**
* Thrown when exclusive access would be violated.
@@ -47,6 +48,7 @@ namespace qpid {
const string name;
const u_int32_t autodelete;
const bool durable;
+ MessageStore* const store;
const ConnectionToken* const owner;
std::vector<Consumer*> consumers;
std::queue<Binding*> bindings;
@@ -67,7 +69,9 @@ namespace qpid {
typedef std::vector<shared_ptr> vector;
- Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+ Queue(const string& name, bool durable = false, u_int32_t autodelete = 0,
+ MessageStore* const store = 0,
+ const ConnectionToken* const owner = 0);
~Queue();
/**
* Informs the queue of a binding that should be cancelled on
@@ -75,13 +79,16 @@ namespace qpid {
*/
void bound(Binding* b);
/**
- * Delivers a message to the queue from where it will be
- * dispatched to immediately to a consumer if one is
- * available or stored for dequeue or later dispatch if
- * not.
+ * Delivers a message to the queue. Will record it as
+ * enqueued if persistent then process it.
*/
void deliver(Message::shared_ptr& msg);
/**
+ * Dispatches the messages immediately to a consumer if
+ * one is available or stores it for later if not.
+ */
+ void process(Message::shared_ptr& msg);
+ /**
* Dispatch any queued messages providing there are
* consumers for them. Only one thread can be dispatching
* at any time, but this method (rather than the caller)
@@ -98,6 +105,9 @@ namespace qpid {
inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
inline bool hasExclusiveConsumer() const { return exclusive; }
bool canAutoDelete() const;
+
+ void enqueue(Message::shared_ptr& msg, const string * const xid);
+ void dequeue(Message::shared_ptr& msg, const string * const xid);
};
}
}
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 1f7684e608..973201fe64 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -29,14 +29,15 @@ QueueRegistry::QueueRegistry() : counter(1){}
QueueRegistry::~QueueRegistry(){}
std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner)
+QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete,
+ MessageStore* const store, const ConnectionToken* owner)
{
Locker locker(lock);
string name = declareName.empty() ? generateName() : declareName;
assert(!name.empty());
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner));
+ Queue::shared_ptr queue(new Queue(name, durable, autoDelete, store, owner));
queues[name] = queue;
return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index 42d75fc3e0..6f80291192 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -46,7 +46,9 @@ class QueueRegistry{
* @return The queue and a boolean flag which is true if the queue
* was created by this declare call false if it already existed.
*/
- std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0,
+ MessageStore* const _store = 0,
+ const ConnectionToken* const owner = 0);
/**
* Destroy the named queue.
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
index 35f5b20854..a472cd27b0 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
@@ -19,7 +19,6 @@
#include "qpid/broker/SessionHandlerImpl.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
-#include "qpid/broker/Router.h"
#include "qpid/broker/TopicExchange.h"
#include "assert.h"
@@ -40,11 +39,12 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
exchanges(_exchanges),
cleaner(_cleaner),
timeout(_timeout),
- connectionHandler(new ConnectionHandlerImpl(this)),
- channelHandler(new ChannelHandlerImpl(this)),
basicHandler(new BasicHandlerImpl(this)),
+ channelHandler(new ChannelHandlerImpl(this)),
+ connectionHandler(new ConnectionHandlerImpl(this)),
exchangeHandler(new ExchangeHandlerImpl(this)),
queueHandler(new QueueHandlerImpl(this)),
+ txHandler(new TxHandlerImpl(this)),
framemax(65536),
heartbeat(0) {}
@@ -146,11 +146,11 @@ void SessionHandlerImpl::closed(){
}
void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
- getChannel(channel)->handleHeader(body, Router(*exchanges));
+ getChannel(channel)->handleHeader(body);
}
void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
- getChannel(channel)->handleContent(body, Router(*exchanges));
+ getChannel(channel)->handleContent(body);
}
void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
@@ -261,7 +261,8 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
if (passive && !name.empty()) {
queue = parent->getQueue(name, channel);
} else {
- std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, 0, exclusive ? parent : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -367,11 +368,16 @@ void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& con
}
void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
- string& exchange, string& routingKey,
+ string& exchangeName, string& routingKey,
bool mandatory, bool immediate){
- Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate);
- parent->getChannel(channel)->handlePublish(msg);
+ Exchange* exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+ if(exchange){
+ Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
+ parent->getChannel(channel)->handlePublish(msg, exchange);
+ }else{
+ throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ }
}
void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){
@@ -395,4 +401,20 @@ void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64
void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
parent->getChannel(channel)->recover(requeue);
}
+
+void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){
+ parent->getChannel(channel)->begin();
+ parent->client.getTx().selectOk(channel);
+}
+
+void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){
+ parent->getChannel(channel)->commit();
+ parent->client.getTx().commitOk(channel);
+}
+
+void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){
+ parent->getChannel(channel)->rollback();
+ parent->client.getTx().rollbackOk(channel);
+ parent->getChannel(channel)->recover(false);
+}
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.h b/cpp/src/qpid/broker/SessionHandlerImpl.h
index afaae74d97..6b9b5cca6b 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.h
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.h
@@ -71,11 +71,12 @@ class SessionHandlerImpl : public virtual qpid::io::SessionHandler,
AutoDelete* const cleaner;
const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
- std::auto_ptr<ConnectionHandler> connectionHandler;
- std::auto_ptr<ChannelHandler> channelHandler;
std::auto_ptr<BasicHandler> basicHandler;
+ std::auto_ptr<ChannelHandler> channelHandler;
+ std::auto_ptr<ConnectionHandler> connectionHandler;
std::auto_ptr<ExchangeHandler> exchangeHandler;
std::auto_ptr<QueueHandler> queueHandler;
+ std::auto_ptr<TxHandler> txHandler;
std::map<u_int16_t, Channel*> channels;
std::vector<Queue::shared_ptr> exclusiveQueues;
@@ -212,18 +213,29 @@ class SessionHandlerImpl : public virtual qpid::io::SessionHandler,
virtual ~BasicHandlerImpl(){}
};
+ class TxHandlerImpl : public virtual TxHandler{
+ SessionHandlerImpl* parent;
+ public:
+ TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+ virtual ~TxHandlerImpl() {}
+ virtual void select(u_int16_t channel);
+ virtual void commit(u_int16_t channel);
+ virtual void rollback(u_int16_t channel);
+ };
+
+
inline virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); }
inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); }
inline virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); }
inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); }
inline virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); }
+ inline virtual TxHandler* getTxHandler(){ return txHandler.get(); }
- inline virtual AccessHandler* getAccessHandler(){ return 0; }
- inline virtual FileHandler* getFileHandler(){ return 0; }
- inline virtual StreamHandler* getStreamHandler(){ return 0; }
- inline virtual TxHandler* getTxHandler(){ return 0; }
- inline virtual DtxHandler* getDtxHandler(){ return 0; }
- inline virtual TunnelHandler* getTunnelHandler(){ return 0; }
+ inline virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); }
+ inline virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); }
+ inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); }
+ inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); }
+ inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); }
};
}
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 9ab779777c..dc252d208f 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -135,13 +135,13 @@ void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, Fi
}
-void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){
+void TopicExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){
lock.acquire();
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(routingKey)) {
Queue::vector& qv(i->second);
for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
- (*j)->deliver(msg);
+ msg.deliverTo(*j);
}
}
}
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index d6c1946e71..9f08153a2e 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -82,7 +82,7 @@ class TopicExchange : public virtual Exchange{
virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
- virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args);
virtual ~TopicExchange();
};
diff --git a/cpp/src/qpid/broker/TransactionalStore.h b/cpp/src/qpid/broker/TransactionalStore.h
new file mode 100644
index 0000000000..3976edd7b9
--- /dev/null
+++ b/cpp/src/qpid/broker/TransactionalStore.h
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TransactionalStore_
+#define _TransactionalStore_
+
+namespace qpid {
+ namespace broker {
+ class TransactionalStore{
+ public:
+ virtual void begin() = 0;
+ virtual void commit() = 0;
+ virtual void abort() = 0;
+
+ virtual ~TransactionalStore(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/TxAck.cpp b/cpp/src/qpid/broker/TxAck.cpp
new file mode 100644
index 0000000000..7e787a463e
--- /dev/null
+++ b/cpp/src/qpid/broker/TxAck.cpp
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/TxAck.h"
+
+using std::bind1st;
+using std::bind2nd;
+using std::mem_fun_ref;
+using namespace qpid::broker;
+
+TxAck::TxAck(AccumulatedAck _acked, std::list<DeliveryRecord>& _unacked) : acked(_acked), unacked(_unacked){
+
+}
+
+bool TxAck::prepare() throw(){
+ try{
+ //dequeue all acked messages from their queues
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked));
+ return true;
+ }catch(...){
+ std::cout << "TxAck::prepare() - Failed to prepare" << std::endl;
+ return false;
+ }
+}
+
+void TxAck::commit() throw(){
+ //remove all acked records from the list
+ unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked));
+}
+
+void TxAck::rollback() throw(){
+}
diff --git a/cpp/src/qpid/broker/TxAck.h b/cpp/src/qpid/broker/TxAck.h
new file mode 100644
index 0000000000..645bf1b1b0
--- /dev/null
+++ b/cpp/src/qpid/broker/TxAck.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TxAck_
+#define _TxAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "qpid/broker/AccumulatedAck.h"
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid/broker/TxOp.h"
+
+namespace qpid {
+ namespace broker {
+ class TxAck : public TxOp{
+ AccumulatedAck acked;
+ std::list<DeliveryRecord>& unacked;
+ public:
+ TxAck(AccumulatedAck acked, std::list<DeliveryRecord>& unacked);
+ virtual bool prepare() throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~TxAck(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp
new file mode 100644
index 0000000000..0529892930
--- /dev/null
+++ b/cpp/src/qpid/broker/TxBuffer.cpp
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/TxBuffer.h"
+
+using namespace qpid::broker;
+
+bool TxBuffer::prepare(TransactionalStore* const store){
+ if(store) store->begin();
+ for(op_iterator i = ops.begin(); i < ops.end(); i++){
+ if(!(*i)->prepare()){
+ if(store) store->abort();
+ return false;
+ }
+ }
+ if(store) store->commit();
+ return true;
+}
+
+void TxBuffer::commit(){
+ for(op_iterator i = ops.begin(); i < ops.end(); i++){
+ (*i)->commit();
+ }
+}
+
+void TxBuffer::rollback(){
+ for(op_iterator i = ops.begin(); i < ops.end(); i++){
+ (*i)->rollback();
+ }
+}
+
+void TxBuffer::enlist(TxOp* const op){
+ ops.push_back(op);
+}
diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h
new file mode 100644
index 0000000000..0963c7472a
--- /dev/null
+++ b/cpp/src/qpid/broker/TxBuffer.h
@@ -0,0 +1,102 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TxBuffer_
+#define _TxBuffer_
+
+#include <vector>
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/broker/TxOp.h"
+
+/**
+ * Represents a single transaction. As such, an instance of this class
+ * will hold a list of operations representing the workload of the
+ * transaction. This work can be committed or rolled back. Committing
+ * is a two-stage process: first all the operations should be
+ * prepared, then if that succeeds they can be committed.
+ *
+ * In the 2pc case, a successful prepare may be followed by either a
+ * commit or a rollback.
+ *
+ * Atomicity of prepare is ensured by using a lower level
+ * transactional facility. This saves explicitly rolling back all the
+ * successfully prepared ops when one of them fails. i.e. we do not
+ * use 2pc internally, we instead ensure that prepare is atomic at a
+ * lower level. This makes individual prepare operations easier to
+ * code.
+ *
+ * Transactions on a messaging broker effect three types of 'action':
+ * (1) updates to persistent storage (2) updates to transient storage
+ * or cached data (3) network writes.
+ *
+ * Of these, (1) should always occur atomically during prepare to
+ * ensure that if the broker crashes while a transaction is being
+ * completed the persistent state (which is all that then remains) is
+ * consistent. (3) can only be done on commit, after a successful
+ * prepare. There is a little more flexibility with (2) but any
+ * changes made during prepare should be subject to the control of the
+ * TransactionalStore in use.
+ */
+namespace qpid {
+ namespace broker {
+ class TxBuffer{
+ typedef std::vector<TxOp*>::iterator op_iterator;
+ std::vector<TxOp*> ops;
+ public:
+ /**
+ * Requests that all ops are prepared. This should
+ * primarily involve making sure that a persistent record
+ * of the operations is stored where necessary.
+ *
+ * All ops will be prepared under a transaction on the
+ * specified store. If any operation fails on prepare,
+ * this transaction will be rolled back.
+ *
+ * Once prepared, a transaction can be committed (or in
+ * the 2pc case, rolled back).
+ *
+ * @returns true if all the operations prepared
+ * successfully, false if not.
+ */
+ bool prepare(TransactionalStore* const store);
+ /**
+ * Signals that the ops all prepared all completed
+ * successfully and can now commit, i.e. the operation can
+ * now be fully carried out.
+ *
+ * Should only be called after a call to prepare() returns
+ * true.
+ */
+ void commit();
+ /**
+ * Rolls back all the operations.
+ *
+ * Should only be called either after a call to prepare()
+ * returns true (2pc) or instead of a prepare call
+ * ('server-local')
+ */
+ void rollback();
+ /**
+ * Adds an operation to the transaction.
+ */
+ void enlist(TxOp* const op);
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/TxOp.h b/cpp/src/qpid/broker/TxOp.h
new file mode 100644
index 0000000000..37934dbec6
--- /dev/null
+++ b/cpp/src/qpid/broker/TxOp.h
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TxOp_
+#define _TxOp_
+
+namespace qpid {
+ namespace broker {
+ class TxOp{
+ public:
+ virtual bool prepare() throw() = 0;
+ virtual void commit() throw() = 0;
+ virtual void rollback() throw() = 0;
+ virtual ~TxOp(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
new file mode 100644
index 0000000000..93250dbb20
--- /dev/null
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/TxPublish.h"
+
+using namespace qpid::broker;
+
+TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {}
+
+bool TxPublish::prepare() throw(){
+ try{
+ for_each(queues.begin(), queues.end(), Prepare(msg, 0));
+ return true;
+ }catch(...){
+ std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl;
+ return false;
+ }
+}
+
+void TxPublish::commit() throw(){
+ for_each(queues.begin(), queues.end(), Commit(msg));
+}
+
+void TxPublish::rollback() throw(){
+}
+
+void TxPublish::deliverTo(Queue::shared_ptr& queue){
+ queues.push_back(queue);
+}
+
+TxPublish::Prepare::Prepare(Message::shared_ptr& _msg, const string* const _xid) : msg(_msg), xid(_xid){}
+
+void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
+ queue->enqueue(msg, xid);
+}
+
+TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}
+
+void TxPublish::Commit::operator()(Queue::shared_ptr& queue){
+ queue->process(msg);
+}
+
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
new file mode 100644
index 0000000000..01bb573fe2
--- /dev/null
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TxPublish_
+#define _TxPublish_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "qpid/broker/Deliverable.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/TxOp.h"
+
+namespace qpid {
+ namespace broker {
+ class TxPublish : public TxOp, public Deliverable{
+ class Prepare{
+ Message::shared_ptr& msg;
+ const string* const xid;
+ public:
+ Prepare(Message::shared_ptr& msg, const string* const xid);
+ void operator()(Queue::shared_ptr& queue);
+ };
+
+ class Commit{
+ Message::shared_ptr& msg;
+ public:
+ Commit(Message::shared_ptr& msg);
+ void operator()(Queue::shared_ptr& queue);
+ };
+
+ Message::shared_ptr msg;
+ std::list<Queue::shared_ptr> queues;
+
+ public:
+ TxPublish(Message::shared_ptr msg);
+ virtual bool prepare() throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+
+ virtual void deliverTo(Queue::shared_ptr& queue);
+
+ virtual ~TxPublish(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/test/unit/qpid/broker/ChannelTest.cpp b/cpp/test/unit/qpid/broker/ChannelTest.cpp
index b0907a40f3..5052d4127d 100644
--- a/cpp/test/unit/qpid/broker/ChannelTest.cpp
+++ b/cpp/test/unit/qpid/broker/ChannelTest.cpp
@@ -26,14 +26,6 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::concurrent;
-struct DummyRouter{
- Message::shared_ptr last;
-
- void operator()(Message::shared_ptr& msg){
- last = msg;
- }
-};
-
struct DummyHandler : OutputHandler{
std::vector<AMQFrame*> frames;
@@ -46,31 +38,12 @@ struct DummyHandler : OutputHandler{
class ChannelTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(ChannelTest);
- CPPUNIT_TEST(testIncoming);
CPPUNIT_TEST(testConsumerMgmt);
CPPUNIT_TEST(testDeliveryNoAck);
CPPUNIT_TEST_SUITE_END();
public:
- void testIncoming(){
- Channel channel(0, 0, 10000);
- string routingKey("my_routing_key");
- channel.handlePublish(new Message(0, "test", routingKey, false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(14);
- string data1("abcdefg");
- string data2("hijklmn");
- AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
- AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
-
- CPPUNIT_ASSERT(!channel.handleHeader(header, DummyRouter()).last);
- CPPUNIT_ASSERT(!channel.handleContent(part1, DummyRouter()).last);
- DummyRouter router = channel.handleContent(part2, DummyRouter());
- CPPUNIT_ASSERT(router.last);
- CPPUNIT_ASSERT_EQUAL(routingKey, router.last->getRoutingKey());
- }
-
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
Channel channel(0, 0, 0);
diff --git a/cpp/test/unit/qpid/broker/ExchangeTest.cpp b/cpp/test/unit/qpid/broker/ExchangeTest.cpp
index 40fa9cb032..2fb525312b 100644
--- a/cpp/test/unit/qpid/broker/ExchangeTest.cpp
+++ b/cpp/test/unit/qpid/broker/ExchangeTest.cpp
@@ -16,6 +16,7 @@
*
*/
+#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Queue.h"
@@ -50,7 +51,8 @@ class ExchangeTest : public CppUnit::TestCase
queue.reset();
queue2.reset();
- Message::shared_ptr msg = Message::shared_ptr(new Message(0, "e", "A", true, true));
+ Message::shared_ptr msgPtr(new Message(0, "e", "A", true, true));
+ DeliverableMessage msg(msgPtr);
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
diff --git a/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp b/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
new file mode 100644
index 0000000000..c432de7785
--- /dev/null
+++ b/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
@@ -0,0 +1,110 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageBuilder.h"
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <memory>
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+class MessageBuilderTest : public CppUnit::TestCase
+{
+ struct DummyHandler : MessageBuilder::CompletionHandler{
+ Message::shared_ptr msg;
+
+ virtual void complete(Message::shared_ptr& _msg){
+ msg = _msg;
+ }
+ };
+
+
+ CPPUNIT_TEST_SUITE(MessageBuilderTest);
+ CPPUNIT_TEST(testHeaderOnly);
+ CPPUNIT_TEST(test1ContentFrame);
+ CPPUNIT_TEST(test2ContentFrames);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ void testHeaderOnly(){
+ DummyHandler handler;
+ MessageBuilder builder(&handler);
+
+ Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ header->setContentSize(0);
+
+ builder.initialise(message);
+ CPPUNIT_ASSERT(!handler.msg);
+ builder.setHeader(header);
+ CPPUNIT_ASSERT(handler.msg);
+ CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ }
+
+ void test1ContentFrame(){
+ DummyHandler handler;
+ MessageBuilder builder(&handler);
+
+ string data1("abcdefg");
+
+ Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ header->setContentSize(7);
+ AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+
+ builder.initialise(message);
+ CPPUNIT_ASSERT(!handler.msg);
+ builder.setHeader(header);
+ CPPUNIT_ASSERT(!handler.msg);
+ builder.addContent(part1);
+ CPPUNIT_ASSERT(handler.msg);
+ CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ }
+
+ void test2ContentFrames(){
+ DummyHandler handler;
+ MessageBuilder builder(&handler);
+
+ string data1("abcdefg");
+ string data2("hijklmn");
+
+ Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ header->setContentSize(14);
+ AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+ AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
+
+ builder.initialise(message);
+ CPPUNIT_ASSERT(!handler.msg);
+ builder.setHeader(header);
+ CPPUNIT_ASSERT(!handler.msg);
+ builder.addContent(part1);
+ CPPUNIT_ASSERT(!handler.msg);
+ builder.addContent(part2);
+ CPPUNIT_ASSERT(handler.msg);
+ CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(MessageBuilderTest);
diff --git a/cpp/test/unit/qpid/broker/RouterTest.cpp b/cpp/test/unit/qpid/broker/RouterTest.cpp
deleted file mode 100644
index f2c9f27abd..0000000000
--- a/cpp/test/unit/qpid/broker/RouterTest.cpp
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/broker/Channel.h"
-#include "qpid/broker/Exchange.h"
-#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Router.h"
-#include <qpid_test_plugin.h>
-#include <iostream>
-#include <memory>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-struct TestExchange : public Exchange{
- Message::shared_ptr msg;
- string routingKey;
- FieldTable* args;
-
- TestExchange() : Exchange("test"), args(0) {}
-
- void bind(Queue::shared_ptr /*queue*/, const string& /*routingKey*/, FieldTable* /*args*/){}
-
- void unbind(Queue::shared_ptr /*queue*/, const string& /*routingKey*/, FieldTable* /*args*/){ }
-
- void route(Message::shared_ptr& _msg, const string& _routingKey, FieldTable* _args){
- msg = _msg;
- routingKey = _routingKey;
- args = _args;
- }
-};
-
-class RouterTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(RouterTest);
- CPPUNIT_TEST(test);
- CPPUNIT_TEST_SUITE_END();
-
- public:
-
- void test()
- {
- ExchangeRegistry registry;
- TestExchange* exchange = new TestExchange();
- registry.declare(exchange);
-
- string routingKey("my_routing_key");
- string name("name");
- string value("value");
- Message::shared_ptr msg(new Message(0, "test", routingKey, false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
-
- dynamic_cast<BasicHeaderProperties*>(header->getProperties())->getHeaders().setString(name, value);
- msg->setHeader(header);
-
- Router router(registry);
- router(msg);
-
- CPPUNIT_ASSERT(exchange->msg);
- CPPUNIT_ASSERT_EQUAL(msg, exchange->msg);
- CPPUNIT_ASSERT_EQUAL(routingKey, exchange->msg->getRoutingKey());
- CPPUNIT_ASSERT_EQUAL(routingKey, exchange->routingKey);
- CPPUNIT_ASSERT_EQUAL(value, exchange->args->getString(name));
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(RouterTest);
-
diff --git a/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp b/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp
new file mode 100644
index 0000000000..0b4fd94e10
--- /dev/null
+++ b/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/concurrent/APRBase.h"
+#include <qpid_test_plugin.h>
+#include <iostream>
+
+using namespace qpid::concurrent;
+
+class APRBaseTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(APRBaseTest);
+ CPPUNIT_TEST(testMe);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ void testMe()
+ {
+ APRBase::increment();
+ APRBase::increment();
+ APRBase::decrement();
+ APRBase::decrement();
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(APRBaseTest);
+