summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.h4
-rw-r--r--qpid/cpp/src/qpid/broker/DtxAck.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/DtxAck.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp63
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h11
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/TxAccept.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/TxAccept.h6
-rw-r--r--qpid/cpp/src/qpid/broker/TxAck.cpp59
-rw-r--r--qpid/cpp/src/qpid/broker/TxAck.h57
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionSettings.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionSettings.h1
-rw-r--r--qpid/cpp/src/tests/ConnectionOptions.h1
-rw-r--r--qpid/cpp/src/tests/Makefile.am1
-rw-r--r--qpid/cpp/src/tests/TxAckTest.cpp95
-rw-r--r--qpid/cpp/src/tests/perftest.cpp29
18 files changed, 42 insertions, 307 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 9891f225cb..d498b1f60e 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -294,7 +294,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
qpid/broker/TxAccept.cpp \
- qpid/broker/TxAck.cpp \
qpid/broker/TxBuffer.cpp \
qpid/broker/TxPublish.cpp \
qpid/broker/Vhost.cpp \
@@ -429,7 +428,6 @@ nobase_include_HEADERS = \
qpid/broker/TopicExchange.h \
qpid/broker/TransactionalStore.h \
qpid/broker/TxAccept.h \
- qpid/broker/TxAck.h \
qpid/broker/TxBuffer.h \
qpid/broker/TxOp.h \
qpid/broker/TxPublish.h \
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 4406eccc44..530dca99a4 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -83,8 +83,8 @@ bool DeliveryRecord::after(DeliveryId tag) const{
return id > tag;
}
-bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const{
- return range->covers(id);
+bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{
+ return range->contains(id);
}
void DeliveryRecord::redeliver(SemanticState* const session) {
@@ -118,6 +118,8 @@ void DeliveryRecord::release(bool setRedelivered)
queue->requeue(msg);
acquired = false;
setEnded();
+ } else {
+ QPID_LOG(debug, "Ignoring release for " << id << " acquired=" << acquired << ", ended =" << ended);
}
}
@@ -130,6 +132,7 @@ void DeliveryRecord::accept(TransactionContext* ctxt) {
if (acquired && !ended) {
queue->dequeue(ctxt, msg.payload);
setEnded();
+ QPID_LOG(debug, "Accepted " << id);
}
}
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index 7d08a4b1f0..78dc99e3c6 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -25,7 +25,7 @@
#include <list>
#include <vector>
#include <ostream>
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
#include "Queue.h"
#include "Consumer.h"
#include "DeliveryId.h"
@@ -63,7 +63,7 @@ class DeliveryRecord{
bool matches(DeliveryId tag) const;
bool matchOrAfter(DeliveryId tag) const;
bool after(DeliveryId tag) const;
- bool coveredBy(const framing::AccumulatedAck* const range) const;
+ bool coveredBy(const framing::SequenceSet* const range) const;
void dequeue(TransactionContext* ctxt = 0) const;
void requeue() const;
diff --git a/qpid/cpp/src/qpid/broker/DtxAck.cpp b/qpid/cpp/src/qpid/broker/DtxAck.cpp
index 0e6f94d4e0..47637369ca 100644
--- a/qpid/cpp/src/qpid/broker/DtxAck.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxAck.cpp
@@ -26,7 +26,7 @@ using std::bind2nd;
using std::mem_fun_ref;
using namespace qpid::broker;
-DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
+DtxAck::DtxAck(const framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked)
{
remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()),
not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
diff --git a/qpid/cpp/src/qpid/broker/DtxAck.h b/qpid/cpp/src/qpid/broker/DtxAck.h
index c61b279c42..05c4499839 100644
--- a/qpid/cpp/src/qpid/broker/DtxAck.h
+++ b/qpid/cpp/src/qpid/broker/DtxAck.h
@@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <list>
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
#include "DeliveryRecord.h"
#include "TxOp.h"
@@ -34,7 +34,7 @@ namespace qpid {
std::list<DeliveryRecord> pending;
public:
- DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ DtxAck(const framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index ad617c1bc1..bdd8edac87 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -28,7 +28,6 @@
#include "Queue.h"
#include "SessionContext.h"
#include "TxAccept.h"
-#include "TxAck.h"
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -63,7 +62,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
prefetchCount(0),
tagGenerator("sgen"),
dtxSelected(false),
- accumulatedAck(0),
flowActive(true),
outputTasks(ss)
{
@@ -116,14 +114,12 @@ void SemanticState::startTx()
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void SemanticState::commit(MessageStore* const store, bool completeOnCommit)
+void SemanticState::commit(MessageStore* const store)
{
if (!txBuffer) throw
CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
- TxOp::shared_ptr txAck(completeOnCommit ?
- static_cast<TxOp*>(new TxAck(accumulatedAck, unacked)) :
- static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
+ TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
txBuffer->enlist(txAck);
if (txBuffer->commitLocal(store)) {
accumulatedAck.clear();
@@ -377,59 +373,6 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
}
-void SemanticState::ackCumulative(DeliveryId id)
-{
- ack(id, id, true);
-}
-
-void SemanticState::ackRange(DeliveryId first, DeliveryId last)
-{
- ack(first, last, false);
-}
-
-void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
-{
- {
- ack_iterator start = cumulative ? unacked.begin() :
- find_if(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first));
- ack_iterator end = start;
-
- if (cumulative || first != last) {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), boost::bind(&DeliveryRecord::after, _1, last));
- } else if (start != unacked.end()) {
- //just acked single element (move end past it)
- ++end;
- }
-
- for_each(start, end, boost::bind(&SemanticState::complete, this, _1));
-
- if (txBuffer.get()) {
- //in transactional mode, don't dequeue or remove, just
- //maintain set of acknowledged messages:
- accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
-
- if (dtxBuffer.get()) {
- //if enlisted in a dtx, copy the relevant slice from
- //unacked and record it against that transaction:
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- //then remove that slice from the unacked record:
- unacked.remove_if(boost::bind(&DeliveryRecord::coveredBy, _1, &accumulatedAck));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
- }
- } else {
- for_each(start, end, boost::bind(&DeliveryRecord::dequeue, _1, (TransactionContext*) 0));
- unacked.erase(start, end);
- }
- }//end of lock scope for delivery lock (TODO this is ugly, make it prettier)
-
- //if the prefetch limit had previously been reached, or credit
- //had expired in windowing mode there may be messages that can
- //be now be delivered
- requestDispatch();
-}
-
void SemanticState::requestDispatch()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
@@ -667,7 +610,7 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
- accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet
+ accumulatedAck.add(first, last);
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 84dc0fc5bb..bf3a7756b5 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -34,7 +34,7 @@
#include "TxBuffer.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/AggregateOutput.h"
#include "qpid/shared_ptr.h"
@@ -115,7 +115,7 @@ class SemanticState : public framing::FrameHandler::Chains,
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
DtxBufferMap suspendedXids;
- framing::AccumulatedAck accumulatedAck;
+ framing::SequenceSet accumulatedAck;
bool flowActive;
boost::shared_ptr<Exchange> cacheExchange;
sys::AggregateOutput outputTasks;
@@ -125,7 +125,6 @@ class SemanticState : public framing::FrameHandler::Chains,
bool checkPrefetch(boost::intrusive_ptr<Message>& msg);
void checkDtxTimeout();
ConsumerImpl& find(const std::string& destination);
- void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
void complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
@@ -168,7 +167,7 @@ class SemanticState : public framing::FrameHandler::Chains,
bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
void startTx();
- void commit(MessageStore* const store, bool completeOnCommit);
+ void commit(MessageStore* const store);
void rollback();
void selectDtx();
void startDtx(const std::string& xid, DtxManager& mgr, bool join);
@@ -184,10 +183,6 @@ class SemanticState : public framing::FrameHandler::Chains,
void handle(boost::intrusive_ptr<Message> msg);
bool doOutput() { return outputTasks.doOutput(); }
- //preview only (completed == ack):
- void ackCumulative(DeliveryId deliveryTag);
- void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
-
//final 0-10 spec (completed and accepted are distinct):
void completed(DeliveryId deliveryTag, DeliveryId endTag);
void accepted(DeliveryId deliveryTag, DeliveryId endTag);
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 3d795014b8..e1589aea99 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -464,7 +464,7 @@ void SessionAdapter::TxHandlerImpl::select()
void SessionAdapter::TxHandlerImpl::commit()
{
- state.commit(&getBroker().getStore(), false);
+ state.commit(&getBroker().getStore());
}
void SessionAdapter::TxHandlerImpl::rollback()
diff --git a/qpid/cpp/src/qpid/broker/TxAccept.cpp b/qpid/cpp/src/qpid/broker/TxAccept.cpp
index 634d066ecc..82acf61cd1 100644
--- a/qpid/cpp/src/qpid/broker/TxAccept.cpp
+++ b/qpid/cpp/src/qpid/broker/TxAccept.cpp
@@ -25,9 +25,9 @@ using std::bind1st;
using std::bind2nd;
using std::mem_fun_ref;
using namespace qpid::broker;
-using qpid::framing::AccumulatedAck;
+using qpid::framing::SequenceSet;
-TxAccept::TxAccept(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) :
+TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) :
acked(_acked), unacked(_unacked) {}
bool TxAccept::prepare(TransactionContext* ctxt) throw()
diff --git a/qpid/cpp/src/qpid/broker/TxAccept.h b/qpid/cpp/src/qpid/broker/TxAccept.h
index 011acf5d9e..9548c50c2a 100644
--- a/qpid/cpp/src/qpid/broker/TxAccept.h
+++ b/qpid/cpp/src/qpid/broker/TxAccept.h
@@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <list>
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
#include "DeliveryRecord.h"
#include "TxOp.h"
@@ -35,7 +35,7 @@ namespace qpid {
* a transactional channel.
*/
class TxAccept : public TxOp{
- framing::AccumulatedAck& acked;
+ framing::SequenceSet& acked;
std::list<DeliveryRecord>& unacked;
public:
@@ -44,7 +44,7 @@ namespace qpid {
* acks received
* @param unacked the record of delivered messages
*/
- TxAccept(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ TxAccept(framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
diff --git a/qpid/cpp/src/qpid/broker/TxAck.cpp b/qpid/cpp/src/qpid/broker/TxAck.cpp
deleted file mode 100644
index 40b9b0ff33..0000000000
--- a/qpid/cpp/src/qpid/broker/TxAck.cpp
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "TxAck.h"
-#include "qpid/log/Statement.h"
-
-using std::bind1st;
-using std::bind2nd;
-using std::mem_fun_ref;
-using namespace qpid::broker;
-using qpid::framing::AccumulatedAck;
-
-TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) :
- acked(_acked), unacked(_unacked){
-
-}
-
-bool TxAck::prepare(TransactionContext* ctxt) throw(){
- try{
- //dequeue all acked messages from their queues
- for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
- if (i->coveredBy(&acked)) {
- i->dequeue(ctxt);
- }
- }
- return true;
- }catch(const std::exception& e){
- QPID_LOG(error, "Failed to prepare: " << e.what());
- return false;
- }catch(...){
- QPID_LOG(error, "Failed to prepare");
- return false;
- }
-}
-
-void TxAck::commit() throw(){
- //remove all acked records from the list
- unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked));
-}
-
-void TxAck::rollback() throw(){
-}
diff --git a/qpid/cpp/src/qpid/broker/TxAck.h b/qpid/cpp/src/qpid/broker/TxAck.h
deleted file mode 100644
index c8383b6314..0000000000
--- a/qpid/cpp/src/qpid/broker/TxAck.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _TxAck_
-#define _TxAck_
-
-#include <algorithm>
-#include <functional>
-#include <list>
-#include "qpid/framing/AccumulatedAck.h"
-#include "DeliveryRecord.h"
-#include "TxOp.h"
-
-namespace qpid {
- namespace broker {
- /**
- * Defines the transactional behaviour for acks received by a
- * transactional channel.
- */
- class TxAck : public TxOp{
- framing::AccumulatedAck& acked;
- std::list<DeliveryRecord>& unacked;
-
- public:
- /**
- * @param acked a representation of the accumulation of
- * acks received
- * @param unacked the record of delivered messages
- */
- TxAck(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
- virtual bool prepare(TransactionContext* ctxt) throw();
- virtual void commit() throw();
- virtual void rollback() throw();
- virtual ~TxAck(){}
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
index 2de2f92e45..ea4e20b529 100644
--- a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
@@ -32,7 +32,6 @@ namespace client {
ConnectionSettings::ConnectionSettings() :
host("localhost"),
port(TcpAddress::DEFAULT_PORT),
- clientid("cpp"),
username("guest"),
password("guest"),
mechanism("PLAIN"),
diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.h b/qpid/cpp/src/qpid/client/ConnectionSettings.h
index bc3d79e1c6..a2b85c5134 100644
--- a/qpid/cpp/src/qpid/client/ConnectionSettings.h
+++ b/qpid/cpp/src/qpid/client/ConnectionSettings.h
@@ -60,7 +60,6 @@ struct ConnectionSettings : public sys::Socket::Configuration {
*/
std::string virtualhost;
- std::string clientid;
/**
* The username to use when authenticating the connection.
*/
diff --git a/qpid/cpp/src/tests/ConnectionOptions.h b/qpid/cpp/src/tests/ConnectionOptions.h
index b4b562fbb9..4e0af7352f 100644
--- a/qpid/cpp/src/tests/ConnectionOptions.h
+++ b/qpid/cpp/src/tests/ConnectionOptions.h
@@ -38,7 +38,6 @@ struct ConnectionOptions : public qpid::Options,
("broker,b", optValue(host, "HOST"), "Broker host to connect to")
("port,p", optValue(port, "PORT"), "Broker port to connect to")
("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host")
- ("clientname,n", optValue(clientid, "ID"), "unique client identifier")
("username", optValue(username, "USER"), "user name for broker log in.")
("password", optValue(password, "PASSWORD"), "password for broker log in.")
("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.")
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 0e3cba30bb..735d85b419 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -59,7 +59,6 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
SequenceNumberTest.cpp \
TimerTest.cpp \
TopicExchangeTest.cpp \
- TxAckTest.cpp \
TxBufferTest.cpp \
TxPublishTest.cpp \
MessageBuilderTest.cpp \
diff --git a/qpid/cpp/src/tests/TxAckTest.cpp b/qpid/cpp/src/tests/TxAckTest.cpp
deleted file mode 100644
index d330942ced..0000000000
--- a/qpid/cpp/src/tests/TxAckTest.cpp
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "MessageUtils.h"
-#include "qpid/broker/NullMessageStore.h"
-#include "qpid/broker/RecoveryManager.h"
-#include "qpid/broker/TxAck.h"
-#include "TestMessageStore.h"
-#include "unit_test.h"
-#include <iostream>
-#include <list>
-#include <vector>
-
-using std::list;
-using std::vector;
-using boost::intrusive_ptr;
-using namespace qpid;
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-QPID_AUTO_TEST_SUITE(TxAckTestSuite)
-
-struct TxAckTest
-{
- AccumulatedAck acked;
- TestMessageStore store;
- Queue::shared_ptr queue;
- vector<intrusive_ptr<Message> > messages;
- list<DeliveryRecord> deliveries;
- TxAck op;
-
- TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries)
- {
- for(int i = 0; i < 10; i++){
- intrusive_ptr<Message> msg(MessageUtils::createMessage("exchange", "routing_key"));
- msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
- messages.push_back(msg);
- QueuedMessage qm(queue.get());
- qm.payload = msg;
- deliveries.push_back(DeliveryRecord(qm, queue, "xyz", DeliveryToken::shared_ptr(), (i+1), true));
- }
-
- //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
- acked.mark = 5;
- acked.update(7, 7);
- acked.update(9, 9);
- }
-
-};
-
-QPID_AUTO_TEST_CASE(testPrepare)
-{
- TxAckTest t;
-
- //ensure acked messages are discarded, i.e. dequeued from store
- t.op.prepare(0);
- BOOST_CHECK_EQUAL((size_t) 7, t.store.dequeued.size());
- BOOST_CHECK_EQUAL((size_t) 10, t.deliveries.size());
- int dequeued[] = {0, 1, 2, 3, 4, 6, 8};
- for (int i = 0; i < 7; i++) {
- BOOST_CHECK_EQUAL(static_pointer_cast<PersistableMessage>(t.messages[dequeued[i]]), t.store.dequeued[i]);
- }
-}
-
-QPID_AUTO_TEST_CASE(testCommit)
-{
- TxAckTest t;
-
- //ensure acked messages are removed from list
- t.op.commit();
- BOOST_CHECK_EQUAL((size_t) 3, t.deliveries.size());
- list<DeliveryRecord>::iterator i = t.deliveries.begin();
- BOOST_CHECK(i->matches(6));//msg 6
- BOOST_CHECK((++i)->matches(8));//msg 8
- BOOST_CHECK((++i)->matches(10));//msg 10
-}
-
-QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp
index e0e947eb74..23e9e565e0 100644
--- a/qpid/cpp/src/tests/perftest.cpp
+++ b/qpid/cpp/src/tests/perftest.cpp
@@ -95,8 +95,9 @@ struct Opts : public TestOptions {
size_t iterations;
Mode mode;
bool summary;
- uint32_t intervalSub;
- uint32_t intervalPub;
+ uint32_t intervalSub;
+ uint32_t intervalPub;
+ size_t tx;
static const std::string helpText;
@@ -106,7 +107,7 @@ struct Opts : public TestOptions {
pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false),
subs(1), ack(0),
qt(1), iterations(1), mode(SHARED), summary(false),
- intervalSub(0), intervalPub(0)
+ intervalSub(0), intervalPub(0), tx(0)
{
addOptions()
("setup", optValue(setup), "Create shared queues.")
@@ -140,7 +141,9 @@ struct Opts : public TestOptions {
("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)")
("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume")
- ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish");
+ ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish")
+
+ ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size");
}
// Computed values
@@ -450,6 +453,7 @@ struct PublishThread : public Client {
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ if (opts.tx) sync(session).txSelect();
SubscriptionManager subs(session);
LocalQueue lq;
subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true);
@@ -474,6 +478,7 @@ struct PublishThread : public Client {
arg::content=msg,
arg::acceptMode=1);
}
+ if (opts.tx && (j % opts.tx == 0)) sync(session).txCommit();
if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
}
if (opts.confirm) session.sync();
@@ -483,6 +488,7 @@ struct PublishThread : public Client {
// Send result to controller.
Message report(lexical_cast<string>(opts.count/time), "pub_done");
session.messageTransfer(arg::content=report, arg::acceptMode=1);
+ if (opts.tx) sync(session).txCommit();
}
session.close();
}
@@ -523,16 +529,17 @@ struct SubscribeThread : public Client {
}
void run() { // Subscribe
- try {
+ try {
+ if (opts.tx) sync(session).txSelect();
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(opts.ack));
- subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
+ LocalQueue lq(AckPolicy(opts.tx ? opts.tx : opts.ack));
+ subs.setAcceptMode(opts.tx || opts.ack ? 0 : 1);
subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
false);
subs.subscribe(lq, queue);
// Notify controller we are ready.
session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
-
+ if (opts.tx) sync(session).txCommit();
for (size_t j = 0; j < opts.iterations; ++j) {
if (j > 0) {
@@ -544,6 +551,7 @@ struct SubscribeThread : public Client {
size_t expect=0;
for (size_t i = 0; i < opts.subQuota; ++i) {
msg=lq.pop();
+ if (opts.tx && (i % opts.tx == 0)) sync(session).txCommit();
if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
// TODO aconway 2007-11-23: check message order for.
// multiple publishers. Need an array of counters,
@@ -560,14 +568,17 @@ struct SubscribeThread : public Client {
expect = n+1;
}
}
- if (opts.ack !=0)
+ if (opts.ack)
subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
+ if (opts.tx)
+ sync(session).txCommit();
AbsTime end=now();
// Report to publisher.
Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
"sub_done");
session.messageTransfer(arg::content=result, arg::acceptMode=1);
+ if (opts.tx) sync(session).txCommit();
}
session.close();
}