diff options
author | Gordon Sim <gsim@apache.org> | 2008-10-29 15:15:08 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-10-29 15:15:08 +0000 |
commit | 20e253844fddd084eac9b80dc9cc73efff12dd28 (patch) | |
tree | 7772f7360ec0bc8c7cabc49202d7b6bca038e7ec /cpp | |
parent | 2dcb03fba0f117583a3dd669c46302bb6bb834a2 (diff) | |
download | qpid-python-20e253844fddd084eac9b80dc9cc73efff12dd28.tar.gz |
* added flag to SubscriptionSettings to control automatic completion of message
* removed automatic acquiring under autoAck mode
* added test for results from acquire requests
* added short txtest to the set of system tests run under make check
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@708919 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/client/FlowControl.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionImpl.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionSettings.h | 31 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 3 | ||||
-rwxr-xr-x | cpp/src/tests/quick_txtest | 2 | ||||
-rw-r--r-- | cpp/src/tests/txtest.cpp | 15 |
6 files changed, 50 insertions, 30 deletions
diff --git a/cpp/src/qpid/client/FlowControl.h b/cpp/src/qpid/client/FlowControl.h index 0f5f8596ec..d2205aaa78 100644 --- a/cpp/src/qpid/client/FlowControl.h +++ b/cpp/src/qpid/client/FlowControl.h @@ -42,8 +42,9 @@ namespace client { * is renewed. * * In "window mode" credit is automatically renewed when a message is - * accepted. In non-window mode credit is not automatically renewed, - * it must be explicitly re-set (@see Subscription) + * completed (which by default happens when it is accepted). In + * non-window mode credit is not automatically renewed, it must be + * explicitly re-set (@see Subscription) */ struct FlowControl { static const uint32_t UNLIMITED=0xFFFFFFFF; diff --git a/cpp/src/qpid/client/SubscriptionImpl.cpp b/cpp/src/qpid/client/SubscriptionImpl.cpp index 3363dda11f..684cca031a 100644 --- a/cpp/src/qpid/client/SubscriptionImpl.cpp +++ b/cpp/src/qpid/client/SubscriptionImpl.cpp @@ -27,6 +27,7 @@ namespace qpid { namespace client { using sys::Mutex; +using framing::MessageAcquireResult; SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l) : manager(m), name(n), queue(q), settings(s), listener(l) @@ -68,16 +69,19 @@ SequenceSet SubscriptionImpl::getUnaccepted() const { Mutex::ScopedLock l(lock); void SubscriptionImpl::acquire(const SequenceSet& messageIds) { Mutex::ScopedLock l(lock); - manager.getSession().messageAcquire(messageIds); - unacquired.remove(messageIds); + MessageAcquireResult result = manager.getSession().messageAcquire(messageIds); + unacquired.remove(result.getTransfers()); if (settings.acceptMode == ACCEPT_MODE_EXPLICIT) - unaccepted.add(messageIds); + unaccepted.add(result.getTransfers()); } void SubscriptionImpl::accept(const SequenceSet& messageIds) { Mutex::ScopedLock l(lock); manager.getSession().messageAccept(messageIds); unaccepted.remove(messageIds); + if (settings.autoComplete) { + manager.getSession().sendCompletion(); + } } Session SubscriptionImpl::getSession() const { return manager.getSession(); } @@ -88,7 +92,6 @@ void SubscriptionImpl::cancel() { manager.cancel(name); } void SubscriptionImpl::received(Message& m) { Mutex::ScopedLock l(lock); - manager.getSession().markCompleted(m.getId(), false, false); if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED) unacquired.add(m.getId()); else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT) @@ -99,15 +102,16 @@ void SubscriptionImpl::received(Message& m) { listener->received(m); } + if (settings.autoComplete) { + manager.getSession().markCompleted(m.getId(), false, false); + } if (settings.autoAck) { - if (unacquired.size() + unaccepted.size() >= settings.autoAck) { - if (unacquired.size()) { - async(manager.getSession()).messageAcquire(unacquired); - unaccepted.add(unacquired); - unaccepted.clear(); - } + if (unaccepted.size() >= settings.autoAck) { async(manager.getSession()).messageAccept(unaccepted); unaccepted.clear(); + if (settings.autoComplete) { + manager.getSession().sendCompletion(); + } } } } diff --git a/cpp/src/qpid/client/SubscriptionSettings.h b/cpp/src/qpid/client/SubscriptionSettings.h index 924814c809..19fbc3486b 100644 --- a/cpp/src/qpid/client/SubscriptionSettings.h +++ b/cpp/src/qpid/client/SubscriptionSettings.h @@ -39,22 +39,33 @@ struct SubscriptionSettings FlowControl flow=FlowControl::unlimited(), AcceptMode accept=ACCEPT_MODE_EXPLICIT, AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED, - unsigned int autoAck_=1 - ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_) {} + unsigned int autoAck_=1, + bool autoComplete_=true + ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_), autoComplete(autoComplete_) {} FlowControl flowControl; ///@< Flow control settings. @see FlowControl AcceptMode acceptMode; ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE AcquireMode acquireMode; ///@< ACQUIRE_MODE_PRE_ACQUIRED or ACQUIRE_MODE_NOT_ACQUIRED - /** Automatically acknowledge (acquire and accept) batches of autoAck messages. - * 0 means no automatic acknowledgement. What it means to "acknowledge" a message depends on - * acceptMode and acquireMode: - * - ACCEPT_MODE_NONE and ACQUIRE_MODE_PRE_ACQUIRED: do nothing - * - ACCEPT_MODE_NONE and ACQUIRE_MODE_NOT_ACQUIRED: send an "acquire" command - * - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_PRE_ACQUIRED: send "accept" command - * - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_NOT_ACQUIRED: send "acquire" and "accept" commands - */ + /** Automatically acknowledge (accept) batches of autoAck + * messages. 0 means no automatic acknowledgement. This has no + * effect for messsages received for a subscription with + * ACCEPT_MODE_NODE.*/ unsigned int autoAck; + /** + * If set to true, messages will be marked as completed (in + * windowing mode, completion of a message will cause the credit + * used up by that message to be reallocated) once they have been + * received. The server will be explicitly notified of all + * completed messages when the next accept is sent through the + * subscription (either explictly or through autAck). However the + * server may also periodically request information on the + * completed messages. + * + * If set to false the application is responsible for completing + * messages (@see Session::markCompleted()). + */ + bool autoComplete; }; }} // namespace qpid::client diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 0f562cec16..d8300f46d3 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -134,7 +134,7 @@ header_test_LDADD=$(lib_client) TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test -system_tests = client_test quick_perftest quick_topictest run_header_test +system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests EXTRA_DIST += \ @@ -142,6 +142,7 @@ EXTRA_DIST += \ run-unit-tests start_broker python_tests stop_broker \ quick_topictest \ quick_perftest \ + quick_txtest \ topictest \ run_header_test \ header_test.py \ diff --git a/cpp/src/tests/quick_txtest b/cpp/src/tests/quick_txtest new file mode 100755 index 0000000000..56df55e705 --- /dev/null +++ b/cpp/src/tests/quick_txtest @@ -0,0 +1,2 @@ +#!/bin/sh +exec `dirname $0`/run_test ./txtest --queues 4 --tx-count 10 --quiet diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp index a4ee32c38a..a569bdd648 100644 --- a/cpp/src/tests/txtest.cpp +++ b/cpp/src/tests/txtest.cpp @@ -51,11 +51,12 @@ struct Args : public qpid::TestOptions { uint txCount; uint totalMsgCount; bool dtx; + bool quiet; Args() : init(true), transfer(true), check(true), size(256), durable(true), queues(2), base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10), - dtx(false) + dtx(false), quiet(false) { addOptions() @@ -69,7 +70,8 @@ struct Args : public qpid::TestOptions { ("messages-per-tx", optValue(msgsPerTx, "N"), "number of messages transferred per transaction") ("tx-count", optValue(txCount, "N"), "number of transactions per 'agent'") ("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'") - ("dtx", optValue(dtx, "yes|no"), "use distributed transactions"); + ("dtx", optValue(dtx, "yes|no"), "use distributed transactions") + ("quiet", optValue(quiet), "reduce output from test"); } }; @@ -159,7 +161,6 @@ struct Transfer : public Client, public Runnable session.messageTransfer(arg::content=out, arg::acceptMode=1); } sub.accept(sub.getUnaccepted()); - session.sendCompletion(); if (opts.dtx) { session.dtxEnd(arg::xid=xid); session.dtxPrepare(arg::xid=xid); @@ -219,7 +220,7 @@ struct Controller : public Client StringSet::iterator next = i + 1; if (next == queues.end()) next = queues.begin(); - std::cout << "Transfering from " << *i << " to " << *next << std::endl; + if (!opts.quiet) std::cout << "Transfering from " << *i << " to " << *next << std::endl; agents.push_back(new Transfer(*i, *next)); agents.back().thread = Thread(agents.back()); } @@ -241,13 +242,13 @@ struct Controller : public Client xidArr.collect(inDoubtXids); if (inDoubtXids.size()) { - std::cout << "Recovering DTX in-doubt transaction(s):" << std::endl; + if (!opts.quiet) std::cout << "Recovering DTX in-doubt transaction(s):" << std::endl; framing::StructHelper decoder; framing::Xid xid; // abort even, commit odd transactions for (unsigned i = 0; i < inDoubtXids.size(); i++) { decoder.decode(xid, inDoubtXids[i]); - std::cout << (i%2 ? " * aborting " : " * committing "); + if (!opts.quiet) std::cout << (i%2 ? " * aborting " : " * committing "); xid.print(std::cout); std::cout << std::endl; if (i%2) { @@ -276,7 +277,7 @@ struct Controller : public Client drained.push_back(m.getMessageProperties().getCorrelationId()); ++count; } - std::cout << "Drained " << count << " messages from " << *i << std::endl; + if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl; } sort(ids.begin(), ids.end()); |