diff options
author | Gordon Sim <gsim@apache.org> | 2007-04-19 17:56:21 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-04-19 17:56:21 +0000 |
commit | b1ad015fe2670bc3e5471c5e350e243cca948dcf (patch) | |
tree | cbbae911b59a34f7cbe998609ca9d14f8984ca37 /cpp/src | |
parent | e7cc3594288f5a6ed6c6565e34413823f5b8e2d8 (diff) | |
download | qpid-python-b1ad015fe2670bc3e5471c5e350e243cca948dcf.tar.gz |
Some dtx related updates.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@530500 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 66 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxBuffer.cpp | 40 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxBuffer.h | 44 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxHandlerImpl.cpp | 124 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxHandlerImpl.h | 98 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.cpp | 59 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.h | 53 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.cpp | 107 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.h | 59 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxBuffer.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxBuffer.h | 32 | ||||
-rw-r--r-- | cpp/src/tests/TxBufferTest.cpp | 57 |
19 files changed, 731 insertions, 70 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 6d71903271..e375682b27 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -112,6 +112,10 @@ libqpidbroker_la_SOURCES = \ qpid/broker/DeliverableMessage.cpp \ qpid/broker/DeliveryRecord.cpp \ qpid/broker/DirectExchange.cpp \ + qpid/broker/DtxBuffer.cpp \ + qpid/broker/DtxHandlerImpl.cpp \ + qpid/broker/DtxManager.cpp \ + qpid/broker/DtxWorkRecord.cpp \ qpid/broker/ExchangeRegistry.cpp \ qpid/broker/FanOutExchange.cpp \ qpid/broker/HeadersExchange.cpp \ @@ -161,6 +165,10 @@ nobase_include_HEADERS = \ qpid/broker/Deliverable.h \ qpid/broker/DeliverableMessage.h \ qpid/broker/DirectExchange.h \ + qpid/broker/DtxBuffer.h \ + qpid/broker/DtxHandlerImpl.h \ + qpid/broker/DtxManager.h \ + qpid/broker/DtxWorkRecord.h \ qpid/broker/ExchangeRegistry.h \ qpid/broker/FanOutExchange.h \ qpid/broker/HandlerImpl.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 5ab65bf573..cda3745522 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -53,7 +53,8 @@ Broker::Broker(const Configuration& conf) : timeout(30000), stagingThreshold(0), cleaner(&queues, timeout/10), - factory(*this) + factory(*this), + dtxManager(store.get()) { exchanges.declare(empty, DirectExchange::typeName); // Default exchange. exchanges.declare(amq_direct, DirectExchange::typeName); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 57bd86f72e..e732664acb 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -32,6 +32,7 @@ #include "ExchangeRegistry.h" #include "ConnectionToken.h" #include "DirectExchange.h" +#include "DtxManager.h" #include "qpid/framing/OutputHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "QueueRegistry.h" @@ -83,6 +84,7 @@ class Broker : public sys::Runnable, uint32_t getTimeout() { return timeout; } uint64_t getStagingThreshold() { return stagingThreshold; } AutoDelete& getCleaner() { return cleaner; } + DtxManager& getDtxManager() { return dtxManager; } private: Broker(const Configuration& config); @@ -97,6 +99,7 @@ class Broker : public sys::Runnable, uint64_t stagingThreshold; AutoDelete cleaner; ConnectionFactory factory; + DtxManager dtxManager; static MessageStore* createStore(const Configuration& config); }; diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 240973b060..36232339e5 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -42,7 +42,8 @@ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : exchangeHandler(*this), messageHandler(*this), queueHandler(*this), - txHandler(*this) + txHandler(*this), + dtxHandler(*this) {} @@ -336,7 +337,7 @@ void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue } void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ - channel.begin(); + channel.startTx(); client.selectOk(context.getRequestId()); } diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 0c7c9edba8..6b54575776 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -21,6 +21,7 @@ #include "qpid/framing/AMQP_ServerOperations.h" #include "HandlerImpl.h" #include "MessageHandlerImpl.h" +#include "DtxHandlerImpl.h" #include "qpid/Exception.h" namespace qpid { @@ -76,6 +77,9 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations TunnelHandler* getTunnelHandler() { throw ConnectionException(540, "Tunnel class not implemented"); } + DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } + DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } + framing::AMQP_ClientProxy& getProxy() { return proxy; } private: @@ -213,7 +217,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations MessageHandlerImpl messageHandler; QueueHandlerImpl queueHandler; TxHandlerImpl txHandler; - + DtxHandlerImpl dtxHandler; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index afbbed5c29..e4ff098c8e 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -54,7 +54,6 @@ Channel::Channel( ChannelAdapter(id, &con.getOutput(), con.getVersion()), connection(con), currentDeliveryTag(1), - transactional(false), prefetchSize(0), prefetchCount(0), framesize(_framesize), @@ -104,24 +103,38 @@ void Channel::close(){ recover(true); } -void Channel::begin(){ - transactional = true; +void Channel::startTx(){ + txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } void Channel::commit(){ TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); - txBuffer.enlist(txAck); - if(txBuffer.prepare(store)){ - txBuffer.commit(); + txBuffer->enlist(txAck); + if (txBuffer->commitLocal(store)) { + accumulatedAck.clear(); } - accumulatedAck.clear(); } void Channel::rollback(){ - txBuffer.rollback(); + txBuffer->rollback(); accumulatedAck.clear(); } +void Channel::startDtx(const std::string& xid, DtxManager& mgr){ + dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer()); + txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); + mgr.start(xid, dtxBuffer); +} + +void Channel::endDtx(){ + TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); + dtxBuffer->enlist(txAck); + dtxBuffer->markEnded(); + + dtxBuffer.reset(); + txBuffer.reset(); +} + void Channel::deliver( Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected) @@ -180,23 +193,8 @@ void Channel::ConsumerImpl::requestDispatch(){ queue->dispatch(); } -void Channel::handleInlineTransfer(Message::shared_ptr msg) -{ - Exchange::shared_ptr exchange = - connection.broker.getExchanges().get(msg->getExchange()); - if(transactional){ - TxPublish* deliverable(new TxPublish(msg)); - TxOp::shared_ptr op(deliverable); - exchange->route( - *deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); - txBuffer.enlist(op); - }else{ - DeliverableMessage deliverable(msg); - exchange->route( - deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); - } +void Channel::handleInlineTransfer(Message::shared_ptr msg){ + complete(msg); } void Channel::handlePublish(Message* _message){ @@ -222,12 +220,12 @@ void Channel::complete(Message::shared_ptr msg) { Exchange::shared_ptr exchange = connection.broker.getExchanges().get(msg->getExchange()); assert(exchange.get()); - if(transactional) { + if (txBuffer) { TxPublish* deliverable(new TxPublish(msg)); TxOp::shared_ptr op(deliverable); exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); - txBuffer.enlist(op); + txBuffer->enlist(op); } else { DeliverableMessage deliverable(msg); exchange->route(deliverable, msg->getRoutingKey(), @@ -236,24 +234,24 @@ void Channel::complete(Message::shared_ptr msg) { } void Channel::ack(){ - ack(getFirstAckRequest(), getLastAckRequest()); + ack(getFirstAckRequest(), getLastAckRequest()); } // Used by Basic void Channel::ack(uint64_t deliveryTag, bool multiple){ - if (multiple) - ack(0, deliveryTag); - else - ack(deliveryTag, deliveryTag); + if (multiple) + ack(0, deliveryTag); + else + ack(deliveryTag, deliveryTag); } void Channel::ack(uint64_t firstTag, uint64_t lastTag){ - if(transactional){ + if (txBuffer) { accumulatedAck.update(firstTag, lastTag); //TODO: I think the outstanding prefetch size & count should be updated at this point... //TODO: ...this may then necessitate dispatching to consumers - }else{ + } else { Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag)); diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index 9515485d37..4749ef6b5a 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -31,6 +31,8 @@ #include "AccumulatedAck.h" #include "Consumer.h" #include "DeliveryRecord.h" +#include "DtxBuffer.h" +#include "DtxManager.h" #include "MessageBuilder.h" #include "NameGenerator.h" #include "Prefetch.h" @@ -80,7 +82,6 @@ class Channel : public framing::ChannelAdapter, Connection& connection; uint64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; - bool transactional; ConsumerImplMap consumers; uint32_t prefetchSize; uint16_t prefetchCount; @@ -89,7 +90,8 @@ class Channel : public framing::ChannelAdapter, NameGenerator tagGenerator; std::list<DeliveryRecord> unacked; sys::Mutex deliveryLock; - TxBuffer txBuffer; + TxBuffer::shared_ptr txBuffer; + DtxBuffer::shared_ptr dtxBuffer; AccumulatedAck accumulatedAck; MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message @@ -113,7 +115,7 @@ class Channel : public framing::ChannelAdapter, ~Channel(); bool isOpen() const { return opened; } - BrokerAdapter& getAdatper() { return *adapter; } + BrokerAdapter& getAdapter() { return *adapter; } void open() { opened = true; } void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } @@ -131,10 +133,12 @@ class Channel : public framing::ChannelAdapter, const framing::FieldTable* = 0); void cancel(const string& tag); bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected); - void begin(); void close(); + void startTx(); void commit(); void rollback(); + void startDtx(const std::string& xid, DtxManager& mgr); + void endDtx(); void ack(); void ack(uint64_t deliveryTag, bool multiple); void ack(uint64_t deliveryTag, uint64_t endTag); diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index b2174de44f..88015ce310 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -82,7 +82,7 @@ void Connection::initiated(const framing::ProtocolInitiation& header) { string mechanisms("PLAIN"); string locales("en_US"); getChannel(0).init(0, *out, getVersion()); - client = &getChannel(0).getAdatper().getProxy().getConnection(); + client = &getChannel(0).getAdapter().getProxy().getConnection(); client->start( header.getMajor(), header.getMinor(), properties, mechanisms, locales); diff --git a/cpp/src/qpid/broker/DtxBuffer.cpp b/cpp/src/qpid/broker/DtxBuffer.cpp new file mode 100644 index 0000000000..bdc326593a --- /dev/null +++ b/cpp/src/qpid/broker/DtxBuffer.cpp @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DtxBuffer.h" + +using namespace qpid::broker; +using qpid::sys::Mutex; + +DtxBuffer::DtxBuffer() : ended(false) {} + +DtxBuffer::~DtxBuffer() {} + +void DtxBuffer::markEnded() +{ + Mutex::ScopedLock locker(lock); + ended = true; +} + +bool DtxBuffer::isEnded() +{ + Mutex::ScopedLock locker(lock); + return ended; +} diff --git a/cpp/src/qpid/broker/DtxBuffer.h b/cpp/src/qpid/broker/DtxBuffer.h new file mode 100644 index 0000000000..15970ccff0 --- /dev/null +++ b/cpp/src/qpid/broker/DtxBuffer.h @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DtxBuffer_ +#define _DtxBuffer_ + +#include "TxBuffer.h" +#include "qpid/sys/Mutex.h" + +namespace qpid { + namespace broker { + class DtxBuffer : public TxBuffer{ + sys::Mutex lock; + bool ended; + public: + typedef boost::shared_ptr<DtxBuffer> shared_ptr; + + DtxBuffer(); + ~DtxBuffer(); + void markEnded(); + bool isEnded(); + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp new file mode 100644 index 0000000000..06b69bc20a --- /dev/null +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -0,0 +1,124 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "DtxHandlerImpl.h" + +#include "Broker.h" +#include "BrokerChannel.h" + +using namespace qpid::broker; +using qpid::framing::FieldTable; +using qpid::framing::MethodContext; +using std::string; + +DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {} + + +// DtxDemarcationHandler: + +void DtxHandlerImpl::end(const MethodContext& /*context*/, + u_int16_t /*ticket*/, + const string& /*xid*/, + bool /*fail*/, + bool /*suspend*/ ) +{ + channel.endDtx(); + //send end-ok + //TODO: handle fail and suspend + //TODO: check xid is as expected? +} + + +void DtxHandlerImpl::select(const MethodContext& /*context*/ ) +{ + //don't need to do anything here really + //send select-ok +} + + +void DtxHandlerImpl::start(const MethodContext& /*context*/, + u_int16_t /*ticket*/, + const string& xid, + bool /*join*/, + bool /*resume*/ ) +{ + channel.startDtx(xid, broker.getDtxManager()); + //send start-ok + //TODO: handle join and resume +} + +// DtxCoordinationHandler: + +void DtxHandlerImpl::prepare(const MethodContext& /*context*/, + u_int16_t /*ticket*/, + const string& xid ) +{ + broker.getDtxManager().prepare(xid); + //send prepare-ok +} + +void DtxHandlerImpl::commit(const MethodContext& /*context*/, + u_int16_t /*ticket*/, + const string& xid, + bool /*onePhase*/ ) +{ + broker.getDtxManager().commit(xid); + //send commit-ok + //TODO use onePhase flag to validate correct sequence +} + + +void DtxHandlerImpl::rollback(const MethodContext& /*context*/, + u_int16_t /*ticket*/, + const string& xid ) +{ + broker.getDtxManager().rollback(xid); + //send rollback-ok +} + +void DtxHandlerImpl::recover(const MethodContext& /*context*/, + u_int16_t /*ticket*/, + bool /*startscan*/, + u_int32_t /*endscan*/ ) +{ + //TODO +} + +void DtxHandlerImpl::forget(const MethodContext& /*context*/, + u_int16_t /*ticket*/, + const string& /*xid*/ ) +{ + //TODO +} + +void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/, + const string& /*xid*/ ) +{ + //TODO +} + + +void DtxHandlerImpl::setTimeout(const MethodContext& /*context*/, + u_int16_t /*ticket*/, + const string& /*xid*/, + u_int32_t /*timeout*/ ) +{ + //TODO +} + + + diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h new file mode 100644 index 0000000000..368ffddbab --- /dev/null +++ b/cpp/src/qpid/broker/DtxHandlerImpl.h @@ -0,0 +1,98 @@ +#ifndef _broker_DtxHandlerImpl_h +#define _broker_DtxHandlerImpl_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/AMQP_ClientProxy.h" +#include "HandlerImpl.h" + +namespace qpid { +namespace broker { + +/* dummy interfaces until real ones are generated from published spec */ +class DtxCoordinationHandler{}; +class DtxDemarcationHandler{}; + +class DtxHandlerImpl + : public CoreRefs, + public /*framing::AMQP_ServerOperations::*/DtxCoordinationHandler, + public /*framing::AMQP_ServerOperations::*/DtxDemarcationHandler +{ +public: + DtxHandlerImpl(CoreRefs& parent); + + // DtxCoordinationHandler: + + void commit(const framing::MethodContext& context, + u_int16_t ticket, + const std::string& xid, + bool onePhase ); + + void forget(const framing::MethodContext& context, + u_int16_t ticket, + const std::string& xid ); + + void getTimeout(const framing::MethodContext& context, + const std::string& xid ); + + void prepare(const framing::MethodContext& context, + u_int16_t ticket, + const std::string& xid ); + + void recover(const framing::MethodContext& context, + u_int16_t ticket, + bool startscan, + u_int32_t endscan ); + + void recoverOk(const framing::MethodContext& context, + const framing::FieldTable& xids ); + + void rollback(const framing::MethodContext& context, + u_int16_t ticket, + const std::string& xid ); + + void setTimeout(const framing::MethodContext& context, + u_int16_t ticket, + const std::string& xid, + u_int32_t timeout ); + + // DtxDemarcationHandler: + + void end(const framing::MethodContext& context, + u_int16_t ticket, + const std::string& xid, + bool fail, + bool suspend ); + + void select(const framing::MethodContext& context ); + + void start(const framing::MethodContext& context, + u_int16_t ticket, + const std::string& xid, + bool join, + bool resume ); +}; + + +}} // namespace qpid::broker + + + +#endif /*!_broker_DtxHandlerImpl_h*/ diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp new file mode 100644 index 0000000000..6c074bbd51 --- /dev/null +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DtxManager.h" +#include <boost/format.hpp> + +using namespace qpid::broker; + +DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {} + +DtxManager::~DtxManager() {} + +void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops) +{ + WorkMap::iterator i = work.find(xid); + if (i == work.end()) { + i = work.insert(xid, new DtxWorkRecord(xid, store)).first; + } + i->add(ops); +} + +void DtxManager::prepare(const std::string& xid) +{ + getWork(xid)->prepare(); +} + +void DtxManager::commit(const std::string& xid) +{ + getWork(xid)->commit(); +} + +void DtxManager::rollback(const std::string& xid) +{ + getWork(xid)->rollback(); +} + +DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid) +{ + WorkMap::iterator i = work.find(xid); + if (i == work.end()) throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); + return i; +} diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h new file mode 100644 index 0000000000..230a0631f3 --- /dev/null +++ b/cpp/src/qpid/broker/DtxManager.h @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DtxManager_ +#define _DtxManager_ + +#include <boost/ptr_container/ptr_map.hpp> +#include "DtxBuffer.h" +#include "DtxWorkRecord.h" +#include "TransactionalStore.h" +#include "qpid/framing/amqp_types.h" + +namespace qpid { +namespace broker { + +class DtxManager{ + typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap; + + WorkMap work; + TransactionalStore* const store; + + WorkMap::iterator getWork(const std::string& xid); + +public: + DtxManager(TransactionalStore* const store); + ~DtxManager(); + void start(std::string xid, DtxBuffer::shared_ptr work); + void prepare(const std::string& xid); + void commit(const std::string& xid); + void rollback(const std::string& xid); +}; + +} +} + +#endif diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp new file mode 100644 index 0000000000..5e31312a8e --- /dev/null +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DtxWorkRecord.h" +#include <boost/format.hpp> +#include <boost/mem_fn.hpp> +using boost::mem_fn; + +using namespace qpid::broker; + +DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store) {} + +DtxWorkRecord::~DtxWorkRecord() {} + +bool DtxWorkRecord::prepare() +{ + checkCompletion(); + txn = store->begin(xid); + if (prepare(txn.get())) { + store->prepare(*txn); + return true; + } else { + abort(); + return false; + } +} + +bool DtxWorkRecord::prepare(TransactionContext* _txn) +{ + bool succeeded(true); + for (Work::iterator i = work.begin(); succeeded && i != work.end(); i++) { + succeeded = (*i)->prepare(_txn); + } + return succeeded; +} + +void DtxWorkRecord::commit() +{ + checkCompletion(); + if (txn.get()) { + //already prepared + store->commit(*txn); + txn.reset(); + + for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); + } else { + //1pc commit optimisation, don't need a 2pc transaction context: + std::auto_ptr<TransactionContext> localtxn = store->begin(); + if (prepare(localtxn.get())) { + store->commit(*localtxn); + } else { + store->abort(*localtxn); + abort(); + } + } +} + +void DtxWorkRecord::rollback() +{ + checkCompletion(); + abort(); +} + +void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) +{ + work.push_back(ops); +} + +void DtxWorkRecord::checkCompletion() +{ + if (!completed) { + //iterate through all DtxBuffers and ensure they are all ended + for (Work::iterator i = work.begin(); i != work.end(); i++) { + if (!(*i)->isEnded()) { + throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid); + } + } + completed = true; + } +} + +void DtxWorkRecord::abort() +{ + if (txn.get()) { + store->abort(*txn); + txn.reset(); + } + for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback)); + +} diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h new file mode 100644 index 0000000000..8ad4596963 --- /dev/null +++ b/cpp/src/qpid/broker/DtxWorkRecord.h @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DtxWorkRecord_ +#define _DtxWorkRecord_ + +#include <algorithm> +#include <functional> +#include <vector> +#include "DtxBuffer.h" +#include "TransactionalStore.h" +#include "qpid/framing/amqp_types.h" + +namespace qpid { +namespace broker { + +class DtxWorkRecord +{ + typedef std::vector<DtxBuffer::shared_ptr> Work; + + const std::string xid; + TransactionalStore* const store; + bool completed; + Work work; + std::auto_ptr<TPCTransactionContext> txn; + + void checkCompletion(); + void abort(); + bool prepare(TransactionContext* txn); +public: + DtxWorkRecord(const std::string& xid, TransactionalStore* const store); + ~DtxWorkRecord(); + bool prepare(); + void commit(); + void rollback(); + void add(DtxBuffer::shared_ptr ops); +}; + +} +} + +#endif diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp index c526c88c25..5b5b00e929 100644 --- a/cpp/src/qpid/broker/TxBuffer.cpp +++ b/cpp/src/qpid/broker/TxBuffer.cpp @@ -24,17 +24,13 @@ using boost::mem_fn; using namespace qpid::broker; -bool TxBuffer::prepare(TransactionalStore* const store) +bool TxBuffer::prepare(TransactionContext* const ctxt) { - std::auto_ptr<TransactionContext> ctxt; - if(store) ctxt = store->begin(); for(op_iterator i = ops.begin(); i < ops.end(); i++){ - if(!(*i)->prepare(ctxt.get())){ - if(store) store->abort(*ctxt); + if(!(*i)->prepare(ctxt)){ return false; } } - if(store) store->commit(*ctxt); return true; } @@ -54,3 +50,18 @@ void TxBuffer::enlist(TxOp::shared_ptr op) { ops.push_back(op); } + +bool TxBuffer::commitLocal(TransactionalStore* const store) +{ + std::auto_ptr<TransactionContext> ctxt; + if(store) ctxt = store->begin(); + if (prepare(ctxt.get())) { + if(store) store->commit(*ctxt); + commit(); + return true; + } else { + if(store) store->abort(*ctxt); + rollback(); + return false; + } +} diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h index 86ee8945b5..361c47e92c 100644 --- a/cpp/src/qpid/broker/TxBuffer.h +++ b/cpp/src/qpid/broker/TxBuffer.h @@ -61,44 +61,52 @@ namespace qpid { class TxBuffer{ typedef std::vector<TxOp::shared_ptr>::iterator op_iterator; std::vector<TxOp::shared_ptr> ops; + protected: + public: + typedef boost::shared_ptr<TxBuffer> shared_ptr; + /** + * Adds an operation to the transaction. + */ + void enlist(TxOp::shared_ptr op); + /** * Requests that all ops are prepared. This should * primarily involve making sure that a persistent record * of the operations is stored where necessary. - * - * All ops will be prepared under a transaction on the - * specified store. If any operation fails on prepare, - * this transaction will be rolled back. - * + * * Once prepared, a transaction can be committed (or in * the 2pc case, rolled back). * * @returns true if all the operations prepared * successfully, false if not. */ - bool prepare(TransactionalStore* const store); + bool prepare(TransactionContext* const ctxt); + /** - * Signals that the ops all prepared all completed - * successfully and can now commit, i.e. the operation can - * now be fully carried out. + * Signals that the ops all prepared successfully and can + * now commit, i.e. the operation can now be fully carried + * out. * * Should only be called after a call to prepare() returns * true. */ void commit(); + /** - * Rolls back all the operations. + * Signals that all ops can be rolled back. * * Should only be called either after a call to prepare() * returns true (2pc) or instead of a prepare call * ('server-local') */ void rollback(); + /** - * Adds an operation to the transaction. + * Helper method for managing the process of server local + * commit */ - void enlist(TxOp::shared_ptr op); + bool commitLocal(TransactionalStore* const store); }; } } diff --git a/cpp/src/tests/TxBufferTest.cpp b/cpp/src/tests/TxBufferTest.cpp index bd43691235..f58d6e356f 100644 --- a/cpp/src/tests/TxBufferTest.cpp +++ b/cpp/src/tests/TxBufferTest.cpp @@ -161,7 +161,9 @@ class TxBufferTest : public CppUnit::TestCase }; CPPUNIT_TEST_SUITE(TxBufferTest); - CPPUNIT_TEST(testPrepareAndCommit); + CPPUNIT_TEST(testCommitLocal); + CPPUNIT_TEST(testFailOnCommitLocal); + CPPUNIT_TEST(testPrepare); CPPUNIT_TEST(testFailOnPrepare); CPPUNIT_TEST(testRollback); CPPUNIT_TEST(testBufferIsClearedAfterRollback); @@ -170,14 +172,14 @@ class TxBufferTest : public CppUnit::TestCase public: - void testPrepareAndCommit(){ + void testCommitLocal(){ MockTransactionalStore store; store.expectBegin().expectCommit(); MockTxOp::shared_ptr opA(new MockTxOp()); opA->expectPrepare().expectCommit(); MockTxOp::shared_ptr opB(new MockTxOp()); - opB->expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test reative order + opB->expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test relative order MockTxOp::shared_ptr opC(new MockTxOp()); opC->expectPrepare().expectCommit(); @@ -187,8 +189,7 @@ class TxBufferTest : public CppUnit::TestCase buffer.enlist(static_pointer_cast<TxOp>(opB));//opB enlisted twice buffer.enlist(static_pointer_cast<TxOp>(opC)); - CPPUNIT_ASSERT(buffer.prepare(&store)); - buffer.commit(); + CPPUNIT_ASSERT(buffer.commitLocal(&store)); store.check(); CPPUNIT_ASSERT(store.isCommitted()); opA->check(); @@ -196,11 +197,51 @@ class TxBufferTest : public CppUnit::TestCase opC->check(); } - void testFailOnPrepare(){ + void testFailOnCommitLocal(){ MockTransactionalStore store; store.expectBegin().expectAbort(); MockTxOp::shared_ptr opA(new MockTxOp()); + opA->expectPrepare().expectRollback(); + MockTxOp::shared_ptr opB(new MockTxOp(true)); + opB->expectPrepare().expectRollback(); + MockTxOp::shared_ptr opC(new MockTxOp());//will never get prepare as b will fail + opC->expectRollback(); + + TxBuffer buffer; + buffer.enlist(static_pointer_cast<TxOp>(opA)); + buffer.enlist(static_pointer_cast<TxOp>(opB)); + buffer.enlist(static_pointer_cast<TxOp>(opC)); + + CPPUNIT_ASSERT(!buffer.commitLocal(&store)); + CPPUNIT_ASSERT(store.isAborted()); + store.check(); + opA->check(); + opB->check(); + opC->check(); + } + + void testPrepare(){ + MockTxOp::shared_ptr opA(new MockTxOp()); + opA->expectPrepare(); + MockTxOp::shared_ptr opB(new MockTxOp()); + opB->expectPrepare(); + MockTxOp::shared_ptr opC(new MockTxOp()); + opC->expectPrepare(); + + TxBuffer buffer; + buffer.enlist(static_pointer_cast<TxOp>(opA)); + buffer.enlist(static_pointer_cast<TxOp>(opB)); + buffer.enlist(static_pointer_cast<TxOp>(opC)); + + CPPUNIT_ASSERT(buffer.prepare(0)); + opA->check(); + opB->check(); + opC->check(); + } + + void testFailOnPrepare(){ + MockTxOp::shared_ptr opA(new MockTxOp()); opA->expectPrepare(); MockTxOp::shared_ptr opB(new MockTxOp(true)); opB->expectPrepare(); @@ -211,9 +252,7 @@ class TxBufferTest : public CppUnit::TestCase buffer.enlist(static_pointer_cast<TxOp>(opB)); buffer.enlist(static_pointer_cast<TxOp>(opC)); - CPPUNIT_ASSERT(!buffer.prepare(&store)); - store.check(); - CPPUNIT_ASSERT(store.isAborted()); + CPPUNIT_ASSERT(!buffer.prepare(0)); opA->check(); opB->check(); opC->check(); |