diff options
author | Alan Conway <aconway@apache.org> | 2014-08-29 19:29:06 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2014-08-29 19:29:06 +0000 |
commit | eb6b6828a1e151c6da0a0e43b0ce7118b162ae1f (patch) | |
tree | 881df7cb1d1e7faa9b230ea27bb142e0480325f7 | |
parent | 3fd257aaa7a445db97153acc0feda34148366d23 (diff) | |
download | qpid-python-eb6b6828a1e151c6da0a0e43b0ce7118b162ae1f.tar.gz |
QPID-5855: Fix to JAVA Client Can not recieve message with qpid ha cluster.
The original fix for this introduced a regression, running the qpid-txttest2
test against a cluster with the linear store failed. This fixes the fix.
- Run transaction commit logic when the commit completes. Report completion to the user only when
all prior commands have completed (sync point)
- Fix missing initializer in client/amqp0_10/SessionImpl.cpp for transaction committing flag.
- Remove annoying log messages from IdSetter.h
- Skip transactional messages in prepare, don't wait till commit.
- Added fetch-timeout option to qpid-txtest2
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1621368 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/AsyncCommandCallback.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/ha/IdSetter.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/ha/PrimaryTxObserver.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/qpid-txtest2.cpp | 6 |
5 files changed, 18 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/AsyncCommandCallback.cpp b/cpp/src/qpid/broker/AsyncCommandCallback.cpp index 52dab42948..0b52c6b5c5 100644 --- a/cpp/src/qpid/broker/AsyncCommandCallback.cpp +++ b/cpp/src/qpid/broker/AsyncCommandCallback.cpp @@ -58,9 +58,10 @@ void AsyncCommandCallback::complete() { void AsyncCommandCallback::doCommand() { SessionState* session = completerContext->getSession(); if (session && session->isAttached()) { - // Complete now unless this is a syncPoint and there are incomplete commands. + std::string result = command(); // Execute the command now. + // Send completion now unless this is a syncPoint and there are incomplete commands. if (!(syncPoint && session->addPendingExecutionSync(id))) - session->completeCommand(id, false, requiresSync, command()); + session->completeCommand(id, false, requiresSync, result); } else throw InternalErrorException("Cannot complete command, no session"); diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 9299ed7cb1..32f52adf43 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -59,7 +59,7 @@ typedef qpid::sys::Mutex::ScopedLock ScopedLock; typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock; SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : - connection(&c), transactional(t) {} + connection(&c), transactional(t), committing(false) {} bool SessionImpl::isTransactional() const { @@ -101,11 +101,19 @@ void SessionImpl::sync(bool block) else execute<NonBlockingSync>(); } +namespace { +struct ScopedSet { + bool& flag; + ScopedSet(bool& f) : flag(f) { flag = true; } + ~ScopedSet() { flag = false; } +}; +} + void SessionImpl::commit() { try { checkError(); - committing = true; + ScopedSet s(committing); execute<Commit>(); } catch (const TransactionError&) { @@ -114,7 +122,6 @@ void SessionImpl::commit() catch (const std::exception& e) { txError = new TransactionAborted(Msg() << "Transaction aborted: " << e.what()); } - committing = false; checkError(); } diff --git a/cpp/src/qpid/ha/IdSetter.h b/cpp/src/qpid/ha/IdSetter.h index f0629c99bb..94bc74668e 100644 --- a/cpp/src/qpid/ha/IdSetter.h +++ b/cpp/src/qpid/ha/IdSetter.h @@ -53,7 +53,6 @@ class IdSetter : public broker::MessageInterceptor // been enqueued or saved in a transaction buffer. This is when we normally want // to assign a replication-id. m.setReplicationId(nextId++); - QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m.getReplicationId())); } void publish(broker::Message& m) { @@ -63,7 +62,6 @@ class IdSetter : public broker::MessageInterceptor // store record() is not called, so set the ID now if not already set. if (!m.hasReplicationId()) { m.setReplicationId(nextId++); - QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m)); } } diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 3c6318fbfe..56815ef89d 100644 --- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -207,6 +207,7 @@ bool PrimaryTxObserver::prepare() { Mutex::ScopedLock l(lock); checkState(SENDING, "Too late for prepare"); state = PREPARING; + skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue. txQueue->deliver(TxPrepareEvent().message()); return true; } @@ -216,7 +217,6 @@ void PrimaryTxObserver::commit() { Mutex::ScopedLock l(lock); checkState(PREPARING, "Cannot commit, not preparing"); if (incomplete.size() == 0) { - skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue. txQueue->deliver(TxCommitEvent().message()); end(l); } else { diff --git a/cpp/src/tests/qpid-txtest2.cpp b/cpp/src/tests/qpid-txtest2.cpp index 6e2f81726e..5e1dd492da 100644 --- a/cpp/src/tests/qpid-txtest2.cpp +++ b/cpp/src/tests/qpid-txtest2.cpp @@ -63,11 +63,12 @@ struct Options : public qpid::Options { qpid::log::Options log; uint port; bool quiet; + double fetchTimeout; Options() : help(false), init(true), transfer(true), check(true), size(256), durable(true), queues(2), base("tx"), msgsPerTx(1), txCount(5), totalMsgCount(10), - capacity(1000), url("localhost"), port(0), quiet(false) + capacity(1000), url("localhost"), port(0), quiet(false), fetchTimeout(5) { addOptions() ("init", qpid::optValue(init, "yes|no"), "Declare queues and populate one with the initial set of messages.") @@ -85,6 +86,7 @@ struct Options : public qpid::Options { ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") ("port,p", qpid::optValue(port, "PORT"), "(for test compatibility only, use broker option instead)") ("quiet", qpid::optValue(quiet), "reduce output from test") + ("fetch-timeout", qpid::optValue(fetchTimeout, "SECONDS"), "Timeout for transactional fetch") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -199,7 +201,7 @@ struct Transfer : public TransactionalClient, public Runnable id << source << ">" << target << ":" << t+1; try { for (uint m = 0; m < opts.msgsPerTx; m++) { - Message msg = receiver.fetch(Duration::SECOND*30); + Message msg = receiver.fetch(Duration::SECOND*opts.fetchTimeout); if (msg.getContentSize() != opts.size) { std::ostringstream oss; oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size; |