summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-22 13:09:33 +0000
committerAlan Conway <aconway@apache.org>2008-10-22 13:09:33 +0000
commitff9a534510c37e8ec951bc0e0cb4528626b67b6e (patch)
tree1a00b5c6bbe76db1a07dc5c680b2ffa340f54985
parentf1709b592377abde0ddcc9eab07e85a3b3c928b5 (diff)
downloadqpid-python-ff9a534510c37e8ec951bc0e0cb4528626b67b6e.tar.gz
QPID-1382 from Mick Goulish: Improvement to Client-Side Cluster Failover code
Also: Fix missing DispatchHandle.h include in sys/PollableQueue.h Added ignore properties for failover example binaries & Makefile. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707065 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/examples/failover/direct_producer.cpp6
-rw-r--r--qpid/cpp/examples/failover/listener.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/FailoverConnection.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSession.cpp879
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSession.h15
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp1
-rw-r--r--qpid/cpp/src/qpid/sys/PollableQueue.h1
8 files changed, 878 insertions, 32 deletions
diff --git a/qpid/cpp/examples/failover/direct_producer.cpp b/qpid/cpp/examples/failover/direct_producer.cpp
index 513971197e..2a0104a994 100644
--- a/qpid/cpp/examples/failover/direct_producer.cpp
+++ b/qpid/cpp/examples/failover/direct_producer.cpp
@@ -60,6 +60,8 @@ main ( int argc, char ** argv)
if ( count > 1000 )
report = !(sent % 1000);
+ report = false;
+
if ( report )
{
std::cout << "sending message "
@@ -71,7 +73,7 @@ main ( int argc, char ** argv)
message_data << sent;
message.setData(message_data.str());
- /* MICK FIXME
+ /* FIXME mgoulish 21 oct 08
session.messageTransfer ( arg::content=message,
arg::destination="amq.direct"
); */
@@ -85,7 +87,7 @@ main ( int argc, char ** argv)
}
message.setData ( "That's all, folks!" );
- /* FIXME mgoulish 16 Oct 08
+ /* FIXME mgoulish 21 oct 08
session.messageTransfer ( arg::content=message,
arg::destination="amq.direct"
);
diff --git a/qpid/cpp/examples/failover/listener.cpp b/qpid/cpp/examples/failover/listener.cpp
index d8cb78c9ce..82913a521a 100644
--- a/qpid/cpp/examples/failover/listener.cpp
+++ b/qpid/cpp/examples/failover/listener.cpp
@@ -64,8 +64,10 @@ Listener::Listener ( FailoverSubscriptionManager & s ) :
void
Listener::received ( Message & message )
{
+ /*
if(! (count%1000))
std::cerr << "\t\tListener received: " << message.getData() << std::endl;
+ * */
++ count;
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index 92cf756580..b95f03164e 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -151,8 +151,6 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) {
static const std::string CONN_CLOSED("Connection closed by broker");
void ConnectionImpl::shutdown() {
- Mutex::ScopedLock l(lock);
-
if ( failureCallback )
failureCallback();
diff --git a/qpid/cpp/src/qpid/client/FailoverConnection.cpp b/qpid/cpp/src/qpid/client/FailoverConnection.cpp
index 33b06a6a1a..8b37ba5cfa 100644
--- a/qpid/cpp/src/qpid/client/FailoverConnection.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverConnection.cpp
@@ -129,7 +129,7 @@ FailoverConnection::failover ( )
++ sessions_iterator )
{
FailoverSession * fs = * sessions_iterator;
- fs->failover_in_progress = true;
+ fs->failoverStarting();
}
std::vector<Url> knownBrokers = connection.getKnownBrokers();
@@ -187,7 +187,7 @@ FailoverConnection::failover ( )
)
{
FailoverSession * fs = * sessions_iterator;
- fs->failover_in_progress = false;
+ fs->failoverComplete();
}
}
diff --git a/qpid/cpp/src/qpid/client/FailoverSession.cpp b/qpid/cpp/src/qpid/client/FailoverSession.cpp
index 25867c2a24..d11d5aa362 100644
--- a/qpid/cpp/src/qpid/client/FailoverSession.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverSession.cpp
@@ -38,7 +38,8 @@ namespace qpid {
namespace client {
FailoverSession::FailoverSession ( ) :
- failover_in_progress(false)
+ failover_in_progress(false),
+ failover_count(0)
{
// The session is created by FailoverConnection::newSession
failoverSubscriptionManager = 0;
@@ -50,32 +51,108 @@ FailoverSession::~FailoverSession ( )
}
+
framing::FrameSet::shared_ptr
FailoverSession::get()
{
+ while(1)
+ {
+ try
+ {
return session.get();
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
SessionId
-FailoverSession::getId() const
+FailoverSession::getId()
{
+ while(1)
+ {
+ try
+ {
return session.getId();
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
void
FailoverSession::close()
{
+ while(1)
+ {
+ try
+ {
session.close();
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
void
FailoverSession::sync()
{
+ while(1)
+ {
+ try
+ {
+ session.sync();
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
- session.sync();
}
@@ -90,16 +167,53 @@ FailoverSession::timeout(uint32_t /*seconds*/ )
Execution&
FailoverSession::getExecution()
{
-
+ while(1)
+ {
+ try
+ {
return session.getExecution();
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
void
FailoverSession::flush()
{
-
+ while(1)
+ {
+ try
+ {
session.flush();
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -109,8 +223,27 @@ FailoverSession::markCompleted(const framing::SequenceNumber& id,
bool notifyPeer
)
{
-
+ while(1)
+ {
+ try
+ {
session.markCompleted ( id, cumulative, notifyPeer );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -120,8 +253,27 @@ FailoverSession::markCompleted(const framing::SequenceNumber& id,
void
FailoverSession::executionSync()
{
-
+ while(1)
+ {
+ try
+ {
session.executionSync();
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -131,10 +283,29 @@ FailoverSession::executionResult ( const SequenceNumber& commandId,
const string& value
)
{
-
+ while(1)
+ {
+ try
+ {
session.executionResult ( commandId,
value
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -149,7 +320,10 @@ FailoverSession::executionException ( uint16_t errorCode,
const FieldTable& errorInfo
)
{
-
+ while(1)
+ {
+ try
+ {
session.executionException ( errorCode,
commandId,
classCode,
@@ -158,6 +332,22 @@ FailoverSession::executionException ( uint16_t errorCode,
description,
errorInfo
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -179,7 +369,7 @@ FailoverSession::messageTransfer ( const string& destination,
acquireMode,
content
);
- break;
+ return;
}
catch ( ... )
{
@@ -197,8 +387,27 @@ FailoverSession::messageTransfer ( const string& destination,
void
FailoverSession::messageAccept ( const SequenceSet& transfers )
{
-
+ while(1)
+ {
+ try
+ {
session.messageAccept ( transfers );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -209,11 +418,30 @@ FailoverSession::messageReject ( const SequenceSet& transfers,
const string& text
)
{
-
+ while(1)
+ {
+ try
+ {
session.messageReject ( transfers,
code,
text
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -223,10 +451,29 @@ FailoverSession::messageRelease ( const SequenceSet& transfers,
bool setRedelivered
)
{
-
+ while(1)
+ {
+ try
+ {
session.messageRelease ( transfers,
setRedelivered
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -234,8 +481,26 @@ FailoverSession::messageRelease ( const SequenceSet& transfers,
qpid::framing::MessageAcquireResult
FailoverSession::messageAcquire ( const SequenceSet& transfers )
{
-
+ while(1)
+ {
+ try
+ {
return session.messageAcquire ( transfers );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -245,10 +510,28 @@ FailoverSession::messageResume ( const string& destination,
const string& resumeId
)
{
-
+ while(1)
+ {
+ try
+ {
return session.messageResume ( destination,
resumeId
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -264,7 +547,10 @@ FailoverSession::messageSubscribe ( const string& queue,
const FieldTable& arguments
)
{
-
+ while(1)
+ {
+ try
+ {
session.messageSubscribe ( queue,
destination,
acceptMode,
@@ -274,6 +560,22 @@ FailoverSession::messageSubscribe ( const string& queue,
resumeTtl,
arguments
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -281,8 +583,27 @@ FailoverSession::messageSubscribe ( const string& queue,
void
FailoverSession::messageCancel ( const string& destinations )
{
-
+ while(1)
+ {
+ try
+ {
session.messageCancel ( destinations );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -292,9 +613,28 @@ FailoverSession::messageSetFlowMode ( const string& destination,
uint8_t flowMode
)
{
+ while(1)
+ {
+ try
+ {
session.messageSetFlowMode ( destination,
flowMode
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -304,10 +644,29 @@ FailoverSession::messageFlow(const string& destination,
uint8_t unit,
uint32_t value)
{
+ while(1)
+ {
+ try
+ {
session.messageFlow ( destination,
unit,
value
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -315,7 +674,26 @@ FailoverSession::messageFlow(const string& destination,
void
FailoverSession::messageFlush(const string& destination)
{
+ while(1)
+ {
+ try
+ {
session.messageFlush ( destination );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -323,7 +701,26 @@ FailoverSession::messageFlush(const string& destination)
void
FailoverSession::messageStop(const string& destination)
{
+ while(1)
+ {
+ try
+ {
session.messageStop ( destination );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -331,7 +728,26 @@ FailoverSession::messageStop(const string& destination)
void
FailoverSession::txSelect()
{
+ while(1)
+ {
+ try
+ {
session.txSelect ( );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -339,7 +755,26 @@ FailoverSession::txSelect()
void
FailoverSession::txCommit()
{
+ while(1)
+ {
+ try
+ {
session.txCommit ( );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -347,7 +782,26 @@ FailoverSession::txCommit()
void
FailoverSession::txRollback()
{
+ while(1)
+ {
+ try
+ {
session.txRollback ( );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -355,7 +809,26 @@ FailoverSession::txRollback()
void
FailoverSession::dtxSelect()
{
+ while(1)
+ {
+ try
+ {
session.dtxSelect ( );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -365,10 +838,28 @@ FailoverSession::dtxStart(const Xid& xid,
bool join,
bool resume)
{
+ while(1)
+ {
+ try
+ {
return session.dtxStart ( xid,
join,
resume
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -378,10 +869,28 @@ FailoverSession::dtxEnd(const Xid& xid,
bool fail,
bool suspend)
{
+ while(1)
+ {
+ try
+ {
return session.dtxEnd ( xid,
fail,
suspend
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -390,9 +899,27 @@ qpid::framing::XaResult
FailoverSession::dtxCommit(const Xid& xid,
bool onePhase)
{
+ while(1)
+ {
+ try
+ {
return session.dtxCommit ( xid,
onePhase
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -400,7 +927,26 @@ FailoverSession::dtxCommit(const Xid& xid,
void
FailoverSession::dtxForget(const Xid& xid)
{
+ while(1)
+ {
+ try
+ {
session.dtxForget ( xid );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -408,7 +954,25 @@ FailoverSession::dtxForget(const Xid& xid)
qpid::framing::DtxGetTimeoutResult
FailoverSession::dtxGetTimeout(const Xid& xid)
{
+ while(1)
+ {
+ try
+ {
return session.dtxGetTimeout ( xid );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -416,7 +980,25 @@ FailoverSession::dtxGetTimeout(const Xid& xid)
qpid::framing::XaResult
FailoverSession::dtxPrepare(const Xid& xid)
{
+ while(1)
+ {
+ try
+ {
return session.dtxPrepare ( xid );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -424,7 +1006,25 @@ FailoverSession::dtxPrepare(const Xid& xid)
qpid::framing::DtxRecoverResult
FailoverSession::dtxRecover()
{
+ while(1)
+ {
+ try
+ {
return session.dtxRecover ( );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -432,7 +1032,25 @@ FailoverSession::dtxRecover()
qpid::framing::XaResult
FailoverSession::dtxRollback(const Xid& xid)
{
+ while(1)
+ {
+ try
+ {
return session.dtxRollback ( xid );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -441,9 +1059,28 @@ void
FailoverSession::dtxSetTimeout(const Xid& xid,
uint32_t timeout)
{
+ while(1)
+ {
+ try
+ {
session.dtxSetTimeout ( xid,
timeout
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -457,6 +1094,10 @@ FailoverSession::exchangeDeclare(const string& exchange,
bool autoDelete,
const FieldTable& arguments)
{
+ while(1)
+ {
+ try
+ {
session.exchangeDeclare ( exchange,
type,
alternateExchange,
@@ -465,6 +1106,21 @@ FailoverSession::exchangeDeclare(const string& exchange,
autoDelete,
arguments
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -473,9 +1129,28 @@ void
FailoverSession::exchangeDelete(const string& exchange,
bool ifUnused)
{
+ while(1)
+ {
+ try
+ {
session.exchangeDelete ( exchange,
ifUnused
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -483,7 +1158,25 @@ FailoverSession::exchangeDelete(const string& exchange,
qpid::framing::ExchangeQueryResult
FailoverSession::exchangeQuery(const string& name)
{
+ while(1)
+ {
+ try
+ {
return session.exchangeQuery ( name );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -494,11 +1187,30 @@ FailoverSession::exchangeBind(const string& queue,
const string& bindingKey,
const FieldTable& arguments)
{
+ while(1)
+ {
+ try
+ {
session.exchangeBind ( queue,
exchange,
bindingKey,
arguments
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -508,10 +1220,29 @@ FailoverSession::exchangeUnbind(const string& queue,
const string& exchange,
const string& bindingKey)
{
+ while(1)
+ {
+ try
+ {
session.exchangeUnbind ( queue,
exchange,
bindingKey
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -522,11 +1253,29 @@ FailoverSession::exchangeBound(const string& exchange,
const string& bindingKey,
const FieldTable& arguments)
{
+ while(1)
+ {
+ try
+ {
return session.exchangeBound ( exchange,
queue,
bindingKey,
arguments
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -540,6 +1289,10 @@ FailoverSession::queueDeclare(const string& queue,
bool autoDelete,
const FieldTable& arguments)
{
+ while(1)
+ {
+ try
+ {
session.queueDeclare ( queue,
alternateExchange,
passive,
@@ -548,6 +1301,21 @@ FailoverSession::queueDeclare(const string& queue,
autoDelete,
arguments
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -557,10 +1325,29 @@ FailoverSession::queueDelete(const string& queue,
bool ifUnused,
bool ifEmpty)
{
+ while(1)
+ {
+ try
+ {
session.queueDelete ( queue,
ifUnused,
ifEmpty
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -568,7 +1355,26 @@ FailoverSession::queueDelete(const string& queue,
void
FailoverSession::queuePurge(const string& queue)
{
+ while(1)
+ {
+ try
+ {
session.queuePurge ( queue) ;
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -576,7 +1382,25 @@ FailoverSession::queuePurge(const string& queue)
qpid::framing::QueueQueryResult
FailoverSession::queueQuery(const string& queue)
{
- return session.queueQuery ( queue );
+ while(1)
+ {
+ try
+ {
+ return session.queueQuery ( queue );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -587,6 +1411,7 @@ FailoverSession::queueQuery(const string& queue)
void
FailoverSession::prepareForFailover ( Connection newConnection )
{
+ failover_in_progress = true;
try
{
newSession = newConnection.newSession();
@@ -603,6 +1428,24 @@ FailoverSession::prepareForFailover ( Connection newConnection )
}
+void
+FailoverSession::failoverStarting ( )
+{
+ sys::Monitor::ScopedLock l(lock);
+ failover_in_progress = true;
+}
+
+
+void
+FailoverSession::failoverComplete ( )
+{
+ sys::Monitor::ScopedLock l(lock);
+ failover_in_progress = false;
+ ++ failover_count;
+ lock.notifyAll();
+}
+
+
void
FailoverSession::failover ( )
diff --git a/qpid/cpp/src/qpid/client/FailoverSession.h b/qpid/cpp/src/qpid/client/FailoverSession.h
index b301353968..7a743da452 100644
--- a/qpid/cpp/src/qpid/client/FailoverSession.h
+++ b/qpid/cpp/src/qpid/client/FailoverSession.h
@@ -35,7 +35,7 @@
#include "qpid/client/SessionImpl.h"
#include "qpid/client/TypedResult.h"
#include "qpid/shared_ptr.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
#include <string>
@@ -61,7 +61,7 @@ class FailoverSession
framing::FrameSet::shared_ptr get();
- SessionId getId() const;
+ SessionId getId();
void close();
@@ -80,8 +80,6 @@ class FailoverSession
void sendCompletion ( );
- bool failover_in_progress;
-
// Wrapped functions from Session ----------------------------
@@ -293,15 +291,18 @@ class FailoverSession
// end Wrapped functions from Session ---------------------------
// Tells the FailoverSession to get ready for a failover.
+ void failoverStarting();
void prepareForFailover ( Connection newConnection );
-
void failover ( );
+ void failoverComplete();
void setFailoverSubscriptionManager(FailoverSubscriptionManager*);
private:
- typedef sys::Mutex::ScopedLock Lock;
- sys::Mutex lock;
+ sys::Monitor lock;
+ bool failover_in_progress;
+ int failover_count;
+
FailoverSubscriptionManager * failoverSubscriptionManager;
diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
index d12d976ef5..0331cbeb9e 100644
--- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
@@ -48,7 +48,6 @@ FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
sys::Monitor::ScopedLock l(lock);
newSession = _newSession;
newSessionIsValid = true;
- // lock.notifyAll();
}
diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h
index 8313196623..a594dab86d 100644
--- a/qpid/cpp/src/qpid/sys/PollableQueue.h
+++ b/qpid/cpp/src/qpid/sys/PollableQueue.h
@@ -24,6 +24,7 @@
#include "qpid/sys/PollableCondition.h"
#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/DispatchHandle.h"
#include "qpid/sys/Monitor.h"
#include <boost/function.hpp>
#include <boost/bind.hpp>