diff options
author | Gordon Sim <gsim@apache.org> | 2008-10-24 12:34:29 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-10-24 12:34:29 +0000 |
commit | 8aab8c9e44cdf21762aef1e20fb29811f2272d19 (patch) | |
tree | 17fcab8698b5bb90e6942619b683015c8a7c2511 /qpid/cpp/src | |
parent | 7e04e141dfb61fd0477c06a0e77d3c7e7be05761 (diff) | |
download | qpid-python-8aab8c9e44cdf21762aef1e20fb29811f2272d19.tar.gz |
Revised transactional options to perftest as they could not be used on older boost versions due to ambiguity.
Refactored TxAccept to avoid excessive testing and searching for delivery records.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707615 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.h | 28 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/TxAccept.cpp | 61 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/TxAccept.h | 24 | ||||
-rw-r--r-- | qpid/cpp/src/tests/perftest.cpp | 74 |
6 files changed, 151 insertions, 70 deletions
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 65c1f0a1fa..31ccdd8260 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -171,6 +171,24 @@ void DeliveryRecord::cancel(const std::string& cancelledTag) cancelled = true; } +AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last) +{ + ack_iterator start = find_if(records.begin(), records.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first)); + ack_iterator end = start; + + if (start != records.end()) { + if (first == last) { + //just acked single element (move end past it) + ++end; + } else { + //need to find end (position it just after the last record in range) + end = find_if(start, records.end(), boost::bind(&DeliveryRecord::after, _1, last)); + } + } + return AckRange(start, end); +} + + namespace qpid { namespace broker { diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h index 6be6a9249a..f6ffb64697 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h @@ -34,11 +34,24 @@ namespace qpid { namespace broker { class SemanticState; +class DeliveryRecord; + +typedef std::list<DeliveryRecord> DeliveryRecords; +typedef std::list<DeliveryRecord>::iterator ack_iterator; + +struct AckRange +{ + ack_iterator start; + ack_iterator end; + AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {} +}; + /** * Record of a delivery for which an ack is outstanding. */ -class DeliveryRecord{ +class DeliveryRecord +{ QueuedMessage msg; mutable Queue::shared_ptr queue; const std::string tag; @@ -91,24 +104,14 @@ class DeliveryRecord{ void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize); void setId(DeliveryId _id) { id = _id; } + static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last); const QueuedMessage& getMessage() const { return msg; } framing::SequenceNumber getId() const { return id; } Queue::shared_ptr getQueue() const { return queue; } - friend bool operator<(const DeliveryRecord&, const DeliveryRecord&); friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; -typedef std::list<DeliveryRecord> DeliveryRecords; -typedef std::list<DeliveryRecord>::iterator ack_iterator; - -struct AckRange -{ - ack_iterator start; - ack_iterator end; - AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {} -}; - struct AcquireFunctor { DeliveryIds& results; @@ -120,7 +123,6 @@ struct AcquireFunctor record.acquire(results); } }; - } } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 42ef8030a6..76eacb8808 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -549,20 +549,8 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const { } AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) -{ - ack_iterator start = find_if(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first)); - ack_iterator end = start; - - if (start != unacked.end()) { - if (first == last) { - //just acked single element (move end past it) - ++end; - } else { - //need to find end (position it just after the last record in range) - end = find_if(start, unacked.end(), boost::bind(&DeliveryRecord::after, _1, last)); - } - } - return AckRange(start, end); +{ + return DeliveryRecord::findRange(unacked, first, last); } void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired) diff --git a/qpid/cpp/src/qpid/broker/TxAccept.cpp b/qpid/cpp/src/qpid/broker/TxAccept.cpp index 82acf61cd1..6d307bf735 100644 --- a/qpid/cpp/src/qpid/broker/TxAccept.cpp +++ b/qpid/cpp/src/qpid/broker/TxAccept.cpp @@ -26,19 +26,60 @@ using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; using qpid::framing::SequenceSet; +using qpid::framing::SequenceNumber; + +TxAccept::RangeOp::RangeOp(const AckRange& r) : range(r) {} + +void TxAccept::RangeOp::prepare(TransactionContext* ctxt) +{ + for_each(range.start, range.end, bind(&DeliveryRecord::dequeue, _1, ctxt)); +} + +void TxAccept::RangeOp::commit() +{ + for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1)); +} + +TxAccept::RangeOps::RangeOps(DeliveryRecords& u) : unacked(u) {} + +void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end) +{ + ranges.push_back(RangeOp(DeliveryRecord::findRange(unacked, start, end))); +} + +void TxAccept::RangeOps::prepare(TransactionContext* ctxt) +{ + for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt)); +} + +void TxAccept::RangeOps::commit() +{ + for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1)); + //now remove if isRedundant(): + if (!ranges.empty()) { + ack_iterator i = ranges.front().range.start; + ack_iterator end = ranges.back().range.end; + while (i != end) { + if (i->isRedundant()) { + i = unacked.erase(i); + } else { + i++; + } + } + } +} TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) : - acked(_acked), unacked(_unacked) {} + acked(_acked), unacked(_unacked), ops(unacked) +{ + //populate the ops + acked.for_each(ops); +} bool TxAccept::prepare(TransactionContext* ctxt) throw() { try{ - //dequeue messages from their respective queues: - for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { - if (i->coveredBy(&acked)) { - i->dequeue(ctxt); - } - } + ops.prepare(ctxt); return true; }catch(const std::exception& e){ QPID_LOG(error, "Failed to prepare: " << e.what()); @@ -51,11 +92,7 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw() void TxAccept::commit() throw() { - for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { - if (i->coveredBy(&acked)) i->setEnded(); - } - - unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); + ops.commit(); } void TxAccept::rollback() throw() {} diff --git a/qpid/cpp/src/qpid/broker/TxAccept.h b/qpid/cpp/src/qpid/broker/TxAccept.h index 9548c50c2a..5474327f7c 100644 --- a/qpid/cpp/src/qpid/broker/TxAccept.h +++ b/qpid/cpp/src/qpid/broker/TxAccept.h @@ -34,9 +34,31 @@ namespace qpid { * Defines the transactional behaviour for accepts received by * a transactional channel. */ - class TxAccept : public TxOp{ + class TxAccept : public TxOp { + struct RangeOp + { + AckRange range; + + RangeOp(const AckRange& r); + void prepare(TransactionContext* ctxt); + void commit(); + }; + + struct RangeOps + { + std::vector<RangeOp> ranges; + DeliveryRecords& unacked; + + RangeOps(DeliveryRecords& u); + + void operator()(framing::SequenceNumber start, framing::SequenceNumber end); + void prepare(TransactionContext* ctxt); + void commit(); + }; + framing::SequenceSet& acked; std::list<DeliveryRecord>& unacked; + RangeOps ops; public: /** diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp index 923405779c..2e15489525 100644 --- a/qpid/cpp/src/tests/perftest.cpp +++ b/qpid/cpp/src/tests/perftest.cpp @@ -97,9 +97,10 @@ struct Opts : public TestOptions { bool summary; uint32_t intervalSub; uint32_t intervalPub; - size_t tx_pub; - bool tx_pub_async; - size_t tx_sub; + size_t tx; + size_t txPub; + size_t txSub; + bool commitAsync; static const std::string helpText; @@ -109,7 +110,7 @@ struct Opts : public TestOptions { pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false), subs(1), ack(0), qt(1), iterations(1), mode(SHARED), summary(false), - intervalSub(0), intervalPub(0), tx_pub(0), tx_pub_async(false), tx_sub(0) + intervalSub(0), intervalPub(0), tx(0), txPub(0), txSub(0), commitAsync(false) { addOptions() ("setup", optValue(setup), "Create shared queues.") @@ -145,9 +146,10 @@ struct Opts : public TestOptions { ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume") ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish") - ("tx_pub", optValue(tx_pub, "N"), "if non-zero, the transaction batch size for publishing") - ("tx_pub_async", optValue(tx_pub_async, "yes|no"), "Publishing tx commit async") - ("tx_sub", optValue(tx_sub, "N"), "if non-zero, the transaction batch size for consuming"); + ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size for publishing and consuming") + ("pub-tx", optValue(txPub, "N"), "if non-zero, the transaction batch size for publishing") + ("async-commit", optValue(commitAsync, "yes|no"), "Don't wait for completion of commit") + ("sub-tx", optValue(txSub, "N"), "if non-zero, the transaction batch size for consuming"); } // Computed values @@ -184,6 +186,18 @@ struct Opts : public TestOptions { break; } transfers=(totalPubs*count) + (totalSubs*subQuota); + if (tx) { + if (txPub) { + cerr << "WARNING: Using overriden tx value for publishers: " << txPub << std::endl; + } else { + txPub = tx; + } + if (txSub) { + cerr << "WARNING: Using overriden tx value for subscribers: " << txSub << std::endl; + } else { + txSub = tx; + } + } } }; @@ -457,12 +471,8 @@ struct PublishThread : public Client { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - if (opts.tx_pub){ - if (opts.tx_pub_async){ - session.txSelect(); - } else { - sync(session).txSelect(); - } + if (opts.txPub){ + session.txSelect(); } SubscriptionManager subs(session); LocalQueue lq; @@ -488,8 +498,8 @@ struct PublishThread : public Client { arg::content=msg, arg::acceptMode=1); } - if (opts.tx_pub && ((i+1) % opts.tx_pub == 0)){ - if (opts.tx_pub_async){ + if (opts.txPub && ((i+1) % opts.txPub == 0)){ + if (opts.commitAsync){ session.txCommit(); } else { sync(session).txCommit(); @@ -504,12 +514,8 @@ struct PublishThread : public Client { // Send result to controller. Message report(lexical_cast<string>(opts.count/time), "pub_done"); session.messageTransfer(arg::content=report, arg::acceptMode=1); - if (opts.tx_pub){ - if (opts.tx_pub_async){ - session.txCommit(); - }else{ - sync(session).txCommit(); - } + if (opts.txPub){ + sync(session).txCommit(); } } session.close(); @@ -552,16 +558,19 @@ struct SubscribeThread : public Client { void run() { // Subscribe try { - if (opts.tx_sub) sync(session).txSelect(); + if (opts.txSub) sync(session).txSelect(); SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(opts.tx_sub ? opts.tx_sub : opts.ack)); - subs.setAcceptMode(opts.tx_sub || opts.ack ? 0 : 1); + LocalQueue lq(AckPolicy(opts.txSub ? opts.txSub : opts.ack)); + subs.setAcceptMode(opts.txSub || opts.ack ? 0 : 1); subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED, false); subs.subscribe(lq, queue); // Notify controller we are ready. session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1); - if (opts.tx_sub) sync(session).txCommit(); + if (opts.txSub) { + if (opts.commitAsync) session.txCommit(); + else sync(session).txCommit(); + } for (size_t j = 0; j < opts.iterations; ++j) { if (j > 0) { @@ -573,7 +582,10 @@ struct SubscribeThread : public Client { size_t expect=0; for (size_t i = 0; i < opts.subQuota; ++i) { msg=lq.pop(); - if (opts.tx_sub && ((i+1) % opts.tx_sub == 0)) sync(session).txCommit(); + if (opts.txSub && ((i+1) % opts.txSub == 0)) { + if (opts.commitAsync) session.txCommit(); + else sync(session).txCommit(); + } if (opts.intervalSub) ::usleep(opts.intervalSub*1000); // TODO aconway 2007-11-23: check message order for. // multiple publishers. Need an array of counters, @@ -590,17 +602,19 @@ struct SubscribeThread : public Client { expect = n+1; } } - if (opts.tx_sub || opts.ack) + if (opts.txSub || opts.ack) lq.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. - if (opts.tx_sub) - sync(session).txCommit(); + if (opts.txSub) { + if (opts.commitAsync) session.txCommit(); + else sync(session).txCommit(); + } AbsTime end=now(); // Report to publisher. Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), "sub_done"); session.messageTransfer(arg::content=result, arg::acceptMode=1); - if (opts.tx_sub) sync(session).txCommit(); + if (opts.txSub) sync(session).txCommit(); } session.close(); } |