summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-08-29 19:29:06 +0000
committerAlan Conway <aconway@apache.org>2014-08-29 19:29:06 +0000
commiteb6b6828a1e151c6da0a0e43b0ce7118b162ae1f (patch)
tree881df7cb1d1e7faa9b230ea27bb142e0480325f7
parent3fd257aaa7a445db97153acc0feda34148366d23 (diff)
downloadqpid-python-eb6b6828a1e151c6da0a0e43b0ce7118b162ae1f.tar.gz
QPID-5855: Fix to JAVA Client Can not recieve message with qpid ha cluster.
The original fix for this introduced a regression, running the qpid-txttest2 test against a cluster with the linear store failed. This fixes the fix. - Run transaction commit logic when the commit completes. Report completion to the user only when all prior commands have completed (sync point) - Fix missing initializer in client/amqp0_10/SessionImpl.cpp for transaction committing flag. - Remove annoying log messages from IdSetter.h - Skip transactional messages in prepare, don't wait till commit. - Added fetch-timeout option to qpid-txtest2 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1621368 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/AsyncCommandCallback.cpp5
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp13
-rw-r--r--cpp/src/qpid/ha/IdSetter.h2
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.cpp2
-rw-r--r--cpp/src/tests/qpid-txtest2.cpp6
5 files changed, 18 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/AsyncCommandCallback.cpp b/cpp/src/qpid/broker/AsyncCommandCallback.cpp
index 52dab42948..0b52c6b5c5 100644
--- a/cpp/src/qpid/broker/AsyncCommandCallback.cpp
+++ b/cpp/src/qpid/broker/AsyncCommandCallback.cpp
@@ -58,9 +58,10 @@ void AsyncCommandCallback::complete() {
void AsyncCommandCallback::doCommand() {
SessionState* session = completerContext->getSession();
if (session && session->isAttached()) {
- // Complete now unless this is a syncPoint and there are incomplete commands.
+ std::string result = command(); // Execute the command now.
+ // Send completion now unless this is a syncPoint and there are incomplete commands.
if (!(syncPoint && session->addPendingExecutionSync(id)))
- session->completeCommand(id, false, requiresSync, command());
+ session->completeCommand(id, false, requiresSync, result);
}
else
throw InternalErrorException("Cannot complete command, no session");
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 9299ed7cb1..32f52adf43 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -59,7 +59,7 @@ typedef qpid::sys::Mutex::ScopedLock ScopedLock;
typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock;
SessionImpl::SessionImpl(ConnectionImpl& c, bool t) :
- connection(&c), transactional(t) {}
+ connection(&c), transactional(t), committing(false) {}
bool SessionImpl::isTransactional() const
{
@@ -101,11 +101,19 @@ void SessionImpl::sync(bool block)
else execute<NonBlockingSync>();
}
+namespace {
+struct ScopedSet {
+ bool& flag;
+ ScopedSet(bool& f) : flag(f) { flag = true; }
+ ~ScopedSet() { flag = false; }
+};
+}
+
void SessionImpl::commit()
{
try {
checkError();
- committing = true;
+ ScopedSet s(committing);
execute<Commit>();
}
catch (const TransactionError&) {
@@ -114,7 +122,6 @@ void SessionImpl::commit()
catch (const std::exception& e) {
txError = new TransactionAborted(Msg() << "Transaction aborted: " << e.what());
}
- committing = false;
checkError();
}
diff --git a/cpp/src/qpid/ha/IdSetter.h b/cpp/src/qpid/ha/IdSetter.h
index f0629c99bb..94bc74668e 100644
--- a/cpp/src/qpid/ha/IdSetter.h
+++ b/cpp/src/qpid/ha/IdSetter.h
@@ -53,7 +53,6 @@ class IdSetter : public broker::MessageInterceptor
// been enqueued or saved in a transaction buffer. This is when we normally want
// to assign a replication-id.
m.setReplicationId(nextId++);
- QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m.getReplicationId()));
}
void publish(broker::Message& m) {
@@ -63,7 +62,6 @@ class IdSetter : public broker::MessageInterceptor
// store record() is not called, so set the ID now if not already set.
if (!m.hasReplicationId()) {
m.setReplicationId(nextId++);
- QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m));
}
}
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 3c6318fbfe..56815ef89d 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -207,6 +207,7 @@ bool PrimaryTxObserver::prepare() {
Mutex::ScopedLock l(lock);
checkState(SENDING, "Too late for prepare");
state = PREPARING;
+ skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue.
txQueue->deliver(TxPrepareEvent().message());
return true;
}
@@ -216,7 +217,6 @@ void PrimaryTxObserver::commit() {
Mutex::ScopedLock l(lock);
checkState(PREPARING, "Cannot commit, not preparing");
if (incomplete.size() == 0) {
- skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue.
txQueue->deliver(TxCommitEvent().message());
end(l);
} else {
diff --git a/cpp/src/tests/qpid-txtest2.cpp b/cpp/src/tests/qpid-txtest2.cpp
index 6e2f81726e..5e1dd492da 100644
--- a/cpp/src/tests/qpid-txtest2.cpp
+++ b/cpp/src/tests/qpid-txtest2.cpp
@@ -63,11 +63,12 @@ struct Options : public qpid::Options {
qpid::log::Options log;
uint port;
bool quiet;
+ double fetchTimeout;
Options() : help(false), init(true), transfer(true), check(true),
size(256), durable(true), queues(2),
base("tx"), msgsPerTx(1), txCount(5), totalMsgCount(10),
- capacity(1000), url("localhost"), port(0), quiet(false)
+ capacity(1000), url("localhost"), port(0), quiet(false), fetchTimeout(5)
{
addOptions()
("init", qpid::optValue(init, "yes|no"), "Declare queues and populate one with the initial set of messages.")
@@ -85,6 +86,7 @@ struct Options : public qpid::Options {
("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
("port,p", qpid::optValue(port, "PORT"), "(for test compatibility only, use broker option instead)")
("quiet", qpid::optValue(quiet), "reduce output from test")
+ ("fetch-timeout", qpid::optValue(fetchTimeout, "SECONDS"), "Timeout for transactional fetch")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -199,7 +201,7 @@ struct Transfer : public TransactionalClient, public Runnable
id << source << ">" << target << ":" << t+1;
try {
for (uint m = 0; m < opts.msgsPerTx; m++) {
- Message msg = receiver.fetch(Duration::SECOND*30);
+ Message msg = receiver.fetch(Duration::SECOND*opts.fetchTimeout);
if (msg.getContentSize() != opts.size) {
std::ostringstream oss;
oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size;