diff options
author | Alan Conway <aconway@apache.org> | 2008-11-05 15:22:47 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-11-05 15:22:47 +0000 |
commit | ad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b (patch) | |
tree | 9ee2e8cdcad566d355233da8b4a45b92c9f6ed3f /cpp/src | |
parent | d3f652de187cac449e1fae4e00fce59c204f020a (diff) | |
download | qpid-python-ad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b.tar.gz |
Cluster: replicate transaction state to newcomers.
constants.rb: generate type code constants for AMQP types. Useful with Array.
framing/Array:
- added some std:::vector like functions & typedefs.
- use TypeCode enums, human readable ostream << operator.
rubygen - fixed error in generation of exceptions for bad codes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxAck.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveredDequeue.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveredEnqueue.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAccept.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAccept.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxBuffer.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxBuffer.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxOp.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxOpVisitor.h | 100 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 65 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 77 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SequenceSet.h | 4 | ||||
-rw-r--r-- | cpp/src/tests/TxMocks.h | 3 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 49 |
21 files changed, 298 insertions, 75 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 2b0becf836..8f77d4c3c6 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -506,6 +506,7 @@ nobase_include_HEADERS = \ qpid/broker/TxAccept.h \ qpid/broker/TxBuffer.h \ qpid/broker/TxOp.h \ + qpid/broker/TxOpVisitor.h \ qpid/broker/TxPublish.h \ qpid/broker/Vhost.h \ qpid/client/AckMode.h \ diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h index 05c4499839..d43532906a 100644 --- a/cpp/src/qpid/broker/DtxAck.h +++ b/cpp/src/qpid/broker/DtxAck.h @@ -39,6 +39,7 @@ namespace qpid { virtual void commit() throw(); virtual void rollback() throw(); virtual ~DtxAck(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } }; } } diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h index 276e1f4c5c..ef6b8f1f74 100644 --- a/cpp/src/qpid/broker/RecoveredDequeue.h +++ b/cpp/src/qpid/broker/RecoveredDequeue.h @@ -45,6 +45,10 @@ namespace qpid { virtual void commit() throw(); virtual void rollback() throw(); virtual ~RecoveredDequeue(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } + + Queue::shared_ptr getQueue() const { return queue; } + boost::intrusive_ptr<Message> getMessage() const { return msg; } }; } } diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h index 6525179769..2d97768e65 100644 --- a/cpp/src/qpid/broker/RecoveredEnqueue.h +++ b/cpp/src/qpid/broker/RecoveredEnqueue.h @@ -45,6 +45,11 @@ namespace qpid { virtual void commit() throw(); virtual void rollback() throw(); virtual ~RecoveredEnqueue(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } + + Queue::shared_ptr getQueue() const { return queue; } + boost::intrusive_ptr<Message> getMessage() const { return msg; } + }; } } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 22f6316974..73dfc8cde8 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -436,7 +436,7 @@ void SemanticState::recover(bool requeue) if(requeue){ //take copy and clear unacked as requeue may result in redelivery to this session //which will in turn result in additions to unacked - std::list<DeliveryRecord> copy = unacked; + DeliveryRecords copy = unacked; unacked.clear(); for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index dbb3e1d3b6..340017ddf0 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -134,7 +134,7 @@ class SemanticState : public sys::OutputTask, DeliveryAdapter& deliveryAdapter; ConsumerImplMap consumers; NameGenerator tagGenerator; - std::list<DeliveryRecord> unacked; + DeliveryRecords unacked; TxBuffer::shared_ptr txBuffer; DtxBuffer::shared_ptr dtxBuffer; bool dtxSelected; @@ -216,8 +216,11 @@ class SemanticState : public sys::OutputTask, static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); } template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); } - template <class F> void eachUnacked(F f) { std::for_each(unacked.begin(), unacked.end(), f); } - + DeliveryRecords& getUnacked() { return unacked; } + framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; } + TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; } + void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; } + void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; } void record(const DeliveryRecord& delivery); }; diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp index 6d307bf735..594a466453 100644 --- a/cpp/src/qpid/broker/TxAccept.cpp +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -69,7 +69,7 @@ void TxAccept::RangeOps::commit() } } -TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) : +TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) : acked(_acked), unacked(_unacked), ops(unacked) { //populate the ops diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h index 5474327f7c..0a5fdedb0a 100644 --- a/cpp/src/qpid/broker/TxAccept.h +++ b/cpp/src/qpid/broker/TxAccept.h @@ -56,8 +56,8 @@ namespace qpid { void commit(); }; - framing::SequenceSet& acked; - std::list<DeliveryRecord>& unacked; + framing::SequenceSet acked; + DeliveryRecords& unacked; RangeOps ops; public: @@ -66,11 +66,15 @@ namespace qpid { * acks received * @param unacked the record of delivered messages */ - TxAccept(framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked); + TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); virtual ~TxAccept(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } + + // Used by cluster replication. + const framing::SequenceSet& getAcked() const { return acked; } }; } } diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp index 8fe2c17bf0..ae18e0f318 100644 --- a/cpp/src/qpid/broker/TxBuffer.cpp +++ b/cpp/src/qpid/broker/TxBuffer.cpp @@ -22,6 +22,7 @@ #include "qpid/log/Statement.h" #include <boost/mem_fn.hpp> +#include <boost/bind.hpp> using boost::mem_fn; using namespace qpid::broker; @@ -73,3 +74,7 @@ bool TxBuffer::commitLocal(TransactionalStore* const store) } return false; } + +void TxBuffer::accept(TxOpConstVisitor& v) const { + std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v))); +} diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h index 361c47e92c..aabb5ea0b1 100644 --- a/cpp/src/qpid/broker/TxBuffer.h +++ b/cpp/src/qpid/broker/TxBuffer.h @@ -107,6 +107,9 @@ namespace qpid { * commit */ bool commitLocal(TransactionalStore* const store); + + // Used by cluster to replicate transaction status. + void accept(TxOpConstVisitor& v) const; }; } } diff --git a/cpp/src/qpid/broker/TxOp.h b/cpp/src/qpid/broker/TxOp.h index e687c437cc..5265478e36 100644 --- a/cpp/src/qpid/broker/TxOp.h +++ b/cpp/src/qpid/broker/TxOp.h @@ -21,11 +21,15 @@ #ifndef _TxOp_ #define _TxOp_ +#include "TxOpVisitor.h" #include "TransactionalStore.h" #include <boost/shared_ptr.hpp> namespace qpid { namespace broker { + +class TxOpConstVisitor; + class TxOp{ public: typedef boost::shared_ptr<TxOp> shared_ptr; @@ -34,9 +38,11 @@ namespace qpid { virtual void commit() throw() = 0; virtual void rollback() throw() = 0; virtual ~TxOp(){} + + virtual void accept(TxOpConstVisitor&) const = 0; }; - } -} + +}} // namespace qpid::broker #endif diff --git a/cpp/src/qpid/broker/TxOpVisitor.h b/cpp/src/qpid/broker/TxOpVisitor.h new file mode 100644 index 0000000000..a5f2a018c9 --- /dev/null +++ b/cpp/src/qpid/broker/TxOpVisitor.h @@ -0,0 +1,100 @@ +#ifndef QPID_BROKER_TXOPVISITOR_H +#define QPID_BROKER_TXOPVISITOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/shared_ptr.h" + +namespace qpid { +namespace broker { + +class DtxAck; +class RecoveredDequeue; +class RecoveredEnqueue; +class TxAccept; +class TxPublish; + +/** + * Visitor for TxOp familly of classes. + */ +struct TxOpConstVisitor +{ + virtual ~TxOpConstVisitor() {} + virtual void operator()(const DtxAck&) = 0; + virtual void operator()(const RecoveredDequeue&) = 0; + virtual void operator()(const RecoveredEnqueue&) = 0; + virtual void operator()(const TxAccept&) = 0; + virtual void operator()(const TxPublish&) = 0; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_TXOPVISITOR_H*/ +#ifndef QPID_BROKER_TXOPVISITOR_H +#define QPID_BROKER_TXOPVISITOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/shared_ptr.h" + +namespace qpid { +namespace broker { + +class DtxAck; +class RecoveredDequeue; +class RecoveredEnqueue; +class TxAccept; +class TxPublish; + +/** + * Visitor for TxOp familly of classes. + */ +struct TxOpConstVisitor +{ + virtual ~TxOpConstVisitor() {} + virtual void operator()(const DtxAck&) = 0; + virtual void operator()(const RecoveredDequeue&) = 0; + virtual void operator()(const RecoveredEnqueue&) = 0; + virtual void operator()(const TxAccept&) = 0; + virtual void operator()(const TxPublish&) = 0; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_TXOPVISITOR_H*/ diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index 018437f1ed..1f73cb8767 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -75,8 +75,12 @@ namespace qpid { virtual void deliverTo(const boost::shared_ptr<Queue>& queue); virtual ~TxPublish(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } uint64_t contentSize(); + + boost::intrusive_ptr<Message> getMessage() const { return msg; } + const std::list<Queue::shared_ptr> getQueues() const { return queues; } }; } } diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 9526a33ac6..fad0563872 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -90,7 +90,4 @@ struct ClusterPlugin : public Plugin { static ClusterPlugin instance; // Static initialization. -// For test purposes. -Cluster& getGlobalCluster() { assert(instance.cluster); return *instance.cluster; } - }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index ada26ab2fb..513816735d 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -24,7 +24,11 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" +#include "qpid/broker/TxBuffer.h" #include "qpid/broker/TxPublish.h" +#include "qpid/broker/TxAccept.h" +#include "qpid/broker/RecoveredEnqueue.h" +#include "qpid/broker/RecoveredDequeue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AllInvoker.h" @@ -36,7 +40,7 @@ #include <boost/current_function.hpp> -// FIXME aconway 2008-11-03: +// TODO aconway 2008-11-03: // // Disproportionate amount of code here is dedicated to receiving a // brain-dump when joining a cluster and building initial @@ -113,7 +117,6 @@ bool Connection::checkUnsupported(const AMQBody& body) { std::string message; if (body.getMethod()) { switch (body.getMethod()->amqpClassId()) { - case TX_CLASS_ID: message = "TX transactions are not currently supported by cluster."; break; case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; } } @@ -122,13 +125,13 @@ bool Connection::checkUnsupported(const AMQBody& body) { if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster."; } if (!message.empty()) - connection.close(execution::ERROR_CODE_INTERNAL_ERROR, message, 0, 0); + connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message, 0, 0); return !message.empty(); } // Delivered from cluster. void Connection::delivered(framing::AMQFrame& f) { - QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f); + QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f); assert(!catchUp); currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol. @@ -247,11 +250,15 @@ bool Connection::isDumped() const { return self.first == cluster.getId() && self.second == 0; } + +shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { + shared_ptr<broker::Queue> queue = cluster.getBroker().getQueues().find(qname); + if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " << qname)); + return queue; +} + broker::QueuedMessage Connection::getDumpMessage() { - // Get a message from the DUMP queue. - broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); - if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue")); - broker::QueuedMessage m = dumpQueue->get(); + broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get(); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue")); return m; } @@ -267,14 +274,11 @@ void Connection::deliveryRecord(const string& qname, bool ended, bool windowing) { - broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname); - if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname)); broker::QueuedMessage m; + broker::Queue::shared_ptr queue = findQueue(qname); if (!ended) { // Has a message - if (acquired) { // Message at front of dump queue - broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); - m = dumpQueue->get(); - } + if (acquired) // Message is on the dump queue + m = getDumpMessage(); else // Message at original position in original queue m = queue->find(position); if (!m.payload) @@ -286,8 +290,7 @@ void Connection::deliveryRecord(const string& qname, if (cancelled) dr.cancel(dr.getTag()); if (completed) dr.complete(); if (ended) dr.setEnded(); // Exsitance of message - - semanticState().record(dr); + semanticState().record(dr); // Part of the session's unacked list. } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { @@ -304,6 +307,36 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } +void Connection::txStart() { + txBuffer = make_shared_ptr(new broker::TxBuffer()); +} +void Connection::txAccept(const framing::SequenceSet& acked) { + txBuffer->enlist(make_shared_ptr(new broker::TxAccept(acked, semanticState().getUnacked()))); +} + +void Connection::txDequeue(const std::string& queue) { + txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload))); +} + +void Connection::txEnqueue(const std::string& queue) { + txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload))); +} + +void Connection::txPublish(const framing::Array& queues, bool delivered) { + boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getDumpMessage().payload)); + for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) + txPub->deliverTo(findQueue((*i)->get<std::string>())); + txPub->delivered = delivered; + txBuffer->enlist(txPub); +} + +void Connection::txEnd() { + semanticState().setTxBuffer(txBuffer); +} + +void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) { + semanticState().setAccumulatedAck(s); +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 331ac33ab0..2eafa90f32 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -125,12 +125,21 @@ class Connection : void queuePosition(const std::string&, const framing::SequenceNumber&); + void txStart(); + void txAccept(const framing::SequenceSet&); + void txDequeue(const std::string&); + void txEnqueue(const std::string&); + void txPublish(const qpid::framing::Array&, bool); + void txEnd(); + void accumulatedAck(const qpid::framing::SequenceSet&); + private: bool checkUnsupported(const framing::AMQBody& body); void deliverClose(); void deliverDoOutput(uint32_t requested); void sendDoOutput(); + boost::shared_ptr<broker::Queue> findQueue(const std::string& qname); broker::SessionState& sessionState(); broker::SemanticState& semanticState(); broker::QueuedMessage getDumpMessage(); @@ -148,6 +157,7 @@ class Connection : framing::SequenceNumber mcastSeq; framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; + boost::shared_ptr<broker::TxBuffer> txBuffer; friend std::ostream& operator<<(std::ostream&, const Connection&); }; diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index a2860f6f32..bb3cfdfa56 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -32,6 +32,12 @@ #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/SessionHandler.h" #include "qpid/broker/SessionState.h" +#include "qpid/broker/TxOpVisitor.h" +#include "qpid/broker/DtxAck.h" +#include "qpid/broker/TxAccept.h" +#include "qpid/broker/TxPublish.h" +#include "qpid/broker/RecoveredDequeue.h" +#include "qpid/broker/RecoveredEnqueue.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/ClusterConnectionMembershipBody.h" #include "qpid/framing/ClusterConnectionShadowReadyBody.h" @@ -43,7 +49,7 @@ #include "qpid/log/Statement.h" #include "qpid/Url.h" #include <boost/bind.hpp> - +#include <algorithm> namespace qpid { namespace cluster { @@ -198,7 +204,7 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConn shadowConnection = catchUpConnection(); broker::Connection& bc = dumpConnection->getBrokerConnection(); - // FIXME aconway 2008-10-20: What authentication info to reconnect? + // FIXME aconway 2008-10-20: What authentication info to use on reconnect? shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax()); bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( @@ -227,7 +233,10 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); QPID_LOG(debug, dumperId << " dumping unacknowledged messages."); - ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1)); + broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); + std::for_each(drs.begin(), drs.end(), boost::bind(&DumpClient::dumpUnacked, this, _1)); + + dumpTxState(ss->getSemanticState()); // Tx transaction state. // Adjust for command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); @@ -283,22 +292,12 @@ void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) { } void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) { - dumpDeliveryRecordMessage(dr); - dumpDeliveryRecord(dr); -} - -void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) { - // Dump the message associated with a dr if need be. if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { // If the message is acquired then it is no longer on the // dumpees queue, put it on the dump queue for dumpee to pick up. // MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage()); } -} - -void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) { - // Assumes the associated message has already been dumped (if needed) ClusterConnectionProxy(shadowSession).deliveryRecord( dr.getQueue()->getName(), dr.getMessage().position, @@ -312,4 +311,56 @@ void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) { dr.isWindowing()); } +class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper { + public: + TxOpDumper(DumpClient& dc, client::AsyncSession s) + : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s) {} + + void operator()(const broker::DtxAck& ) { + throw InternalErrorException("DTX transactions not currently supported by cluster."); + } + + void operator()(const broker::RecoveredDequeue& rdeq) { + dumpMessage(rdeq.getMessage()); + proxy.txEnqueue(rdeq.getQueue()->getName()); + } + + void operator()(const broker::RecoveredEnqueue& renq) { + dumpMessage(renq.getMessage()); + proxy.txEnqueue(renq.getQueue()->getName()); + } + + void operator()(const broker::TxAccept& txAccept) { + proxy.txAccept(txAccept.getAcked()); + } + + void operator()(const broker::TxPublish& txPub) { + dumpMessage(txPub.getMessage()); + typedef std::list<Queue::shared_ptr> QueueList; + const QueueList& qlist = txPub.getQueues(); + Array qarray(TYPE_CODE_STR8); + for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) + qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + proxy.txPublish(qarray, txPub.delivered); + } + + private: + DumpClient& parent; + client::AsyncSession session; + ClusterConnectionProxy proxy; +}; + +void DumpClient::dumpTxState(broker::SemanticState& s) { + QPID_LOG(debug, dumperId << " dumping TX transaction state."); + ClusterConnectionProxy proxy(shadowSession); + proxy.accumulatedAck(s.getAccumulatedAck()); + broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); + if (txBuffer) { + proxy.txStart(); + TxOpDumper dumper(*this, shadowSession); + txBuffer->accept(dumper); + proxy.txEnd(); + } +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h index 716e7dcc3a..23676e7646 100644 --- a/cpp/src/qpid/cluster/DumpClient.h +++ b/cpp/src/qpid/cluster/DumpClient.h @@ -71,6 +71,8 @@ class DumpClient : public sys::Runnable { void dump(); void run(); // Will delete this when finished. + void dumpUnacked(const broker::DeliveryRecord&); + private: void dumpQueue(const boost::shared_ptr<broker::Queue>&); void dumpExchange(const boost::shared_ptr<broker::Exchange>&); @@ -79,10 +81,8 @@ class DumpClient : public sys::Runnable { void dumpBinding(const std::string& queue, const broker::QueueBinding& binding); void dumpConnection(const boost::intrusive_ptr<Connection>& connection); void dumpSession(broker::SessionHandler& s); + void dumpTxState(broker::SemanticState& s); void dumpConsumer(const broker::SemanticState::ConsumerImpl*); - void dumpUnacked(const broker::DeliveryRecord&); - void dumpDeliveryRecord(const broker::DeliveryRecord&); - void dumpDeliveryRecordMessage(const broker::DeliveryRecord&); MemberId dumperId; MemberId dumpeeId; diff --git a/cpp/src/qpid/framing/SequenceSet.h b/cpp/src/qpid/framing/SequenceSet.h index 8d97d03c4b..57b9c2c8e1 100644 --- a/cpp/src/qpid/framing/SequenceSet.h +++ b/cpp/src/qpid/framing/SequenceSet.h @@ -31,9 +31,9 @@ class Buffer; class SequenceSet : public RangeSet<SequenceNumber> { public: SequenceSet() {} - explicit SequenceSet(const RangeSet<SequenceNumber>& r) + SequenceSet(const RangeSet<SequenceNumber>& r) : RangeSet<SequenceNumber>(r) {} - explicit SequenceSet(const SequenceNumber& s) { add(s); } + SequenceSet(const SequenceNumber& s) { add(s); } SequenceSet(const SequenceNumber& start, const SequenceNumber finish) { add(start,finish); } diff --git a/cpp/src/tests/TxMocks.h b/cpp/src/tests/TxMocks.h index 86864b987e..fe103c5fe5 100644 --- a/cpp/src/tests/TxMocks.h +++ b/cpp/src/tests/TxMocks.h @@ -114,6 +114,9 @@ public: void check(){ assertEqualVector(expected, actual); } + + void accept(TxOpConstVisitor&) const {} + ~MockTxOp(){} }; diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 623c033e4c..cf6c2e73de 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -45,13 +45,6 @@ #include <algorithm> #include <iterator> -namespace qpid { -namespace cluster { -// FIXME aconway 2008-11-04: remove. -Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp -}} // namespace qpid::cluster - - namespace std { // ostream operators in std:: namespace template <class T> ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); } @@ -69,7 +62,6 @@ using qpid::sys::TIME_SEC; using qpid::broker::Broker; using boost::shared_ptr; using qpid::cluster::Cluster; -using qpid::cluster::getGlobalCluster; /** Parse broker & cluster options */ Broker::Options parseOpts(size_t argc, const char* argv[]) { @@ -216,8 +208,19 @@ class Sender { uint16_t channel; }; -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) { - ClusterFixture cluster(1, 1); // FIXME aconway 2008-11-04: local broker at index 1 +QPID_AUTO_TEST_CASE(testUnsupported) { + ScopedSuppressLogging sl; + ClusterFixture cluster(1); + Client c1(cluster[0], "c1"); + BOOST_CHECK_THROW(c1.session.dtxSelect(), FramingErrorException); + Client c2(cluster[0], "c2"); + Message m; + m.getDeliveryProperties().setTtl(1); + BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception); +} + +QPID_AUTO_TEST_CASE(testTxTransaction) { + ClusterFixture cluster(1); Client c0(cluster[0], "c0"); c0.session.queueDeclare(arg::queue="q"); c0.session.messageTransfer(arg::content=Message("A", "q")); @@ -236,7 +239,8 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) { SubscriptionManager rollbackSubs(rollbackSession); rollbackSession.txSelect(); rollbackSession.messageTransfer(arg::content=Message("1", "q")); - BOOST_CHECK_EQUAL(rollbackSubs.get("q", TIME_SEC).getData(), "B"); + Message rollbackMessage = rollbackSubs.get("q", TIME_SEC); + BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); // Add new member mid transaction. @@ -250,10 +254,14 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) { rollbackSession.messageTransfer(arg::content=Message("3", "q")); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + // Commit/roll back. commitSession.txCommit(); rollbackSession.txRollback(); - // Verify queue status: just the comitted messages + rollbackSession.messageRelease(rollbackMessage.getId()); + + + // Verify queue status: just the comitted messages and dequeues should remain. BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u); BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B"); BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a"); @@ -261,20 +269,6 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) { BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c"); } -QPID_AUTO_TEST_CASE(testUnsupported) { - ScopedSuppressLogging sl; - ClusterFixture cluster(1); - Client c0(cluster[0], "c0"); - BOOST_CHECK_THROW(c0.session.txSelect(), Exception); - BOOST_CHECK(!c0.connection.isOpen()); - Client c1(cluster[0], "c1"); - BOOST_CHECK_THROW(c1.session.dtxCommit(), Exception); - Client c2(cluster[0], "c2"); - Message m; - m.getDeliveryProperties().setTtl(1); - BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception); -} - QPID_AUTO_TEST_CASE(testUnacked) { // Verify replication of unacknowledged messages. ClusterFixture cluster(1); @@ -388,8 +382,7 @@ QPID_AUTO_TEST_CASE(testDumpMessageBuilder) { Client c1(cluster[1], "c1"); BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "abcd"); - - BOOST_CHECK_EQUAL(2u, getGlobalCluster().getUrls().size()); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size()); } QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { |