diff options
author | Alan Conway <aconway@apache.org> | 2008-10-10 21:19:46 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-10 21:19:46 +0000 |
commit | ce1de9ac11d243051711720cf12b9275c94efb45 (patch) | |
tree | b39e940a65d5bfc59f70aa6e136e1c1abcb34674 /qpid | |
parent | 5605be8d83cb6072780525f2183e637135a9004a (diff) | |
download | qpid-python-ce1de9ac11d243051711720cf12b9275c94efb45.tar.gz |
Failover client and example fixes & tidy up.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@703575 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r-- | qpid/cpp/examples/failover/direct_producer.cpp | 136 | ||||
-rw-r--r-- | qpid/cpp/examples/failover/listener.cpp | 37 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverConnection.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverConnection.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverSession.cpp | 321 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverSession.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp | 222 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h | 4 |
8 files changed, 360 insertions, 380 deletions
diff --git a/qpid/cpp/examples/failover/direct_producer.cpp b/qpid/cpp/examples/failover/direct_producer.cpp index d8e74cdc41..4f91c9c4d4 100644 --- a/qpid/cpp/examples/failover/direct_producer.cpp +++ b/qpid/cpp/examples/failover/direct_producer.cpp @@ -1,14 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + #include <qpid/client/FailoverConnection.h> #include <qpid/client/Session.h> #include <qpid/client/AsyncSession.h> #include <qpid/client/Message.h> -#include <unistd.h> -#include <cstdlib> #include <iostream> -#include <fstream> - #include <sstream> using namespace qpid::client; @@ -16,102 +33,46 @@ using namespace qpid::framing; using namespace std; - - - int main ( int argc, char ** argv) { - try { - struct timeval broker_killed_time = {0,0}; - struct timeval failover_complete_time = {0,0}; - struct timeval duration = {0,0}; - - - if ( argc < 3 ) - { - std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n"; - std::cerr << "i.e. for host: 127.0.0.1\n"; - exit(1); - } - - char const * host = argv[1]; - int port = atoi(argv[2]); - char const * broker_to_kill = 0; - - if ( argc > 3 ) - { - broker_to_kill = argv[3]; - std::cerr << "main: Broker marked for death is process ID " - << broker_to_kill - << endl; - } - else - { - std::cerr << "PRODUCER main: there is no broker to kill.\n"; - } + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + int count = argc>3 ? atoi(argv[3]) : 30; + try { FailoverConnection connection; FailoverSession * session; Message message; string program_name = "PRODUCER"; - - connection.failoverCompleteTime = & failover_complete_time; - connection.name = program_name; connection.open ( host, port ); - session = connection.newSession(); - session->name = program_name; - - int send_this_many = 30, - messages_sent = 0; - - while ( messages_sent < send_this_many ) - { - if ( (messages_sent == 13) && broker_to_kill ) - { - char command[1000]; - std::cerr << program_name << " killing broker " << broker_to_kill << ".\n"; - sprintf(command, "kill -9 %s", broker_to_kill); - system ( command ); - gettimeofday ( & broker_killed_time, 0 ); - } - + int sent = 0; + while ( sent < count ) { message.getDeliveryProperties().setRoutingKey("routing_key"); - - std::cerr << "sending message " - << messages_sent + std::cout << "sending message " + << sent << " of " - << send_this_many + << count << ".\n"; - stringstream message_data; - message_data << messages_sent; + message_data << sent; message.setData(message_data.str()); - try - { - /* MICK FIXME - session.messageTransfer ( arg::content=message, - arg::destination="amq.direct" - ); */ - session->messageTransfer ( "amq.direct", - 1, - 0, - message - ); - } - catch ( const std::exception& error) - { - cerr << program_name << " exception: " << error.what() << endl; - } - + /* MICK FIXME + session.messageTransfer ( arg::content=message, + arg::destination="amq.direct" + ); */ + session->messageTransfer ( "amq.direct", + 1, + 0, + message + ); sleep ( 1 ); - ++ messages_sent; + ++ sent; } - message.setData ( "That's all, folks!" ); /* MICK FIXME @@ -127,22 +88,7 @@ main ( int argc, char ** argv) session->sync(); connection.close(); - - // This will be incorrect if you killed more than one... - if ( broker_to_kill ) - { - timersub ( & failover_complete_time, - & broker_killed_time, - & duration - ); - fprintf ( stderr, - "Failover time: %ld.%.6ld\n", - duration.tv_sec, - duration.tv_usec - ); - } return 0; - } catch(const std::exception& error) { std::cout << error.what() << std::endl; } diff --git a/qpid/cpp/examples/failover/listener.cpp b/qpid/cpp/examples/failover/listener.cpp index d5ade9b9e2..c4c7d096b3 100644 --- a/qpid/cpp/examples/failover/listener.cpp +++ b/qpid/cpp/examples/failover/listener.cpp @@ -1,11 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ #include <qpid/client/FailoverConnection.h> #include <qpid/client/Session.h> #include <qpid/client/Message.h> #include <qpid/client/SubscriptionManager.h> -#include <unistd.h> -#include <cstdlib> #include <iostream> #include <fstream> @@ -16,8 +34,6 @@ using namespace qpid::framing; using namespace std; - - struct Recorder { unsigned int max_messages; @@ -211,19 +227,12 @@ Listener::parse_message ( const std::string & msg ) int main ( int argc, char ** argv ) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + try { string program_name = "LISTENER"; - if ( argc < 3 ) - { - std::cerr << "Usage: ./listener host cluster_port_file_name\n"; - std::cerr << "i.e. for host: 127.0.0.1\n"; - exit(1); - } - - char const * host = argv[1]; - int port = atoi(argv[2]); - FailoverConnection connection; FailoverSession * session; Recorder recorder; diff --git a/qpid/cpp/src/qpid/client/FailoverConnection.cpp b/qpid/cpp/src/qpid/client/FailoverConnection.cpp index cac680295d..c5d775bf82 100644 --- a/qpid/cpp/src/qpid/client/FailoverConnection.cpp +++ b/qpid/cpp/src/qpid/client/FailoverConnection.cpp @@ -146,9 +146,9 @@ FailoverConnection::failover ( ) * (and, through them, their SessionManagers and whatever else) * that we are about to failover to this new Connection. */ - // FIXME mgoulish -- get rid of two-passes here. - std::vector<FailoverSession *>::iterator sessions_iterator; + // FIXME aconway 2008-10-10: thread unsafe, possible race with concurrent newSession + std::vector<FailoverSession *>::iterator sessions_iterator; for ( sessions_iterator = sessions.begin(); sessions_iterator < sessions.end(); ++ sessions_iterator diff --git a/qpid/cpp/src/qpid/client/FailoverConnection.h b/qpid/cpp/src/qpid/client/FailoverConnection.h index 1cec0bfd5b..09e9c8bfa4 100644 --- a/qpid/cpp/src/qpid/client/FailoverConnection.h +++ b/qpid/cpp/src/qpid/client/FailoverConnection.h @@ -29,7 +29,7 @@ #include "qpid/client/FailoverConnection.h" #include "qpid/client/FailoverSession.h" #include "qpid/client/FailoverSubscriptionManager.h" - +#include "qpid/sys/Mutex.h" namespace qpid { @@ -82,11 +82,11 @@ class FailoverConnection private: - std::string host; + typedef sys::Mutex::ScopedLock Lock; - Connection connection; + sys::Mutex lock; - int currentPortNumber; + Connection connection; boost::function<void ()> clientFailoverCallback; diff --git a/qpid/cpp/src/qpid/client/FailoverSession.cpp b/qpid/cpp/src/qpid/client/FailoverSession.cpp index 1e20edde4a..c6fb573bce 100644 --- a/qpid/cpp/src/qpid/client/FailoverSession.cpp +++ b/qpid/cpp/src/qpid/client/FailoverSession.cpp @@ -38,10 +38,10 @@ namespace qpid { namespace client { FailoverSession::FailoverSession ( ) : - name("no_name") + name("no_name") { - // The session is created by FailoverConnection::newSession - failoverSubscriptionManager = 0; + // The session is created by FailoverConnection::newSession + failoverSubscriptionManager = 0; } @@ -53,50 +53,54 @@ FailoverSession::~FailoverSession ( ) framing::FrameSet::shared_ptr FailoverSession::get() { - return session.get(); + return session.get(); } SessionId FailoverSession::getId() const { - return session.getId(); + return session.getId(); } void FailoverSession::close() { - session.close(); + session.close(); } void FailoverSession::sync() { - session.sync(); + + session.sync(); } uint32_t FailoverSession::timeout(uint32_t /*seconds*/ ) { - // MICK WTF? return session.timeout ( seconds ); - return 0; + + // MICK WTF? return session.timeout ( seconds ); + return 0; } Execution& FailoverSession::getExecution() { - return session.getExecution(); + + return session.getExecution(); } void FailoverSession::flush() { - session.flush(); + + session.flush(); } @@ -104,9 +108,10 @@ void FailoverSession::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer - ) +) { - session.markCompleted ( id, cumulative, notifyPeer ); + + session.markCompleted ( id, cumulative, notifyPeer ); } @@ -116,7 +121,8 @@ FailoverSession::markCompleted(const framing::SequenceNumber& id, void FailoverSession::executionSync() { - session.executionSync(); + + session.executionSync(); } @@ -124,11 +130,12 @@ FailoverSession::executionSync() void FailoverSession::executionResult ( const SequenceNumber& commandId, const string& value - ) +) { - session.executionResult ( commandId, - value - ); + + session.executionResult ( commandId, + value + ); } @@ -141,16 +148,17 @@ FailoverSession::executionException ( uint16_t errorCode, uint8_t fieldIndex, const string& description, const FieldTable& errorInfo - ) +) { - session.executionException ( errorCode, - commandId, - classCode, - commandCode, - fieldIndex, - description, - errorInfo - ); + + session.executionException ( errorCode, + commandId, + classCode, + commandCode, + fieldIndex, + description, + errorInfo + ); } @@ -160,13 +168,14 @@ FailoverSession::messageTransfer ( const string& destination, uint8_t acceptMode, uint8_t acquireMode, const MethodContent& content - ) +) { - session.messageTransfer ( destination, - acceptMode, - acquireMode, - content - ); + + session.messageTransfer ( destination, + acceptMode, + acquireMode, + content + ); } @@ -174,7 +183,8 @@ FailoverSession::messageTransfer ( const string& destination, void FailoverSession::messageAccept ( const SequenceSet& transfers ) { - session.messageAccept ( transfers ); + + session.messageAccept ( transfers ); } @@ -183,12 +193,13 @@ void FailoverSession::messageReject ( const SequenceSet& transfers, uint16_t code, const string& text - ) +) { - session.messageReject ( transfers, - code, - text - ); + + session.messageReject ( transfers, + code, + text + ); } @@ -196,11 +207,12 @@ FailoverSession::messageReject ( const SequenceSet& transfers, void FailoverSession::messageRelease ( const SequenceSet& transfers, bool setRedelivered - ) +) { - session.messageRelease ( transfers, - setRedelivered - ); + + session.messageRelease ( transfers, + setRedelivered + ); } @@ -208,7 +220,8 @@ FailoverSession::messageRelease ( const SequenceSet& transfers, qpid::framing::MessageAcquireResult FailoverSession::messageAcquire ( const SequenceSet& transfers ) { - return session.messageAcquire ( transfers ); + + return session.messageAcquire ( transfers ); } @@ -216,11 +229,12 @@ FailoverSession::messageAcquire ( const SequenceSet& transfers ) qpid::framing::MessageResumeResult FailoverSession::messageResume ( const string& destination, const string& resumeId - ) +) { - return session.messageResume ( destination, - resumeId - ); + + return session.messageResume ( destination, + resumeId + ); } @@ -234,17 +248,18 @@ FailoverSession::messageSubscribe ( const string& queue, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments - ) +) { - session.messageSubscribe ( queue, - destination, - acceptMode, - acquireMode, - exclusive, - resumeId, - resumeTtl, - arguments - ); + + session.messageSubscribe ( queue, + destination, + acceptMode, + acquireMode, + exclusive, + resumeId, + resumeTtl, + arguments + ); } @@ -252,7 +267,8 @@ FailoverSession::messageSubscribe ( const string& queue, void FailoverSession::messageCancel ( const string& destinations ) { - session.messageCancel ( destinations ); + + session.messageCancel ( destinations ); } @@ -260,11 +276,11 @@ FailoverSession::messageCancel ( const string& destinations ) void FailoverSession::messageSetFlowMode ( const string& destination, uint8_t flowMode - ) +) { - session.messageSetFlowMode ( destination, - flowMode - ); + session.messageSetFlowMode ( destination, + flowMode + ); } @@ -274,10 +290,10 @@ FailoverSession::messageFlow(const string& destination, uint8_t unit, uint32_t value) { - session.messageFlow ( destination, - unit, - value - ); + session.messageFlow ( destination, + unit, + value + ); } @@ -285,7 +301,7 @@ FailoverSession::messageFlow(const string& destination, void FailoverSession::messageFlush(const string& destination) { - session.messageFlush ( destination ); + session.messageFlush ( destination ); } @@ -293,7 +309,7 @@ FailoverSession::messageFlush(const string& destination) void FailoverSession::messageStop(const string& destination) { - session.messageStop ( destination ); + session.messageStop ( destination ); } @@ -301,7 +317,7 @@ FailoverSession::messageStop(const string& destination) void FailoverSession::txSelect() { - session.txSelect ( ); + session.txSelect ( ); } @@ -309,7 +325,7 @@ FailoverSession::txSelect() void FailoverSession::txCommit() { - session.txCommit ( ); + session.txCommit ( ); } @@ -317,7 +333,7 @@ FailoverSession::txCommit() void FailoverSession::txRollback() { - session.txRollback ( ); + session.txRollback ( ); } @@ -325,7 +341,7 @@ FailoverSession::txRollback() void FailoverSession::dtxSelect() { - session.dtxSelect ( ); + session.dtxSelect ( ); } @@ -335,10 +351,10 @@ FailoverSession::dtxStart(const Xid& xid, bool join, bool resume) { - return session.dtxStart ( xid, - join, - resume - ); + return session.dtxStart ( xid, + join, + resume + ); } @@ -348,10 +364,10 @@ FailoverSession::dtxEnd(const Xid& xid, bool fail, bool suspend) { - return session.dtxEnd ( xid, - fail, - suspend - ); + return session.dtxEnd ( xid, + fail, + suspend + ); } @@ -360,9 +376,9 @@ qpid::framing::XaResult FailoverSession::dtxCommit(const Xid& xid, bool onePhase) { - return session.dtxCommit ( xid, - onePhase - ); + return session.dtxCommit ( xid, + onePhase + ); } @@ -370,7 +386,7 @@ FailoverSession::dtxCommit(const Xid& xid, void FailoverSession::dtxForget(const Xid& xid) { - session.dtxForget ( xid ); + session.dtxForget ( xid ); } @@ -378,7 +394,7 @@ FailoverSession::dtxForget(const Xid& xid) qpid::framing::DtxGetTimeoutResult FailoverSession::dtxGetTimeout(const Xid& xid) { - return session.dtxGetTimeout ( xid ); + return session.dtxGetTimeout ( xid ); } @@ -386,7 +402,7 @@ FailoverSession::dtxGetTimeout(const Xid& xid) qpid::framing::XaResult FailoverSession::dtxPrepare(const Xid& xid) { - return session.dtxPrepare ( xid ); + return session.dtxPrepare ( xid ); } @@ -394,7 +410,7 @@ FailoverSession::dtxPrepare(const Xid& xid) qpid::framing::DtxRecoverResult FailoverSession::dtxRecover() { - return session.dtxRecover ( ); + return session.dtxRecover ( ); } @@ -402,7 +418,7 @@ FailoverSession::dtxRecover() qpid::framing::XaResult FailoverSession::dtxRollback(const Xid& xid) { - return session.dtxRollback ( xid ); + return session.dtxRollback ( xid ); } @@ -411,9 +427,9 @@ void FailoverSession::dtxSetTimeout(const Xid& xid, uint32_t timeout) { - session.dtxSetTimeout ( xid, - timeout - ); + session.dtxSetTimeout ( xid, + timeout + ); } @@ -427,14 +443,14 @@ FailoverSession::exchangeDeclare(const string& exchange, bool autoDelete, const FieldTable& arguments) { - session.exchangeDeclare ( exchange, - type, - alternateExchange, - passive, - durable, - autoDelete, - arguments - ); + session.exchangeDeclare ( exchange, + type, + alternateExchange, + passive, + durable, + autoDelete, + arguments + ); } @@ -443,9 +459,9 @@ void FailoverSession::exchangeDelete(const string& exchange, bool ifUnused) { - session.exchangeDelete ( exchange, - ifUnused - ); + session.exchangeDelete ( exchange, + ifUnused + ); } @@ -453,7 +469,7 @@ FailoverSession::exchangeDelete(const string& exchange, qpid::framing::ExchangeQueryResult FailoverSession::exchangeQuery(const string& name) { - return session.exchangeQuery ( name ); + return session.exchangeQuery ( name ); } @@ -464,11 +480,11 @@ FailoverSession::exchangeBind(const string& queue, const string& bindingKey, const FieldTable& arguments) { - session.exchangeBind ( queue, - exchange, - bindingKey, - arguments - ); + session.exchangeBind ( queue, + exchange, + bindingKey, + arguments + ); } @@ -478,10 +494,10 @@ FailoverSession::exchangeUnbind(const string& queue, const string& exchange, const string& bindingKey) { - session.exchangeUnbind ( queue, - exchange, - bindingKey - ); + session.exchangeUnbind ( queue, + exchange, + bindingKey + ); } @@ -492,11 +508,11 @@ FailoverSession::exchangeBound(const string& exchange, const string& bindingKey, const FieldTable& arguments) { - return session.exchangeBound ( exchange, - queue, - bindingKey, - arguments - ); + return session.exchangeBound ( exchange, + queue, + bindingKey, + arguments + ); } @@ -510,14 +526,14 @@ FailoverSession::queueDeclare(const string& queue, bool autoDelete, const FieldTable& arguments) { - session.queueDeclare ( queue, - alternateExchange, - passive, - durable, - exclusive, - autoDelete, - arguments - ); + session.queueDeclare ( queue, + alternateExchange, + passive, + durable, + exclusive, + autoDelete, + arguments + ); } @@ -527,10 +543,10 @@ FailoverSession::queueDelete(const string& queue, bool ifUnused, bool ifEmpty) { - session.queueDelete ( queue, - ifUnused, - ifEmpty - ); + session.queueDelete ( queue, + ifUnused, + ifEmpty + ); } @@ -538,7 +554,7 @@ FailoverSession::queueDelete(const string& queue, void FailoverSession::queuePurge(const string& queue) { - session.queuePurge ( queue) ; + session.queuePurge ( queue) ; } @@ -546,7 +562,7 @@ FailoverSession::queuePurge(const string& queue) qpid::framing::QueueQueryResult FailoverSession::queueQuery(const string& queue) { - return session.queueQuery ( queue ); + return session.queueQuery ( queue ); } @@ -557,20 +573,20 @@ FailoverSession::queueQuery(const string& queue) void FailoverSession::prepareForFailover ( Connection newConnection ) { - try - { - newSession = newConnection.newSession(); - } - catch ( const std::exception& error ) - { - throw Exception(QPID_MSG("Can't create failover session.")); - } - + try + { + newSession = newConnection.newSession(); + } + catch ( const std::exception& error ) + { + throw Exception(QPID_MSG("Can't create failover session.")); + } - if ( failoverSubscriptionManager ) - { - failoverSubscriptionManager->prepareForFailover ( newSession ); - } + if ( failoverSubscriptionManager ) + { + // + failoverSubscriptionManager->prepareForFailover ( newSession ); + } } @@ -578,15 +594,16 @@ FailoverSession::prepareForFailover ( Connection newConnection ) void FailoverSession::failover ( ) { - if ( failoverSubscriptionManager ) - { - failoverSubscriptionManager->failover ( ); - } - - session = newSession; + if ( failoverSubscriptionManager ) + { + failoverSubscriptionManager->failover ( ); + } + session = newSession; } - +void FailoverSession::setFailoverSubscriptionManager(FailoverSubscriptionManager* fsm) { + failoverSubscriptionManager = fsm; +} }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/FailoverSession.h b/qpid/cpp/src/qpid/client/FailoverSession.h index 713c72e460..7ff26f8079 100644 --- a/qpid/cpp/src/qpid/client/FailoverSession.h +++ b/qpid/cpp/src/qpid/client/FailoverSession.h @@ -35,6 +35,8 @@ #include "qpid/client/SessionImpl.h" #include "qpid/client/TypedResult.h" #include "qpid/shared_ptr.h" +#include "qpid/sys/Mutex.h" + #include <string> @@ -295,11 +297,13 @@ class FailoverSession void failover ( ); - FailoverSubscriptionManager * failoverSubscriptionManager; - + void setFailoverSubscriptionManager(FailoverSubscriptionManager*); private: + typedef sys::Mutex::ScopedLock Lock; + sys::Mutex lock; + FailoverSubscriptionManager * failoverSubscriptionManager; Session session; Session newSession; diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp index 2b108c1303..5a790e26cd 100644 --- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp +++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp @@ -33,11 +33,11 @@ namespace client { FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) : - name("no_name"), - newSessionIsValid(false) + name("no_name"), + newSessionIsValid(false) { - subscriptionManager = new SubscriptionManager(fs->session); - fs->failoverSubscriptionManager = this; + subscriptionManager = new SubscriptionManager(fs->session); + fs->setFailoverSubscriptionManager(this); } @@ -45,8 +45,9 @@ FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) void FailoverSubscriptionManager::prepareForFailover ( Session _newSession ) { - newSession = _newSession; - newSessionIsValid = true; + Lock l(lock); + newSession = _newSession; + newSessionIsValid = true; } @@ -54,27 +55,27 @@ FailoverSubscriptionManager::prepareForFailover ( Session _newSession ) void FailoverSubscriptionManager::failover ( ) { - subscriptionManager->stop(); - // TODO -- save vector of boost bind fns. + // Stop the subscription manager thread so it can notice the failover in progress. + subscriptionManager->stop(); } FailoverSubscriptionManager::subscribeArgs::subscribeArgs - ( int _interface, - MessageListener * _listener, - LocalQueue * _localQueue, - const std::string * _queue, - const FlowControl * _flow, - const std::string * _tag - ) : - interface(_interface), - listener(_listener), - localQueue(_localQueue), - queue(_queue), - flow(_flow), - tag(_tag) +( int _interface, + MessageListener * _listener, + LocalQueue * _localQueue, + const std::string * _queue, + const FlowControl * _flow, + const std::string * _tag +) : + interface(_interface), + listener(_listener), + localQueue(_localQueue), + queue(_queue), + flow(_flow), + tag(_tag) { } @@ -86,14 +87,14 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener, const std::string & queue, const FlowControl & flow, const std::string & tag - ) +) { - subscriptionManager->subscribe ( listener, - queue, - flow, - tag - ); - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) ); + subscriptionManager->subscribe ( listener, + queue, + flow, + tag + ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) ); } @@ -103,14 +104,14 @@ FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, const std::string & queue, const FlowControl & flow, const std::string & tag - ) +) { - subscriptionManager->subscribe ( localQueue, - queue, - flow, - tag - ); - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) ); + subscriptionManager->subscribe ( localQueue, + queue, + flow, + tag + ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) ); } @@ -119,14 +120,14 @@ void FailoverSubscriptionManager::subscribe ( MessageListener & listener, const std::string & queue, const std::string & tag - ) +) { - subscriptionManager->subscribe ( listener, - queue, - tag - ); - // TODO -- more than one subscription - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) ); + subscriptionManager->subscribe ( listener, + queue, + tag + ); + // TODO -- more than one subscription + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) ); } @@ -136,13 +137,13 @@ void FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, const std::string & queue, const std::string & tag - ) +) { - subscriptionManager->subscribe ( localQueue, - queue, - tag - ); - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) ); + subscriptionManager->subscribe ( localQueue, + queue, + tag + ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) ); } @@ -151,9 +152,10 @@ bool FailoverSubscriptionManager::get ( Message & result, const std::string & queue, sys::Duration timeout - ) +) { - return subscriptionManager->get ( result, queue, timeout ); + + return subscriptionManager->get ( result, queue, timeout ); } @@ -161,7 +163,8 @@ FailoverSubscriptionManager::get ( Message & result, void FailoverSubscriptionManager::cancel ( const std::string tag ) { - subscriptionManager->cancel ( tag ); + + subscriptionManager->cancel ( tag ); } @@ -169,47 +172,40 @@ FailoverSubscriptionManager::cancel ( const std::string tag ) void FailoverSubscriptionManager::run ( ) // User Thread { - // FIXME mgoulish -- wait on a monitor here instead of this infinite loop - while ( 1 ) - { - subscriptionManager->run ( ); - - // When we drop out of run, if there is a new Session - // waiting for us, this is a failover. Otherwise, just - // return control to usercode. - sleep(1); // FIXME mgoulish -- get rid of this when we have wait-on-monitor. - - if ( newSessionIsValid ) - { - delete subscriptionManager; - subscriptionManager = new SubscriptionManager(newSession); - // FIXME mgoulish make this an array of boost bind fns - // - for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin(); - i < subscribeFns.end(); - ++ i - ) - { - std::cerr << "MDEBUG new new resubscribe.\n"; - (*i) (); - } - - newSessionIsValid = false; - } - else + while ( 1 ) { - // break; TODO -- fix this + subscriptionManager->run ( ); + Lock l(lock); + // When we drop out of run, if there is a new Session + // waiting for us, this is a failover. Otherwise, just + // return control to usercode. + if ( newSessionIsValid ) + { + delete subscriptionManager; + subscriptionManager = new SubscriptionManager(newSession); + for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin(); + i < subscribeFns.end(); + ++ i + ) + { + (*i) (); + } + newSessionIsValid = false; + } + else + { + // Not a failover, return to user code. + break; + } } - } - } - void FailoverSubscriptionManager::start ( ) { - subscriptionManager->start ( ); + + subscriptionManager->start ( ); } @@ -217,7 +213,8 @@ FailoverSubscriptionManager::start ( ) void FailoverSubscriptionManager::setAutoStop ( bool set ) { - subscriptionManager->setAutoStop ( set ); + + subscriptionManager->setAutoStop ( set ); } @@ -225,7 +222,8 @@ FailoverSubscriptionManager::setAutoStop ( bool set ) void FailoverSubscriptionManager::stop ( ) { - subscriptionManager->stop ( ); + + subscriptionManager->stop ( ); } @@ -233,9 +231,10 @@ FailoverSubscriptionManager::stop ( ) void FailoverSubscriptionManager::setFlowControl ( const std::string & destination, const FlowControl & flow - ) +) { - subscriptionManager->setFlowControl ( destination, flow ); + + subscriptionManager->setFlowControl ( destination, flow ); } @@ -243,7 +242,8 @@ FailoverSubscriptionManager::setFlowControl ( const std::string & destination, void FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow ) { - subscriptionManager->setFlowControl ( flow ); + + subscriptionManager->setFlowControl ( flow ); } @@ -251,7 +251,8 @@ FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow ) const FlowControl & FailoverSubscriptionManager::getFlowControl ( ) const { - return subscriptionManager->getFlowControl ( ); + + return subscriptionManager->getFlowControl ( ); } @@ -262,13 +263,14 @@ FailoverSubscriptionManager::setFlowControl ( const std::string & tag, uint32_t messages, uint32_t bytes, bool window - ) +) { - subscriptionManager->setFlowControl ( tag, - messages, - bytes, - window - ); + + subscriptionManager->setFlowControl ( tag, + messages, + bytes, + window + ); } @@ -277,12 +279,13 @@ void FailoverSubscriptionManager::setFlowControl ( uint32_t messages, uint32_t bytes, bool window - ) +) { - subscriptionManager->setFlowControl ( messages, - bytes, - window - ); + + subscriptionManager->setFlowControl ( messages, + bytes, + window + ); } @@ -290,7 +293,8 @@ FailoverSubscriptionManager::setFlowControl ( uint32_t messages, void FailoverSubscriptionManager::setAcceptMode ( bool required ) { - subscriptionManager->setAcceptMode ( required ); + + subscriptionManager->setAcceptMode ( required ); } @@ -298,7 +302,8 @@ FailoverSubscriptionManager::setAcceptMode ( bool required ) void FailoverSubscriptionManager::setAcquireMode ( bool acquire ) { - subscriptionManager->setAcquireMode ( acquire ); + + subscriptionManager->setAcquireMode ( acquire ); } @@ -306,7 +311,8 @@ FailoverSubscriptionManager::setAcquireMode ( bool acquire ) void FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck ) { - subscriptionManager->setAckPolicy ( autoAck ); + + subscriptionManager->setAckPolicy ( autoAck ); } @@ -314,16 +320,12 @@ FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck ) AckPolicy & FailoverSubscriptionManager::getAckPolicy() { - return subscriptionManager->getAckPolicy ( ); + + return subscriptionManager->getAckPolicy ( ); } -void -FailoverSubscriptionManager::registerFailoverHandler ( boost::function<void ()> /* fh */ ) -{ - // FIXME mgoulish -- get rid of this mechanism -- i think it's unused. -} diff --git a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h index fe96742876..8678f5683c 100644 --- a/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h +++ b/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h @@ -33,6 +33,7 @@ #include <qpid/client/LocalQueue.h> #include <qpid/client/FlowControl.h> #include <qpid/sys/Runnable.h> +#include <qpid/sys/Mutex.h> @@ -106,7 +107,6 @@ class FailoverSubscriptionManager AckPolicy & getAckPolicy(); - void registerFailoverHandler ( boost::function<void ()> fh ); // Get ready for a failover. void prepareForFailover ( Session newSession ); @@ -116,6 +116,8 @@ class FailoverSubscriptionManager private: + typedef sys::Mutex::ScopedLock Lock; + sys::Mutex lock; SubscriptionManager * subscriptionManager; |