diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rwxr-xr-x | cpp/src/generate.sh | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxAck.cpp | 59 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxAck.h | 47 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxHandlerImpl.cpp | 36 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxHandlerImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/apr/LFSessionContext.cpp | 2 |
10 files changed, 145 insertions, 25 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index acc3507d24..abc081a6a5 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -181,6 +181,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/DeliverableMessage.cpp \ qpid/broker/DeliveryRecord.cpp \ qpid/broker/DirectExchange.cpp \ + qpid/broker/DtxAck.cpp \ qpid/broker/DtxBuffer.cpp \ qpid/broker/DtxHandlerImpl.cpp \ qpid/broker/DtxManager.cpp \ @@ -234,6 +235,7 @@ nobase_include_HEADERS = \ qpid/broker/Deliverable.h \ qpid/broker/DeliverableMessage.h \ qpid/broker/DirectExchange.h \ + qpid/broker/DtxAck.h \ qpid/broker/DtxBuffer.h \ qpid/broker/DtxHandlerImpl.h \ qpid/broker/DtxManager.h \ diff --git a/cpp/src/generate.sh b/cpp/src/generate.sh index 40bf3373d6..a600897cc3 100755 --- a/cpp/src/generate.sh +++ b/cpp/src/generate.sh @@ -7,7 +7,7 @@ set -e gentools_dir="$srcdir/../gentools" specs_dir="$srcdir/../../specs" -specs="$specs_dir/amqp.0-9.xml $specs_dir/amqp-errata.0-9.xml $specs_dir/amqp-dtx-preview.xml" +specs="$specs_dir/amqp.0-9.xml $specs_dir/amqp-errata.0-9.xml $specs_dir/amqp-dtx-preview.0-9.xml" test -z "$JAVA" && JAVA=java ; test -z "$JAVAC" && JAVAC=javac ; diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 235f320cb7..096478faad 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -28,17 +28,19 @@ #include <boost/bind.hpp> #include <boost/format.hpp> -#include "BrokerChannel.h" #include "qpid/framing/ChannelAdapter.h" #include "qpid/QpidError.h" -#include "DeliverableMessage.h" -#include "BrokerQueue.h" + +#include "BrokerAdapter.h" +#include "BrokerChannel.h" #include "BrokerMessage.h" +#include "BrokerQueue.h" +#include "Connection.h" +#include "DeliverableMessage.h" +#include "DtxAck.h" #include "MessageStore.h" #include "TxAck.h" #include "TxPublish.h" -#include "BrokerAdapter.h" -#include "Connection.h" using std::mem_fun_ref; using std::bind2nd; @@ -133,7 +135,8 @@ void Channel::endDtx(const std::string& xid){ % dtxBuffer->getXid() % xid); } - TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); dtxBuffer->enlist(txAck); dtxBuffer->markEnded(); diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 15c207ce44..9f73f940ff 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -38,8 +38,8 @@ namespace qpid { class DeliveryRecord{ mutable Message::shared_ptr msg; mutable Queue::shared_ptr queue; - std::string consumerTag; - uint64_t deliveryTag; + const std::string consumerTag; + const uint64_t deliveryTag; bool pull; public: diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp new file mode 100644 index 0000000000..a879abd9ab --- /dev/null +++ b/cpp/src/qpid/broker/DtxAck.cpp @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DtxAck.h" +#include "qpid/log/Statement.h" + +using std::bind1st; +using std::bind2nd; +using std::mem_fun_ref; +using namespace qpid::broker; + +DtxAck::DtxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked) +{ + remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()), + not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked))); + unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)); +} + +bool DtxAck::prepare(TransactionContext* ctxt) throw() +{ + try{ + //record dequeue in the store + for (ack_iterator i = pending.begin(); i != pending.end(); i++) { + i->discard(ctxt); + } + return true; + }catch(...){ + QPID_LOG(error, "Failed to prepare"); + return false; + } +} + +void DtxAck::commit() throw() +{ + pending.clear(); +} + +void DtxAck::rollback() throw() +{ + for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::requeue)); + pending.clear(); +} diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h new file mode 100644 index 0000000000..9da9d2078a --- /dev/null +++ b/cpp/src/qpid/broker/DtxAck.h @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DtxAck_ +#define _DtxAck_ + +#include <algorithm> +#include <functional> +#include <list> +#include "AccumulatedAck.h" +#include "DeliveryRecord.h" +#include "TxOp.h" + +namespace qpid { + namespace broker { + class DtxAck : public TxOp{ + std::list<DeliveryRecord> pending; + + public: + DtxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~DtxAck(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 1c3fce9cdb..933d787a8a 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -22,23 +22,29 @@ #include "BrokerChannel.h" using namespace qpid::broker; +using qpid::framing::AMQP_ClientProxy; using qpid::framing::FieldTable; using qpid::framing::MethodContext; using std::string; -DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {} +DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : + CoreRefs(parent), + dClient(AMQP_ClientProxy::DtxDemarcation::get(proxy)), + cClient(AMQP_ClientProxy::DtxCoordination::get(proxy)) + +{ +} // DtxDemarcationHandler: -void DtxHandlerImpl::select(const MethodContext& /*context*/ ) +void DtxHandlerImpl::select(const MethodContext& context ) { - //don't need to do anything here really - //send select-ok + dClient.selectOk(context.getRequestId()); } -void DtxHandlerImpl::end(const MethodContext& /*context*/, +void DtxHandlerImpl::end(const MethodContext& context, u_int16_t /*ticket*/, const string& xid, bool fail, @@ -54,10 +60,10 @@ void DtxHandlerImpl::end(const MethodContext& /*context*/, } else { channel.endDtx(xid); } - //send end-ok + dClient.endOk(0/*TODO - set flags*/, context.getRequestId()); } -void DtxHandlerImpl::start(const MethodContext& /*context*/, +void DtxHandlerImpl::start(const MethodContext& context, u_int16_t /*ticket*/, const string& xid, bool /*join*/, @@ -69,36 +75,36 @@ void DtxHandlerImpl::start(const MethodContext& /*context*/, } else { channel.startDtx(xid, broker.getDtxManager()); } - //send start-ok + dClient.startOk(0/*TODO - set flags*/, context.getRequestId()); } // DtxCoordinationHandler: -void DtxHandlerImpl::prepare(const MethodContext& /*context*/, +void DtxHandlerImpl::prepare(const MethodContext& context, u_int16_t /*ticket*/, const string& xid ) { broker.getDtxManager().prepare(xid); - //send prepare-ok + cClient.prepareOk(0/*TODO - set flags*/, context.getRequestId()); } -void DtxHandlerImpl::commit(const MethodContext& /*context*/, +void DtxHandlerImpl::commit(const MethodContext& context, u_int16_t /*ticket*/, const string& xid, bool /*onePhase*/ ) { - broker.getDtxManager().commit(xid); - //send commit-ok //TODO use onePhase flag to validate correct sequence + broker.getDtxManager().commit(xid); + cClient.commitOk(0/*TODO - set flags*/, context.getRequestId()); } -void DtxHandlerImpl::rollback(const MethodContext& /*context*/, +void DtxHandlerImpl::rollback(const MethodContext& context, u_int16_t /*ticket*/, const string& xid ) { broker.getDtxManager().rollback(xid); - //send rollback-ok + cClient.rollbackOk(0/*TODO - set flags*/, context.getRequestId()); } void DtxHandlerImpl::recover(const MethodContext& /*context*/, diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h index 20fd825249..eda9e83a91 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.h +++ b/cpp/src/qpid/broker/DtxHandlerImpl.h @@ -31,6 +31,8 @@ class DtxHandlerImpl public framing::AMQP_ServerOperations::DtxCoordinationHandler, public framing::AMQP_ServerOperations::DtxDemarcationHandler { + framing::AMQP_ClientProxy::DtxDemarcation dClient; + framing::AMQP_ClientProxy::DtxCoordination cClient; public: DtxHandlerImpl(CoreRefs& parent); diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index d49cfbadfe..0b0262902b 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -20,6 +20,7 @@ */ #include "DtxManager.h" #include <boost/format.hpp> +#include <iostream> using namespace qpid::broker; diff --git a/cpp/src/qpid/sys/apr/LFSessionContext.cpp b/cpp/src/qpid/sys/apr/LFSessionContext.cpp index 2672b168e5..0717dcc9ae 100644 --- a/cpp/src/qpid/sys/apr/LFSessionContext.cpp +++ b/cpp/src/qpid/sys/apr/LFSessionContext.cpp @@ -98,7 +98,7 @@ void LFSessionContext::write(){ while(frame && out.available() >= frame->size()){ encoded = true; frame->encode(out); - QPID_LOG(debug, "SENT: " << frame); + QPID_LOG(debug, "SENT: " << *frame); delete frame; framesToWrite.pop(); frame = framesToWrite.empty() ? 0 : framesToWrite.front(); |