summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/FailoverSession.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/FailoverSession.cpp')
-rw-r--r--cpp/src/qpid/client/FailoverSession.cpp879
1 files changed, 861 insertions, 18 deletions
diff --git a/cpp/src/qpid/client/FailoverSession.cpp b/cpp/src/qpid/client/FailoverSession.cpp
index 25867c2a24..d11d5aa362 100644
--- a/cpp/src/qpid/client/FailoverSession.cpp
+++ b/cpp/src/qpid/client/FailoverSession.cpp
@@ -38,7 +38,8 @@ namespace qpid {
namespace client {
FailoverSession::FailoverSession ( ) :
- failover_in_progress(false)
+ failover_in_progress(false),
+ failover_count(0)
{
// The session is created by FailoverConnection::newSession
failoverSubscriptionManager = 0;
@@ -50,32 +51,108 @@ FailoverSession::~FailoverSession ( )
}
+
framing::FrameSet::shared_ptr
FailoverSession::get()
{
+ while(1)
+ {
+ try
+ {
return session.get();
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
SessionId
-FailoverSession::getId() const
+FailoverSession::getId()
{
+ while(1)
+ {
+ try
+ {
return session.getId();
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
void
FailoverSession::close()
{
+ while(1)
+ {
+ try
+ {
session.close();
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
void
FailoverSession::sync()
{
+ while(1)
+ {
+ try
+ {
+ session.sync();
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
- session.sync();
}
@@ -90,16 +167,53 @@ FailoverSession::timeout(uint32_t /*seconds*/ )
Execution&
FailoverSession::getExecution()
{
-
+ while(1)
+ {
+ try
+ {
return session.getExecution();
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
void
FailoverSession::flush()
{
-
+ while(1)
+ {
+ try
+ {
session.flush();
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -109,8 +223,27 @@ FailoverSession::markCompleted(const framing::SequenceNumber& id,
bool notifyPeer
)
{
-
+ while(1)
+ {
+ try
+ {
session.markCompleted ( id, cumulative, notifyPeer );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -120,8 +253,27 @@ FailoverSession::markCompleted(const framing::SequenceNumber& id,
void
FailoverSession::executionSync()
{
-
+ while(1)
+ {
+ try
+ {
session.executionSync();
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -131,10 +283,29 @@ FailoverSession::executionResult ( const SequenceNumber& commandId,
const string& value
)
{
-
+ while(1)
+ {
+ try
+ {
session.executionResult ( commandId,
value
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -149,7 +320,10 @@ FailoverSession::executionException ( uint16_t errorCode,
const FieldTable& errorInfo
)
{
-
+ while(1)
+ {
+ try
+ {
session.executionException ( errorCode,
commandId,
classCode,
@@ -158,6 +332,22 @@ FailoverSession::executionException ( uint16_t errorCode,
description,
errorInfo
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -179,7 +369,7 @@ FailoverSession::messageTransfer ( const string& destination,
acquireMode,
content
);
- break;
+ return;
}
catch ( ... )
{
@@ -197,8 +387,27 @@ FailoverSession::messageTransfer ( const string& destination,
void
FailoverSession::messageAccept ( const SequenceSet& transfers )
{
-
+ while(1)
+ {
+ try
+ {
session.messageAccept ( transfers );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -209,11 +418,30 @@ FailoverSession::messageReject ( const SequenceSet& transfers,
const string& text
)
{
-
+ while(1)
+ {
+ try
+ {
session.messageReject ( transfers,
code,
text
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -223,10 +451,29 @@ FailoverSession::messageRelease ( const SequenceSet& transfers,
bool setRedelivered
)
{
-
+ while(1)
+ {
+ try
+ {
session.messageRelease ( transfers,
setRedelivered
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -234,8 +481,26 @@ FailoverSession::messageRelease ( const SequenceSet& transfers,
qpid::framing::MessageAcquireResult
FailoverSession::messageAcquire ( const SequenceSet& transfers )
{
-
+ while(1)
+ {
+ try
+ {
return session.messageAcquire ( transfers );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -245,10 +510,28 @@ FailoverSession::messageResume ( const string& destination,
const string& resumeId
)
{
-
+ while(1)
+ {
+ try
+ {
return session.messageResume ( destination,
resumeId
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -264,7 +547,10 @@ FailoverSession::messageSubscribe ( const string& queue,
const FieldTable& arguments
)
{
-
+ while(1)
+ {
+ try
+ {
session.messageSubscribe ( queue,
destination,
acceptMode,
@@ -274,6 +560,22 @@ FailoverSession::messageSubscribe ( const string& queue,
resumeTtl,
arguments
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -281,8 +583,27 @@ FailoverSession::messageSubscribe ( const string& queue,
void
FailoverSession::messageCancel ( const string& destinations )
{
-
+ while(1)
+ {
+ try
+ {
session.messageCancel ( destinations );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -292,9 +613,28 @@ FailoverSession::messageSetFlowMode ( const string& destination,
uint8_t flowMode
)
{
+ while(1)
+ {
+ try
+ {
session.messageSetFlowMode ( destination,
flowMode
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -304,10 +644,29 @@ FailoverSession::messageFlow(const string& destination,
uint8_t unit,
uint32_t value)
{
+ while(1)
+ {
+ try
+ {
session.messageFlow ( destination,
unit,
value
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -315,7 +674,26 @@ FailoverSession::messageFlow(const string& destination,
void
FailoverSession::messageFlush(const string& destination)
{
+ while(1)
+ {
+ try
+ {
session.messageFlush ( destination );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -323,7 +701,26 @@ FailoverSession::messageFlush(const string& destination)
void
FailoverSession::messageStop(const string& destination)
{
+ while(1)
+ {
+ try
+ {
session.messageStop ( destination );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -331,7 +728,26 @@ FailoverSession::messageStop(const string& destination)
void
FailoverSession::txSelect()
{
+ while(1)
+ {
+ try
+ {
session.txSelect ( );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -339,7 +755,26 @@ FailoverSession::txSelect()
void
FailoverSession::txCommit()
{
+ while(1)
+ {
+ try
+ {
session.txCommit ( );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -347,7 +782,26 @@ FailoverSession::txCommit()
void
FailoverSession::txRollback()
{
+ while(1)
+ {
+ try
+ {
session.txRollback ( );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -355,7 +809,26 @@ FailoverSession::txRollback()
void
FailoverSession::dtxSelect()
{
+ while(1)
+ {
+ try
+ {
session.dtxSelect ( );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -365,10 +838,28 @@ FailoverSession::dtxStart(const Xid& xid,
bool join,
bool resume)
{
+ while(1)
+ {
+ try
+ {
return session.dtxStart ( xid,
join,
resume
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -378,10 +869,28 @@ FailoverSession::dtxEnd(const Xid& xid,
bool fail,
bool suspend)
{
+ while(1)
+ {
+ try
+ {
return session.dtxEnd ( xid,
fail,
suspend
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -390,9 +899,27 @@ qpid::framing::XaResult
FailoverSession::dtxCommit(const Xid& xid,
bool onePhase)
{
+ while(1)
+ {
+ try
+ {
return session.dtxCommit ( xid,
onePhase
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -400,7 +927,26 @@ FailoverSession::dtxCommit(const Xid& xid,
void
FailoverSession::dtxForget(const Xid& xid)
{
+ while(1)
+ {
+ try
+ {
session.dtxForget ( xid );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -408,7 +954,25 @@ FailoverSession::dtxForget(const Xid& xid)
qpid::framing::DtxGetTimeoutResult
FailoverSession::dtxGetTimeout(const Xid& xid)
{
+ while(1)
+ {
+ try
+ {
return session.dtxGetTimeout ( xid );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -416,7 +980,25 @@ FailoverSession::dtxGetTimeout(const Xid& xid)
qpid::framing::XaResult
FailoverSession::dtxPrepare(const Xid& xid)
{
+ while(1)
+ {
+ try
+ {
return session.dtxPrepare ( xid );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -424,7 +1006,25 @@ FailoverSession::dtxPrepare(const Xid& xid)
qpid::framing::DtxRecoverResult
FailoverSession::dtxRecover()
{
+ while(1)
+ {
+ try
+ {
return session.dtxRecover ( );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -432,7 +1032,25 @@ FailoverSession::dtxRecover()
qpid::framing::XaResult
FailoverSession::dtxRollback(const Xid& xid)
{
+ while(1)
+ {
+ try
+ {
return session.dtxRollback ( xid );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -441,9 +1059,28 @@ void
FailoverSession::dtxSetTimeout(const Xid& xid,
uint32_t timeout)
{
+ while(1)
+ {
+ try
+ {
session.dtxSetTimeout ( xid,
timeout
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -457,6 +1094,10 @@ FailoverSession::exchangeDeclare(const string& exchange,
bool autoDelete,
const FieldTable& arguments)
{
+ while(1)
+ {
+ try
+ {
session.exchangeDeclare ( exchange,
type,
alternateExchange,
@@ -465,6 +1106,21 @@ FailoverSession::exchangeDeclare(const string& exchange,
autoDelete,
arguments
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -473,9 +1129,28 @@ void
FailoverSession::exchangeDelete(const string& exchange,
bool ifUnused)
{
+ while(1)
+ {
+ try
+ {
session.exchangeDelete ( exchange,
ifUnused
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -483,7 +1158,25 @@ FailoverSession::exchangeDelete(const string& exchange,
qpid::framing::ExchangeQueryResult
FailoverSession::exchangeQuery(const string& name)
{
+ while(1)
+ {
+ try
+ {
return session.exchangeQuery ( name );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -494,11 +1187,30 @@ FailoverSession::exchangeBind(const string& queue,
const string& bindingKey,
const FieldTable& arguments)
{
+ while(1)
+ {
+ try
+ {
session.exchangeBind ( queue,
exchange,
bindingKey,
arguments
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -508,10 +1220,29 @@ FailoverSession::exchangeUnbind(const string& queue,
const string& exchange,
const string& bindingKey)
{
+ while(1)
+ {
+ try
+ {
session.exchangeUnbind ( queue,
exchange,
bindingKey
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -522,11 +1253,29 @@ FailoverSession::exchangeBound(const string& exchange,
const string& bindingKey,
const FieldTable& arguments)
{
+ while(1)
+ {
+ try
+ {
return session.exchangeBound ( exchange,
queue,
bindingKey,
arguments
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -540,6 +1289,10 @@ FailoverSession::queueDeclare(const string& queue,
bool autoDelete,
const FieldTable& arguments)
{
+ while(1)
+ {
+ try
+ {
session.queueDeclare ( queue,
alternateExchange,
passive,
@@ -548,6 +1301,21 @@ FailoverSession::queueDeclare(const string& queue,
autoDelete,
arguments
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -557,10 +1325,29 @@ FailoverSession::queueDelete(const string& queue,
bool ifUnused,
bool ifEmpty)
{
+ while(1)
+ {
+ try
+ {
session.queueDelete ( queue,
ifUnused,
ifEmpty
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -568,7 +1355,26 @@ FailoverSession::queueDelete(const string& queue,
void
FailoverSession::queuePurge(const string& queue)
{
+ while(1)
+ {
+ try
+ {
session.queuePurge ( queue) ;
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -576,7 +1382,25 @@ FailoverSession::queuePurge(const string& queue)
qpid::framing::QueueQueryResult
FailoverSession::queueQuery(const string& queue)
{
- return session.queueQuery ( queue );
+ while(1)
+ {
+ try
+ {
+ return session.queueQuery ( queue );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -587,6 +1411,7 @@ FailoverSession::queueQuery(const string& queue)
void
FailoverSession::prepareForFailover ( Connection newConnection )
{
+ failover_in_progress = true;
try
{
newSession = newConnection.newSession();
@@ -603,6 +1428,24 @@ FailoverSession::prepareForFailover ( Connection newConnection )
}
+void
+FailoverSession::failoverStarting ( )
+{
+ sys::Monitor::ScopedLock l(lock);
+ failover_in_progress = true;
+}
+
+
+void
+FailoverSession::failoverComplete ( )
+{
+ sys::Monitor::ScopedLock l(lock);
+ failover_in_progress = false;
+ ++ failover_count;
+ lock.notifyAll();
+}
+
+
void
FailoverSession::failover ( )