diff options
-rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxAck.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxAck.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 63 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/TxAccept.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/TxAccept.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/TxAck.cpp | 59 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/TxAck.h | 57 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionSettings.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionSettings.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ConnectionOptions.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/TxAckTest.cpp | 95 | ||||
-rw-r--r-- | qpid/cpp/src/tests/perftest.cpp | 29 |
18 files changed, 42 insertions, 307 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 9891f225cb..d498b1f60e 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -294,7 +294,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Timer.cpp \ qpid/broker/TopicExchange.cpp \ qpid/broker/TxAccept.cpp \ - qpid/broker/TxAck.cpp \ qpid/broker/TxBuffer.cpp \ qpid/broker/TxPublish.cpp \ qpid/broker/Vhost.cpp \ @@ -429,7 +428,6 @@ nobase_include_HEADERS = \ qpid/broker/TopicExchange.h \ qpid/broker/TransactionalStore.h \ qpid/broker/TxAccept.h \ - qpid/broker/TxAck.h \ qpid/broker/TxBuffer.h \ qpid/broker/TxOp.h \ qpid/broker/TxPublish.h \ diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 4406eccc44..530dca99a4 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -83,8 +83,8 @@ bool DeliveryRecord::after(DeliveryId tag) const{ return id > tag; } -bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const{ - return range->covers(id); +bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{ + return range->contains(id); } void DeliveryRecord::redeliver(SemanticState* const session) { @@ -118,6 +118,8 @@ void DeliveryRecord::release(bool setRedelivered) queue->requeue(msg); acquired = false; setEnded(); + } else { + QPID_LOG(debug, "Ignoring release for " << id << " acquired=" << acquired << ", ended =" << ended); } } @@ -130,6 +132,7 @@ void DeliveryRecord::accept(TransactionContext* ctxt) { if (acquired && !ended) { queue->dequeue(ctxt, msg.payload); setEnded(); + QPID_LOG(debug, "Accepted " << id); } } diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h index 7d08a4b1f0..78dc99e3c6 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h @@ -25,7 +25,7 @@ #include <list> #include <vector> #include <ostream> -#include "qpid/framing/AccumulatedAck.h" +#include "qpid/framing/SequenceSet.h" #include "Queue.h" #include "Consumer.h" #include "DeliveryId.h" @@ -63,7 +63,7 @@ class DeliveryRecord{ bool matches(DeliveryId tag) const; bool matchOrAfter(DeliveryId tag) const; bool after(DeliveryId tag) const; - bool coveredBy(const framing::AccumulatedAck* const range) const; + bool coveredBy(const framing::SequenceSet* const range) const; void dequeue(TransactionContext* ctxt = 0) const; void requeue() const; diff --git a/qpid/cpp/src/qpid/broker/DtxAck.cpp b/qpid/cpp/src/qpid/broker/DtxAck.cpp index 0e6f94d4e0..47637369ca 100644 --- a/qpid/cpp/src/qpid/broker/DtxAck.cpp +++ b/qpid/cpp/src/qpid/broker/DtxAck.cpp @@ -26,7 +26,7 @@ using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; -DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked) +DtxAck::DtxAck(const framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked) { remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()), not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked))); diff --git a/qpid/cpp/src/qpid/broker/DtxAck.h b/qpid/cpp/src/qpid/broker/DtxAck.h index c61b279c42..05c4499839 100644 --- a/qpid/cpp/src/qpid/broker/DtxAck.h +++ b/qpid/cpp/src/qpid/broker/DtxAck.h @@ -24,7 +24,7 @@ #include <algorithm> #include <functional> #include <list> -#include "qpid/framing/AccumulatedAck.h" +#include "qpid/framing/SequenceSet.h" #include "DeliveryRecord.h" #include "TxOp.h" @@ -34,7 +34,7 @@ namespace qpid { std::list<DeliveryRecord> pending; public: - DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); + DtxAck(const framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index ad617c1bc1..bdd8edac87 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -28,7 +28,6 @@ #include "Queue.h" #include "SessionContext.h" #include "TxAccept.h" -#include "TxAck.h" #include "TxPublish.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" @@ -63,7 +62,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) prefetchCount(0), tagGenerator("sgen"), dtxSelected(false), - accumulatedAck(0), flowActive(true), outputTasks(ss) { @@ -116,14 +114,12 @@ void SemanticState::startTx() txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void SemanticState::commit(MessageStore* const store, bool completeOnCommit) +void SemanticState::commit(MessageStore* const store) { if (!txBuffer) throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); - TxOp::shared_ptr txAck(completeOnCommit ? - static_cast<TxOp*>(new TxAck(accumulatedAck, unacked)) : - static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked))); + TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked))); txBuffer->enlist(txAck); if (txBuffer->commitLocal(store)) { accumulatedAck.clear(); @@ -377,59 +373,6 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { } -void SemanticState::ackCumulative(DeliveryId id) -{ - ack(id, id, true); -} - -void SemanticState::ackRange(DeliveryId first, DeliveryId last) -{ - ack(first, last, false); -} - -void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) -{ - { - ack_iterator start = cumulative ? unacked.begin() : - find_if(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first)); - ack_iterator end = start; - - if (cumulative || first != last) { - //need to find end (position it just after the last record in range) - end = find_if(start, unacked.end(), boost::bind(&DeliveryRecord::after, _1, last)); - } else if (start != unacked.end()) { - //just acked single element (move end past it) - ++end; - } - - for_each(start, end, boost::bind(&SemanticState::complete, this, _1)); - - if (txBuffer.get()) { - //in transactional mode, don't dequeue or remove, just - //maintain set of acknowledged messages: - accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); - - if (dtxBuffer.get()) { - //if enlisted in a dtx, copy the relevant slice from - //unacked and record it against that transaction: - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - //then remove that slice from the unacked record: - unacked.remove_if(boost::bind(&DeliveryRecord::coveredBy, _1, &accumulatedAck)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); - } - } else { - for_each(start, end, boost::bind(&DeliveryRecord::dequeue, _1, (TransactionContext*) 0)); - unacked.erase(start, end); - } - }//end of lock scope for delivery lock (TODO this is ugly, make it prettier) - - //if the prefetch limit had previously been reached, or credit - //had expired in windowing mode there may be messages that can - //be now be delivered - requestDispatch(); -} - void SemanticState::requestDispatch() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { @@ -667,7 +610,7 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last) if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: - accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet + accumulatedAck.add(first, last); if (dtxBuffer.get()) { //if enlisted in a dtx, copy the relevant slice from diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 84dc0fc5bb..bf3a7756b5 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -34,7 +34,7 @@ #include "TxBuffer.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/AccumulatedAck.h" +#include "qpid/framing/SequenceSet.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/AggregateOutput.h" #include "qpid/shared_ptr.h" @@ -115,7 +115,7 @@ class SemanticState : public framing::FrameHandler::Chains, DtxBuffer::shared_ptr dtxBuffer; bool dtxSelected; DtxBufferMap suspendedXids; - framing::AccumulatedAck accumulatedAck; + framing::SequenceSet accumulatedAck; bool flowActive; boost::shared_ptr<Exchange> cacheExchange; sys::AggregateOutput outputTasks; @@ -125,7 +125,6 @@ class SemanticState : public framing::FrameHandler::Chains, bool checkPrefetch(boost::intrusive_ptr<Message>& msg); void checkDtxTimeout(); ConsumerImpl& find(const std::string& destination); - void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); void complete(DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); @@ -168,7 +167,7 @@ class SemanticState : public framing::FrameHandler::Chains, bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); void startTx(); - void commit(MessageStore* const store, bool completeOnCommit); + void commit(MessageStore* const store); void rollback(); void selectDtx(); void startDtx(const std::string& xid, DtxManager& mgr, bool join); @@ -184,10 +183,6 @@ class SemanticState : public framing::FrameHandler::Chains, void handle(boost::intrusive_ptr<Message> msg); bool doOutput() { return outputTasks.doOutput(); } - //preview only (completed == ack): - void ackCumulative(DeliveryId deliveryTag); - void ackRange(DeliveryId deliveryTag, DeliveryId endTag); - //final 0-10 spec (completed and accepted are distinct): void completed(DeliveryId deliveryTag, DeliveryId endTag); void accepted(DeliveryId deliveryTag, DeliveryId endTag); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 3d795014b8..e1589aea99 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -464,7 +464,7 @@ void SessionAdapter::TxHandlerImpl::select() void SessionAdapter::TxHandlerImpl::commit() { - state.commit(&getBroker().getStore(), false); + state.commit(&getBroker().getStore()); } void SessionAdapter::TxHandlerImpl::rollback() diff --git a/qpid/cpp/src/qpid/broker/TxAccept.cpp b/qpid/cpp/src/qpid/broker/TxAccept.cpp index 634d066ecc..82acf61cd1 100644 --- a/qpid/cpp/src/qpid/broker/TxAccept.cpp +++ b/qpid/cpp/src/qpid/broker/TxAccept.cpp @@ -25,9 +25,9 @@ using std::bind1st; using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; -using qpid::framing::AccumulatedAck; +using qpid::framing::SequenceSet; -TxAccept::TxAccept(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : +TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) : acked(_acked), unacked(_unacked) {} bool TxAccept::prepare(TransactionContext* ctxt) throw() diff --git a/qpid/cpp/src/qpid/broker/TxAccept.h b/qpid/cpp/src/qpid/broker/TxAccept.h index 011acf5d9e..9548c50c2a 100644 --- a/qpid/cpp/src/qpid/broker/TxAccept.h +++ b/qpid/cpp/src/qpid/broker/TxAccept.h @@ -24,7 +24,7 @@ #include <algorithm> #include <functional> #include <list> -#include "qpid/framing/AccumulatedAck.h" +#include "qpid/framing/SequenceSet.h" #include "DeliveryRecord.h" #include "TxOp.h" @@ -35,7 +35,7 @@ namespace qpid { * a transactional channel. */ class TxAccept : public TxOp{ - framing::AccumulatedAck& acked; + framing::SequenceSet& acked; std::list<DeliveryRecord>& unacked; public: @@ -44,7 +44,7 @@ namespace qpid { * acks received * @param unacked the record of delivered messages */ - TxAccept(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); + TxAccept(framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/qpid/cpp/src/qpid/broker/TxAck.cpp b/qpid/cpp/src/qpid/broker/TxAck.cpp deleted file mode 100644 index 40b9b0ff33..0000000000 --- a/qpid/cpp/src/qpid/broker/TxAck.cpp +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "TxAck.h" -#include "qpid/log/Statement.h" - -using std::bind1st; -using std::bind2nd; -using std::mem_fun_ref; -using namespace qpid::broker; -using qpid::framing::AccumulatedAck; - -TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : - acked(_acked), unacked(_unacked){ - -} - -bool TxAck::prepare(TransactionContext* ctxt) throw(){ - try{ - //dequeue all acked messages from their queues - for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { - if (i->coveredBy(&acked)) { - i->dequeue(ctxt); - } - } - return true; - }catch(const std::exception& e){ - QPID_LOG(error, "Failed to prepare: " << e.what()); - return false; - }catch(...){ - QPID_LOG(error, "Failed to prepare"); - return false; - } -} - -void TxAck::commit() throw(){ - //remove all acked records from the list - unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)); -} - -void TxAck::rollback() throw(){ -} diff --git a/qpid/cpp/src/qpid/broker/TxAck.h b/qpid/cpp/src/qpid/broker/TxAck.h deleted file mode 100644 index c8383b6314..0000000000 --- a/qpid/cpp/src/qpid/broker/TxAck.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _TxAck_ -#define _TxAck_ - -#include <algorithm> -#include <functional> -#include <list> -#include "qpid/framing/AccumulatedAck.h" -#include "DeliveryRecord.h" -#include "TxOp.h" - -namespace qpid { - namespace broker { - /** - * Defines the transactional behaviour for acks received by a - * transactional channel. - */ - class TxAck : public TxOp{ - framing::AccumulatedAck& acked; - std::list<DeliveryRecord>& unacked; - - public: - /** - * @param acked a representation of the accumulation of - * acks received - * @param unacked the record of delivered messages - */ - TxAck(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); - virtual bool prepare(TransactionContext* ctxt) throw(); - virtual void commit() throw(); - virtual void rollback() throw(); - virtual ~TxAck(){} - }; - } -} - - -#endif diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp index 2de2f92e45..ea4e20b529 100644 --- a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp @@ -32,7 +32,6 @@ namespace client { ConnectionSettings::ConnectionSettings() : host("localhost"), port(TcpAddress::DEFAULT_PORT), - clientid("cpp"), username("guest"), password("guest"), mechanism("PLAIN"), diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.h b/qpid/cpp/src/qpid/client/ConnectionSettings.h index bc3d79e1c6..a2b85c5134 100644 --- a/qpid/cpp/src/qpid/client/ConnectionSettings.h +++ b/qpid/cpp/src/qpid/client/ConnectionSettings.h @@ -60,7 +60,6 @@ struct ConnectionSettings : public sys::Socket::Configuration { */ std::string virtualhost; - std::string clientid; /** * The username to use when authenticating the connection. */ diff --git a/qpid/cpp/src/tests/ConnectionOptions.h b/qpid/cpp/src/tests/ConnectionOptions.h index b4b562fbb9..4e0af7352f 100644 --- a/qpid/cpp/src/tests/ConnectionOptions.h +++ b/qpid/cpp/src/tests/ConnectionOptions.h @@ -38,7 +38,6 @@ struct ConnectionOptions : public qpid::Options, ("broker,b", optValue(host, "HOST"), "Broker host to connect to") ("port,p", optValue(port, "PORT"), "Broker port to connect to") ("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host") - ("clientname,n", optValue(clientid, "ID"), "unique client identifier") ("username", optValue(username, "USER"), "user name for broker log in.") ("password", optValue(password, "PASSWORD"), "password for broker log in.") ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.") diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 0e3cba30bb..735d85b419 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -59,7 +59,6 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ SequenceNumberTest.cpp \ TimerTest.cpp \ TopicExchangeTest.cpp \ - TxAckTest.cpp \ TxBufferTest.cpp \ TxPublishTest.cpp \ MessageBuilderTest.cpp \ diff --git a/qpid/cpp/src/tests/TxAckTest.cpp b/qpid/cpp/src/tests/TxAckTest.cpp deleted file mode 100644 index d330942ced..0000000000 --- a/qpid/cpp/src/tests/TxAckTest.cpp +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "MessageUtils.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid/broker/RecoveryManager.h" -#include "qpid/broker/TxAck.h" -#include "TestMessageStore.h" -#include "unit_test.h" -#include <iostream> -#include <list> -#include <vector> - -using std::list; -using std::vector; -using boost::intrusive_ptr; -using namespace qpid; -using namespace qpid::broker; -using namespace qpid::framing; - -QPID_AUTO_TEST_SUITE(TxAckTestSuite) - -struct TxAckTest -{ - AccumulatedAck acked; - TestMessageStore store; - Queue::shared_ptr queue; - vector<intrusive_ptr<Message> > messages; - list<DeliveryRecord> deliveries; - TxAck op; - - TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) - { - for(int i = 0; i < 10; i++){ - intrusive_ptr<Message> msg(MessageUtils::createMessage("exchange", "routing_key")); - msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); - messages.push_back(msg); - QueuedMessage qm(queue.get()); - qm.payload = msg; - deliveries.push_back(DeliveryRecord(qm, queue, "xyz", DeliveryToken::shared_ptr(), (i+1), true)); - } - - //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not) - acked.mark = 5; - acked.update(7, 7); - acked.update(9, 9); - } - -}; - -QPID_AUTO_TEST_CASE(testPrepare) -{ - TxAckTest t; - - //ensure acked messages are discarded, i.e. dequeued from store - t.op.prepare(0); - BOOST_CHECK_EQUAL((size_t) 7, t.store.dequeued.size()); - BOOST_CHECK_EQUAL((size_t) 10, t.deliveries.size()); - int dequeued[] = {0, 1, 2, 3, 4, 6, 8}; - for (int i = 0; i < 7; i++) { - BOOST_CHECK_EQUAL(static_pointer_cast<PersistableMessage>(t.messages[dequeued[i]]), t.store.dequeued[i]); - } -} - -QPID_AUTO_TEST_CASE(testCommit) -{ - TxAckTest t; - - //ensure acked messages are removed from list - t.op.commit(); - BOOST_CHECK_EQUAL((size_t) 3, t.deliveries.size()); - list<DeliveryRecord>::iterator i = t.deliveries.begin(); - BOOST_CHECK(i->matches(6));//msg 6 - BOOST_CHECK((++i)->matches(8));//msg 8 - BOOST_CHECK((++i)->matches(10));//msg 10 -} - -QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp index e0e947eb74..23e9e565e0 100644 --- a/qpid/cpp/src/tests/perftest.cpp +++ b/qpid/cpp/src/tests/perftest.cpp @@ -95,8 +95,9 @@ struct Opts : public TestOptions { size_t iterations; Mode mode; bool summary; - uint32_t intervalSub; - uint32_t intervalPub; + uint32_t intervalSub; + uint32_t intervalPub; + size_t tx; static const std::string helpText; @@ -106,7 +107,7 @@ struct Opts : public TestOptions { pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false), subs(1), ack(0), qt(1), iterations(1), mode(SHARED), summary(false), - intervalSub(0), intervalPub(0) + intervalSub(0), intervalPub(0), tx(0) { addOptions() ("setup", optValue(setup), "Create shared queues.") @@ -140,7 +141,9 @@ struct Opts : public TestOptions { ("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)") ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume") - ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish"); + ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish") + + ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size"); } // Computed values @@ -450,6 +453,7 @@ struct PublishThread : public Client { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + if (opts.tx) sync(session).txSelect(); SubscriptionManager subs(session); LocalQueue lq; subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); @@ -474,6 +478,7 @@ struct PublishThread : public Client { arg::content=msg, arg::acceptMode=1); } + if (opts.tx && (j % opts.tx == 0)) sync(session).txCommit(); if (opts.intervalPub) ::usleep(opts.intervalPub*1000); } if (opts.confirm) session.sync(); @@ -483,6 +488,7 @@ struct PublishThread : public Client { // Send result to controller. Message report(lexical_cast<string>(opts.count/time), "pub_done"); session.messageTransfer(arg::content=report, arg::acceptMode=1); + if (opts.tx) sync(session).txCommit(); } session.close(); } @@ -523,16 +529,17 @@ struct SubscribeThread : public Client { } void run() { // Subscribe - try { + try { + if (opts.tx) sync(session).txSelect(); SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(opts.ack)); - subs.setAcceptMode(opts.ack > 0 ? 0 : 1); + LocalQueue lq(AckPolicy(opts.tx ? opts.tx : opts.ack)); + subs.setAcceptMode(opts.tx || opts.ack ? 0 : 1); subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED, false); subs.subscribe(lq, queue); // Notify controller we are ready. session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1); - + if (opts.tx) sync(session).txCommit(); for (size_t j = 0; j < opts.iterations; ++j) { if (j > 0) { @@ -544,6 +551,7 @@ struct SubscribeThread : public Client { size_t expect=0; for (size_t i = 0; i < opts.subQuota; ++i) { msg=lq.pop(); + if (opts.tx && (i % opts.tx == 0)) sync(session).txCommit(); if (opts.intervalSub) ::usleep(opts.intervalSub*1000); // TODO aconway 2007-11-23: check message order for. // multiple publishers. Need an array of counters, @@ -560,14 +568,17 @@ struct SubscribeThread : public Client { expect = n+1; } } - if (opts.ack !=0) + if (opts.ack) subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. + if (opts.tx) + sync(session).txCommit(); AbsTime end=now(); // Report to publisher. Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), "sub_done"); session.messageTransfer(arg::content=result, arg::acceptMode=1); + if (opts.tx) sync(session).txCommit(); } session.close(); } |