summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-10 21:19:46 +0000
committerAlan Conway <aconway@apache.org>2008-10-10 21:19:46 +0000
commitce1de9ac11d243051711720cf12b9275c94efb45 (patch)
treeb39e940a65d5bfc59f70aa6e136e1c1abcb34674 /qpid
parent5605be8d83cb6072780525f2183e637135a9004a (diff)
downloadqpid-python-ce1de9ac11d243051711720cf12b9275c94efb45.tar.gz
Failover client and example fixes & tidy up.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@703575 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/examples/failover/direct_producer.cpp136
-rw-r--r--qpid/cpp/examples/failover/listener.cpp37
-rw-r--r--qpid/cpp/src/qpid/client/FailoverConnection.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/FailoverConnection.h8
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSession.cpp321
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSession.h8
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp222
-rw-r--r--qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h4
8 files changed, 360 insertions, 380 deletions
diff --git a/qpid/cpp/examples/failover/direct_producer.cpp b/qpid/cpp/examples/failover/direct_producer.cpp
index d8e74cdc41..4f91c9c4d4 100644
--- a/qpid/cpp/examples/failover/direct_producer.cpp
+++ b/qpid/cpp/examples/failover/direct_producer.cpp
@@ -1,14 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
#include <qpid/client/FailoverConnection.h>
#include <qpid/client/Session.h>
#include <qpid/client/AsyncSession.h>
#include <qpid/client/Message.h>
-#include <unistd.h>
-#include <cstdlib>
#include <iostream>
-#include <fstream>
-
#include <sstream>
using namespace qpid::client;
@@ -16,102 +33,46 @@ using namespace qpid::framing;
using namespace std;
-
-
-
int
main ( int argc, char ** argv)
{
- try {
- struct timeval broker_killed_time = {0,0};
- struct timeval failover_complete_time = {0,0};
- struct timeval duration = {0,0};
-
-
- if ( argc < 3 )
- {
- std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n";
- std::cerr << "i.e. for host: 127.0.0.1\n";
- exit(1);
- }
-
- char const * host = argv[1];
- int port = atoi(argv[2]);
- char const * broker_to_kill = 0;
-
- if ( argc > 3 )
- {
- broker_to_kill = argv[3];
- std::cerr << "main: Broker marked for death is process ID "
- << broker_to_kill
- << endl;
- }
- else
- {
- std::cerr << "PRODUCER main: there is no broker to kill.\n";
- }
+ const char* host = argc>1 ? argv[1] : "127.0.0.1";
+ int port = argc>2 ? atoi(argv[2]) : 5672;
+ int count = argc>3 ? atoi(argv[3]) : 30;
+ try {
FailoverConnection connection;
FailoverSession * session;
Message message;
string program_name = "PRODUCER";
-
- connection.failoverCompleteTime = & failover_complete_time;
- connection.name = program_name;
connection.open ( host, port );
-
session = connection.newSession();
- session->name = program_name;
-
- int send_this_many = 30,
- messages_sent = 0;
-
- while ( messages_sent < send_this_many )
- {
- if ( (messages_sent == 13) && broker_to_kill )
- {
- char command[1000];
- std::cerr << program_name << " killing broker " << broker_to_kill << ".\n";
- sprintf(command, "kill -9 %s", broker_to_kill);
- system ( command );
- gettimeofday ( & broker_killed_time, 0 );
- }
-
+ int sent = 0;
+ while ( sent < count ) {
message.getDeliveryProperties().setRoutingKey("routing_key");
-
- std::cerr << "sending message "
- << messages_sent
+ std::cout << "sending message "
+ << sent
<< " of "
- << send_this_many
+ << count
<< ".\n";
-
stringstream message_data;
- message_data << messages_sent;
+ message_data << sent;
message.setData(message_data.str());
- try
- {
- /* MICK FIXME
- session.messageTransfer ( arg::content=message,
- arg::destination="amq.direct"
- ); */
- session->messageTransfer ( "amq.direct",
- 1,
- 0,
- message
- );
- }
- catch ( const std::exception& error)
- {
- cerr << program_name << " exception: " << error.what() << endl;
- }
-
+ /* MICK FIXME
+ session.messageTransfer ( arg::content=message,
+ arg::destination="amq.direct"
+ ); */
+ session->messageTransfer ( "amq.direct",
+ 1,
+ 0,
+ message
+ );
sleep ( 1 );
- ++ messages_sent;
+ ++ sent;
}
-
message.setData ( "That's all, folks!" );
/* MICK FIXME
@@ -127,22 +88,7 @@ main ( int argc, char ** argv)
session->sync();
connection.close();
-
- // This will be incorrect if you killed more than one...
- if ( broker_to_kill )
- {
- timersub ( & failover_complete_time,
- & broker_killed_time,
- & duration
- );
- fprintf ( stderr,
- "Failover time: %ld.%.6ld\n",
- duration.tv_sec,
- duration.tv_usec
- );
- }
return 0;
-
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
}
diff --git a/qpid/cpp/examples/failover/listener.cpp b/qpid/cpp/examples/failover/listener.cpp
index d5ade9b9e2..c4c7d096b3 100644
--- a/qpid/cpp/examples/failover/listener.cpp
+++ b/qpid/cpp/examples/failover/listener.cpp
@@ -1,11 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
#include <qpid/client/FailoverConnection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/SubscriptionManager.h>
-#include <unistd.h>
-#include <cstdlib>
#include <iostream>
#include <fstream>
@@ -16,8 +34,6 @@ using namespace qpid::framing;
using namespace std;
-
-
struct Recorder
{
unsigned int max_messages;
@@ -211,19 +227,12 @@ Listener::parse_message ( const std::string & msg )
int
main ( int argc, char ** argv )
{
+ const char* host = argc>1 ? argv[1] : "127.0.0.1";
+ int port = argc>2 ? atoi(argv[2]) : 5672;
+
try {
string program_name = "LISTENER";
- if ( argc < 3 )
- {
- std::cerr << "Usage: ./listener host cluster_port_file_name\n";
- std::cerr << "i.e. for host: 127.0.0.1\n";
- exit(1);
- }
-
- char const * host = argv[1];
- int port = atoi(argv[2]);
-
FailoverConnection connection;
FailoverSession * session;
Recorder recorder;
diff --git a/qpid/cpp/src/qpid/client/FailoverConnection.cpp b/qpid/cpp/src/qpid/client/FailoverConnection.cpp
index cac680295d..c5d775bf82 100644
--- a/qpid/cpp/src/qpid/client/FailoverConnection.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverConnection.cpp
@@ -146,9 +146,9 @@ FailoverConnection::failover ( )
* (and, through them, their SessionManagers and whatever else)
* that we are about to failover to this new Connection.
*/
- // FIXME mgoulish -- get rid of two-passes here.
- std::vector<FailoverSession *>::iterator sessions_iterator;
+ // FIXME aconway 2008-10-10: thread unsafe, possible race with concurrent newSession
+ std::vector<FailoverSession *>::iterator sessions_iterator;
for ( sessions_iterator = sessions.begin();
sessions_iterator < sessions.end();
++ sessions_iterator
diff --git a/qpid/cpp/src/qpid/client/FailoverConnection.h b/qpid/cpp/src/qpid/client/FailoverConnection.h
index 1cec0bfd5b..09e9c8bfa4 100644
--- a/qpid/cpp/src/qpid/client/FailoverConnection.h
+++ b/qpid/cpp/src/qpid/client/FailoverConnection.h
@@ -29,7 +29,7 @@
#include "qpid/client/FailoverConnection.h"
#include "qpid/client/FailoverSession.h"
#include "qpid/client/FailoverSubscriptionManager.h"
-
+#include "qpid/sys/Mutex.h"
namespace qpid {
@@ -82,11 +82,11 @@ class FailoverConnection
private:
- std::string host;
+ typedef sys::Mutex::ScopedLock Lock;
- Connection connection;
+ sys::Mutex lock;
- int currentPortNumber;
+ Connection connection;
boost::function<void ()> clientFailoverCallback;
diff --git a/qpid/cpp/src/qpid/client/FailoverSession.cpp b/qpid/cpp/src/qpid/client/FailoverSession.cpp
index 1e20edde4a..c6fb573bce 100644
--- a/qpid/cpp/src/qpid/client/FailoverSession.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverSession.cpp
@@ -38,10 +38,10 @@ namespace qpid {
namespace client {
FailoverSession::FailoverSession ( ) :
- name("no_name")
+ name("no_name")
{
- // The session is created by FailoverConnection::newSession
- failoverSubscriptionManager = 0;
+ // The session is created by FailoverConnection::newSession
+ failoverSubscriptionManager = 0;
}
@@ -53,50 +53,54 @@ FailoverSession::~FailoverSession ( )
framing::FrameSet::shared_ptr
FailoverSession::get()
{
- return session.get();
+ return session.get();
}
SessionId
FailoverSession::getId() const
{
- return session.getId();
+ return session.getId();
}
void
FailoverSession::close()
{
- session.close();
+ session.close();
}
void
FailoverSession::sync()
{
- session.sync();
+
+ session.sync();
}
uint32_t
FailoverSession::timeout(uint32_t /*seconds*/ )
{
- // MICK WTF? return session.timeout ( seconds );
- return 0;
+
+ // MICK WTF? return session.timeout ( seconds );
+ return 0;
}
Execution&
FailoverSession::getExecution()
{
- return session.getExecution();
+
+ return session.getExecution();
}
void
FailoverSession::flush()
{
- session.flush();
+
+ session.flush();
}
@@ -104,9 +108,10 @@ void
FailoverSession::markCompleted(const framing::SequenceNumber& id,
bool cumulative,
bool notifyPeer
- )
+)
{
- session.markCompleted ( id, cumulative, notifyPeer );
+
+ session.markCompleted ( id, cumulative, notifyPeer );
}
@@ -116,7 +121,8 @@ FailoverSession::markCompleted(const framing::SequenceNumber& id,
void
FailoverSession::executionSync()
{
- session.executionSync();
+
+ session.executionSync();
}
@@ -124,11 +130,12 @@ FailoverSession::executionSync()
void
FailoverSession::executionResult ( const SequenceNumber& commandId,
const string& value
- )
+)
{
- session.executionResult ( commandId,
- value
- );
+
+ session.executionResult ( commandId,
+ value
+ );
}
@@ -141,16 +148,17 @@ FailoverSession::executionException ( uint16_t errorCode,
uint8_t fieldIndex,
const string& description,
const FieldTable& errorInfo
- )
+)
{
- session.executionException ( errorCode,
- commandId,
- classCode,
- commandCode,
- fieldIndex,
- description,
- errorInfo
- );
+
+ session.executionException ( errorCode,
+ commandId,
+ classCode,
+ commandCode,
+ fieldIndex,
+ description,
+ errorInfo
+ );
}
@@ -160,13 +168,14 @@ FailoverSession::messageTransfer ( const string& destination,
uint8_t acceptMode,
uint8_t acquireMode,
const MethodContent& content
- )
+)
{
- session.messageTransfer ( destination,
- acceptMode,
- acquireMode,
- content
- );
+
+ session.messageTransfer ( destination,
+ acceptMode,
+ acquireMode,
+ content
+ );
}
@@ -174,7 +183,8 @@ FailoverSession::messageTransfer ( const string& destination,
void
FailoverSession::messageAccept ( const SequenceSet& transfers )
{
- session.messageAccept ( transfers );
+
+ session.messageAccept ( transfers );
}
@@ -183,12 +193,13 @@ void
FailoverSession::messageReject ( const SequenceSet& transfers,
uint16_t code,
const string& text
- )
+)
{
- session.messageReject ( transfers,
- code,
- text
- );
+
+ session.messageReject ( transfers,
+ code,
+ text
+ );
}
@@ -196,11 +207,12 @@ FailoverSession::messageReject ( const SequenceSet& transfers,
void
FailoverSession::messageRelease ( const SequenceSet& transfers,
bool setRedelivered
- )
+)
{
- session.messageRelease ( transfers,
- setRedelivered
- );
+
+ session.messageRelease ( transfers,
+ setRedelivered
+ );
}
@@ -208,7 +220,8 @@ FailoverSession::messageRelease ( const SequenceSet& transfers,
qpid::framing::MessageAcquireResult
FailoverSession::messageAcquire ( const SequenceSet& transfers )
{
- return session.messageAcquire ( transfers );
+
+ return session.messageAcquire ( transfers );
}
@@ -216,11 +229,12 @@ FailoverSession::messageAcquire ( const SequenceSet& transfers )
qpid::framing::MessageResumeResult
FailoverSession::messageResume ( const string& destination,
const string& resumeId
- )
+)
{
- return session.messageResume ( destination,
- resumeId
- );
+
+ return session.messageResume ( destination,
+ resumeId
+ );
}
@@ -234,17 +248,18 @@ FailoverSession::messageSubscribe ( const string& queue,
const string& resumeId,
uint64_t resumeTtl,
const FieldTable& arguments
- )
+)
{
- session.messageSubscribe ( queue,
- destination,
- acceptMode,
- acquireMode,
- exclusive,
- resumeId,
- resumeTtl,
- arguments
- );
+
+ session.messageSubscribe ( queue,
+ destination,
+ acceptMode,
+ acquireMode,
+ exclusive,
+ resumeId,
+ resumeTtl,
+ arguments
+ );
}
@@ -252,7 +267,8 @@ FailoverSession::messageSubscribe ( const string& queue,
void
FailoverSession::messageCancel ( const string& destinations )
{
- session.messageCancel ( destinations );
+
+ session.messageCancel ( destinations );
}
@@ -260,11 +276,11 @@ FailoverSession::messageCancel ( const string& destinations )
void
FailoverSession::messageSetFlowMode ( const string& destination,
uint8_t flowMode
- )
+)
{
- session.messageSetFlowMode ( destination,
- flowMode
- );
+ session.messageSetFlowMode ( destination,
+ flowMode
+ );
}
@@ -274,10 +290,10 @@ FailoverSession::messageFlow(const string& destination,
uint8_t unit,
uint32_t value)
{
- session.messageFlow ( destination,
- unit,
- value
- );
+ session.messageFlow ( destination,
+ unit,
+ value
+ );
}
@@ -285,7 +301,7 @@ FailoverSession::messageFlow(const string& destination,
void
FailoverSession::messageFlush(const string& destination)
{
- session.messageFlush ( destination );
+ session.messageFlush ( destination );
}
@@ -293,7 +309,7 @@ FailoverSession::messageFlush(const string& destination)
void
FailoverSession::messageStop(const string& destination)
{
- session.messageStop ( destination );
+ session.messageStop ( destination );
}
@@ -301,7 +317,7 @@ FailoverSession::messageStop(const string& destination)
void
FailoverSession::txSelect()
{
- session.txSelect ( );
+ session.txSelect ( );
}
@@ -309,7 +325,7 @@ FailoverSession::txSelect()
void
FailoverSession::txCommit()
{
- session.txCommit ( );
+ session.txCommit ( );
}
@@ -317,7 +333,7 @@ FailoverSession::txCommit()
void
FailoverSession::txRollback()
{
- session.txRollback ( );
+ session.txRollback ( );
}
@@ -325,7 +341,7 @@ FailoverSession::txRollback()
void
FailoverSession::dtxSelect()
{
- session.dtxSelect ( );
+ session.dtxSelect ( );
}
@@ -335,10 +351,10 @@ FailoverSession::dtxStart(const Xid& xid,
bool join,
bool resume)
{
- return session.dtxStart ( xid,
- join,
- resume
- );
+ return session.dtxStart ( xid,
+ join,
+ resume
+ );
}
@@ -348,10 +364,10 @@ FailoverSession::dtxEnd(const Xid& xid,
bool fail,
bool suspend)
{
- return session.dtxEnd ( xid,
- fail,
- suspend
- );
+ return session.dtxEnd ( xid,
+ fail,
+ suspend
+ );
}
@@ -360,9 +376,9 @@ qpid::framing::XaResult
FailoverSession::dtxCommit(const Xid& xid,
bool onePhase)
{
- return session.dtxCommit ( xid,
- onePhase
- );
+ return session.dtxCommit ( xid,
+ onePhase
+ );
}
@@ -370,7 +386,7 @@ FailoverSession::dtxCommit(const Xid& xid,
void
FailoverSession::dtxForget(const Xid& xid)
{
- session.dtxForget ( xid );
+ session.dtxForget ( xid );
}
@@ -378,7 +394,7 @@ FailoverSession::dtxForget(const Xid& xid)
qpid::framing::DtxGetTimeoutResult
FailoverSession::dtxGetTimeout(const Xid& xid)
{
- return session.dtxGetTimeout ( xid );
+ return session.dtxGetTimeout ( xid );
}
@@ -386,7 +402,7 @@ FailoverSession::dtxGetTimeout(const Xid& xid)
qpid::framing::XaResult
FailoverSession::dtxPrepare(const Xid& xid)
{
- return session.dtxPrepare ( xid );
+ return session.dtxPrepare ( xid );
}
@@ -394,7 +410,7 @@ FailoverSession::dtxPrepare(const Xid& xid)
qpid::framing::DtxRecoverResult
FailoverSession::dtxRecover()
{
- return session.dtxRecover ( );
+ return session.dtxRecover ( );
}
@@ -402,7 +418,7 @@ FailoverSession::dtxRecover()
qpid::framing::XaResult
FailoverSession::dtxRollback(const Xid& xid)
{
- return session.dtxRollback ( xid );
+ return session.dtxRollback ( xid );
}
@@ -411,9 +427,9 @@ void
FailoverSession::dtxSetTimeout(const Xid& xid,
uint32_t timeout)
{
- session.dtxSetTimeout ( xid,
- timeout
- );
+ session.dtxSetTimeout ( xid,
+ timeout
+ );
}
@@ -427,14 +443,14 @@ FailoverSession::exchangeDeclare(const string& exchange,
bool autoDelete,
const FieldTable& arguments)
{
- session.exchangeDeclare ( exchange,
- type,
- alternateExchange,
- passive,
- durable,
- autoDelete,
- arguments
- );
+ session.exchangeDeclare ( exchange,
+ type,
+ alternateExchange,
+ passive,
+ durable,
+ autoDelete,
+ arguments
+ );
}
@@ -443,9 +459,9 @@ void
FailoverSession::exchangeDelete(const string& exchange,
bool ifUnused)
{
- session.exchangeDelete ( exchange,
- ifUnused
- );
+ session.exchangeDelete ( exchange,
+ ifUnused
+ );
}
@@ -453,7 +469,7 @@ FailoverSession::exchangeDelete(const string& exchange,
qpid::framing::ExchangeQueryResult
FailoverSession::exchangeQuery(const string& name)
{
- return session.exchangeQuery ( name );
+ return session.exchangeQuery ( name );
}
@@ -464,11 +480,11 @@ FailoverSession::exchangeBind(const string& queue,
const string& bindingKey,
const FieldTable& arguments)
{
- session.exchangeBind ( queue,
- exchange,
- bindingKey,
- arguments
- );
+ session.exchangeBind ( queue,
+ exchange,
+ bindingKey,
+ arguments
+ );
}
@@ -478,10 +494,10 @@ FailoverSession::exchangeUnbind(const string& queue,
const string& exchange,
const string& bindingKey)
{
- session.exchangeUnbind ( queue,
- exchange,
- bindingKey
- );
+ session.exchangeUnbind ( queue,
+ exchange,
+ bindingKey
+ );
}
@@ -492,11 +508,11 @@ FailoverSession::exchangeBound(const string& exchange,
const string& bindingKey,
const FieldTable& arguments)
{
- return session.exchangeBound ( exchange,
- queue,
- bindingKey,
- arguments
- );
+ return session.exchangeBound ( exchange,
+ queue,
+ bindingKey,
+ arguments
+ );
}
@@ -510,14 +526,14 @@ FailoverSession::queueDeclare(const string& queue,
bool autoDelete,
const FieldTable& arguments)
{
- session.queueDeclare ( queue,
- alternateExchange,
- passive,
- durable,
- exclusive,
- autoDelete,
- arguments
- );
+ session.queueDeclare ( queue,
+ alternateExchange,
+ passive,
+ durable,
+ exclusive,
+ autoDelete,
+ arguments
+ );
}
@@ -527,10 +543,10 @@ FailoverSession::queueDelete(const string& queue,
bool ifUnused,
bool ifEmpty)
{
- session.queueDelete ( queue,
- ifUnused,
- ifEmpty
- );
+ session.queueDelete ( queue,
+ ifUnused,
+ ifEmpty
+ );
}
@@ -538,7 +554,7 @@ FailoverSession::queueDelete(const string& queue,
void
FailoverSession::queuePurge(const string& queue)
{
- session.queuePurge ( queue) ;
+ session.queuePurge ( queue) ;
}
@@ -546,7 +562,7 @@ FailoverSession::queuePurge(const string& queue)
qpid::framing::QueueQueryResult
FailoverSession::queueQuery(const string& queue)
{
- return session.queueQuery ( queue );
+ return session.queueQuery ( queue );
}
@@ -557,20 +573,20 @@ FailoverSession::queueQuery(const string& queue)
void
FailoverSession::prepareForFailover ( Connection newConnection )
{
- try
- {
- newSession = newConnection.newSession();
- }
- catch ( const std::exception& error )
- {
- throw Exception(QPID_MSG("Can't create failover session."));
- }
-
+ try
+ {
+ newSession = newConnection.newSession();
+ }
+ catch ( const std::exception& error )
+ {
+ throw Exception(QPID_MSG("Can't create failover session."));
+ }
- if ( failoverSubscriptionManager )
- {
- failoverSubscriptionManager->prepareForFailover ( newSession );
- }
+ if ( failoverSubscriptionManager )
+ {
+ //
+ failoverSubscriptionManager->prepareForFailover ( newSession );
+ }
}
@@ -578,15 +594,16 @@ FailoverSession::prepareForFailover ( Connection newConnection )
void
FailoverSession::failover ( )
{
- if ( failoverSubscriptionManager )
- {
- failoverSubscriptionManager->failover ( );
- }
-
- session = newSession;
+ if ( failoverSubscriptionManager )
+ {
+ failoverSubscriptionManager->failover ( );
+ }
+ session = newSession;
}
-
+void FailoverSession::setFailoverSubscriptionManager(FailoverSubscriptionManager* fsm) {
+ failoverSubscriptionManager = fsm;
+}
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/FailoverSession.h b/qpid/cpp/src/qpid/client/FailoverSession.h
index 713c72e460..7ff26f8079 100644
--- a/qpid/cpp/src/qpid/client/FailoverSession.h
+++ b/qpid/cpp/src/qpid/client/FailoverSession.h
@@ -35,6 +35,8 @@
#include "qpid/client/SessionImpl.h"
#include "qpid/client/TypedResult.h"
#include "qpid/shared_ptr.h"
+#include "qpid/sys/Mutex.h"
+
#include <string>
@@ -295,11 +297,13 @@ class FailoverSession
void failover ( );
- FailoverSubscriptionManager * failoverSubscriptionManager;
-
+ void setFailoverSubscriptionManager(FailoverSubscriptionManager*);
private:
+ typedef sys::Mutex::ScopedLock Lock;
+ sys::Mutex lock;
+ FailoverSubscriptionManager * failoverSubscriptionManager;
Session session;
Session newSession;
diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
index 2b108c1303..5a790e26cd 100644
--- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
@@ -33,11 +33,11 @@ namespace client {
FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
- name("no_name"),
- newSessionIsValid(false)
+ name("no_name"),
+ newSessionIsValid(false)
{
- subscriptionManager = new SubscriptionManager(fs->session);
- fs->failoverSubscriptionManager = this;
+ subscriptionManager = new SubscriptionManager(fs->session);
+ fs->setFailoverSubscriptionManager(this);
}
@@ -45,8 +45,9 @@ FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs)
void
FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
{
- newSession = _newSession;
- newSessionIsValid = true;
+ Lock l(lock);
+ newSession = _newSession;
+ newSessionIsValid = true;
}
@@ -54,27 +55,27 @@ FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
void
FailoverSubscriptionManager::failover ( )
{
- subscriptionManager->stop();
- // TODO -- save vector of boost bind fns.
+ // Stop the subscription manager thread so it can notice the failover in progress.
+ subscriptionManager->stop();
}
FailoverSubscriptionManager::subscribeArgs::subscribeArgs
- ( int _interface,
- MessageListener * _listener,
- LocalQueue * _localQueue,
- const std::string * _queue,
- const FlowControl * _flow,
- const std::string * _tag
- ) :
- interface(_interface),
- listener(_listener),
- localQueue(_localQueue),
- queue(_queue),
- flow(_flow),
- tag(_tag)
+( int _interface,
+ MessageListener * _listener,
+ LocalQueue * _localQueue,
+ const std::string * _queue,
+ const FlowControl * _flow,
+ const std::string * _tag
+) :
+ interface(_interface),
+ listener(_listener),
+ localQueue(_localQueue),
+ queue(_queue),
+ flow(_flow),
+ tag(_tag)
{
}
@@ -86,14 +87,14 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener,
const std::string & queue,
const FlowControl & flow,
const std::string & tag
- )
+)
{
- subscriptionManager->subscribe ( listener,
- queue,
- flow,
- tag
- );
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) );
+ subscriptionManager->subscribe ( listener,
+ queue,
+ flow,
+ tag
+ );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) );
}
@@ -103,14 +104,14 @@ FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
const std::string & queue,
const FlowControl & flow,
const std::string & tag
- )
+)
{
- subscriptionManager->subscribe ( localQueue,
- queue,
- flow,
- tag
- );
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) );
+ subscriptionManager->subscribe ( localQueue,
+ queue,
+ flow,
+ tag
+ );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) );
}
@@ -119,14 +120,14 @@ void
FailoverSubscriptionManager::subscribe ( MessageListener & listener,
const std::string & queue,
const std::string & tag
- )
+)
{
- subscriptionManager->subscribe ( listener,
- queue,
- tag
- );
- // TODO -- more than one subscription
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) );
+ subscriptionManager->subscribe ( listener,
+ queue,
+ tag
+ );
+ // TODO -- more than one subscription
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) );
}
@@ -136,13 +137,13 @@ void
FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
const std::string & queue,
const std::string & tag
- )
+)
{
- subscriptionManager->subscribe ( localQueue,
- queue,
- tag
- );
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) );
+ subscriptionManager->subscribe ( localQueue,
+ queue,
+ tag
+ );
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) );
}
@@ -151,9 +152,10 @@ bool
FailoverSubscriptionManager::get ( Message & result,
const std::string & queue,
sys::Duration timeout
- )
+)
{
- return subscriptionManager->get ( result, queue, timeout );
+
+ return subscriptionManager->get ( result, queue, timeout );
}
@@ -161,7 +163,8 @@ FailoverSubscriptionManager::get ( Message & result,
void
FailoverSubscriptionManager::cancel ( const std::string tag )
{
- subscriptionManager->cancel ( tag );
+
+ subscriptionManager->cancel ( tag );
}
@@ -169,47 +172,40 @@ FailoverSubscriptionManager::cancel ( const std::string tag )
void
FailoverSubscriptionManager::run ( ) // User Thread
{
- // FIXME mgoulish -- wait on a monitor here instead of this infinite loop
- while ( 1 )
- {
- subscriptionManager->run ( );
-
- // When we drop out of run, if there is a new Session
- // waiting for us, this is a failover. Otherwise, just
- // return control to usercode.
- sleep(1); // FIXME mgoulish -- get rid of this when we have wait-on-monitor.
-
- if ( newSessionIsValid )
- {
- delete subscriptionManager;
- subscriptionManager = new SubscriptionManager(newSession);
- // FIXME mgoulish make this an array of boost bind fns
- //
- for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin();
- i < subscribeFns.end();
- ++ i
- )
- {
- std::cerr << "MDEBUG new new resubscribe.\n";
- (*i) ();
- }
-
- newSessionIsValid = false;
- }
- else
+ while ( 1 )
{
- // break; TODO -- fix this
+ subscriptionManager->run ( );
+ Lock l(lock);
+ // When we drop out of run, if there is a new Session
+ // waiting for us, this is a failover. Otherwise, just
+ // return control to usercode.
+ if ( newSessionIsValid )
+ {
+ delete subscriptionManager;
+ subscriptionManager = new SubscriptionManager(newSession);
+ for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin();
+ i < subscribeFns.end();
+ ++ i
+ )
+ {
+ (*i) ();
+ }
+ newSessionIsValid = false;
+ }
+ else
+ {
+ // Not a failover, return to user code.
+ break;
+ }
}
- }
-
}
-
void
FailoverSubscriptionManager::start ( )
{
- subscriptionManager->start ( );
+
+ subscriptionManager->start ( );
}
@@ -217,7 +213,8 @@ FailoverSubscriptionManager::start ( )
void
FailoverSubscriptionManager::setAutoStop ( bool set )
{
- subscriptionManager->setAutoStop ( set );
+
+ subscriptionManager->setAutoStop ( set );
}
@@ -225,7 +222,8 @@ FailoverSubscriptionManager::setAutoStop ( bool set )
void
FailoverSubscriptionManager::stop ( )
{
- subscriptionManager->stop ( );
+
+ subscriptionManager->stop ( );
}
@@ -233,9 +231,10 @@ FailoverSubscriptionManager::stop ( )
void
FailoverSubscriptionManager::setFlowControl ( const std::string & destination,
const FlowControl & flow
- )
+)
{
- subscriptionManager->setFlowControl ( destination, flow );
+
+ subscriptionManager->setFlowControl ( destination, flow );
}
@@ -243,7 +242,8 @@ FailoverSubscriptionManager::setFlowControl ( const std::string & destination,
void
FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow )
{
- subscriptionManager->setFlowControl ( flow );
+
+ subscriptionManager->setFlowControl ( flow );
}
@@ -251,7 +251,8 @@ FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow )
const FlowControl &
FailoverSubscriptionManager::getFlowControl ( ) const
{
- return subscriptionManager->getFlowControl ( );
+
+ return subscriptionManager->getFlowControl ( );
}
@@ -262,13 +263,14 @@ FailoverSubscriptionManager::setFlowControl ( const std::string & tag,
uint32_t messages,
uint32_t bytes,
bool window
- )
+)
{
- subscriptionManager->setFlowControl ( tag,
- messages,
- bytes,
- window
- );
+
+ subscriptionManager->setFlowControl ( tag,
+ messages,
+ bytes,
+ window
+ );
}
@@ -277,12 +279,13 @@ void
FailoverSubscriptionManager::setFlowControl ( uint32_t messages,
uint32_t bytes,
bool window
- )
+)
{
- subscriptionManager->setFlowControl ( messages,
- bytes,
- window
- );
+
+ subscriptionManager->setFlowControl ( messages,
+ bytes,
+ window
+ );
}
@@ -290,7 +293,8 @@ FailoverSubscriptionManager::setFlowControl ( uint32_t messages,
void
FailoverSubscriptionManager::setAcceptMode ( bool required )
{
- subscriptionManager->setAcceptMode ( required );
+
+ subscriptionManager->setAcceptMode ( required );
}
@@ -298,7 +302,8 @@ FailoverSubscriptionManager::setAcceptMode ( bool required )
void
FailoverSubscriptionManager::setAcquireMode ( bool acquire )
{
- subscriptionManager->setAcquireMode ( acquire );
+
+ subscriptionManager->setAcquireMode ( acquire );
}
@@ -306,7 +311,8 @@ FailoverSubscriptionManager::setAcquireMode ( bool acquire )
void
FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck )
{
- subscriptionManager->setAckPolicy ( autoAck );
+
+ subscriptionManager->setAckPolicy ( autoAck );
}
@@ -314,16 +320,12 @@ FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck )
AckPolicy &
FailoverSubscriptionManager::getAckPolicy()
{
- return subscriptionManager->getAckPolicy ( );
+
+ return subscriptionManager->getAckPolicy ( );
}
-void
-FailoverSubscriptionManager::registerFailoverHandler ( boost::function<void ()> /* fh */ )
-{
- // FIXME mgoulish -- get rid of this mechanism -- i think it's unused.
-}
diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
index fe96742876..8678f5683c 100644
--- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
+++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
@@ -33,6 +33,7 @@
#include <qpid/client/LocalQueue.h>
#include <qpid/client/FlowControl.h>
#include <qpid/sys/Runnable.h>
+#include <qpid/sys/Mutex.h>
@@ -106,7 +107,6 @@ class FailoverSubscriptionManager
AckPolicy & getAckPolicy();
- void registerFailoverHandler ( boost::function<void ()> fh );
// Get ready for a failover.
void prepareForFailover ( Session newSession );
@@ -116,6 +116,8 @@ class FailoverSubscriptionManager
private:
+ typedef sys::Mutex::ScopedLock Lock;
+ sys::Mutex lock;
SubscriptionManager * subscriptionManager;