summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/tests/ForkedBroker.h20
-rw-r--r--cpp/src/tests/Makefile.am20
-rw-r--r--cpp/src/tests/declare_queues.cpp69
-rw-r--r--cpp/src/tests/failover_soak.cpp651
-rw-r--r--cpp/src/tests/replaying_sender.cpp131
-rw-r--r--cpp/src/tests/resuming_receiver.cpp163
-rwxr-xr-xcpp/src/tests/run_failover_soak13
7 files changed, 1058 insertions, 9 deletions
diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h
index c0d72a6235..98235b9690 100644
--- a/cpp/src/tests/ForkedBroker.h
+++ b/cpp/src/tests/ForkedBroker.h
@@ -59,17 +59,23 @@ class ForkedBroker {
}
void kill(int sig=SIGINT) {
- using qpid::ErrnoException;
if (pid == 0) return;
- if (::kill(pid, sig) < 0) throw ErrnoException("kill failed");
+ int savePid = pid;
+ pid = 0; // Always reset pid, even in case of an exception below.
+ ::close(pipeFds[1]);
+
+ using qpid::ErrnoException;
+ if (::kill(savePid, sig) < 0)
+ throw ErrnoException("kill failed");
int status;
- if (::waitpid(pid, &status, 0) < 0) throw ErrnoException("wait for forked process failed");
- if (WEXITSTATUS(status) != 0)
- throw qpid::Exception(QPID_MSG("forked broker exited with: " << WEXITSTATUS(status)));
- pid = 0;
+ if (::waitpid(savePid, &status, 0) < 0)
+ throw ErrnoException("wait for forked process failed");
+ if (WEXITSTATUS(status) != 0)
+ throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status)));
}
uint16_t getPort() { return port; }
+ pid_t getPID() { return pid; }
private:
@@ -77,7 +83,6 @@ class ForkedBroker {
using qpid::ErrnoException;
pid = 0;
port = 0;
- int pipeFds[2];
if(::pipe(pipeFds) < 0) throw ErrnoException("Can't create pipe");
pid = ::fork();
if (pid < 0) throw ErrnoException("Fork failed");
@@ -104,6 +109,7 @@ class ForkedBroker {
}
}
+ int pipeFds[2];
pid_t pid;
int port;
};
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 18044d73a1..78aadf0da0 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -134,6 +134,22 @@ check_PROGRAMS+=header_test
header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h
header_test_LDADD=$(lib_client)
+check_PROGRAMS+=failover_soak
+failover_soak_SOURCES=failover_soak.cpp ForkedBroker.h
+failover_soak_LDADD=$(lib_client)
+
+check_PROGRAMS+=declare_queues
+declare_queues_SOURCES=declare_queues.cpp
+declare_queues_LDADD=$(lib_client)
+
+check_PROGRAMS+=replaying_sender
+replaying_sender_SOURCES=replaying_sender.cpp
+replaying_sender_LDADD=$(lib_client)
+
+check_PROGRAMS+=resuming_receiver
+resuming_receiver_SOURCES=resuming_receiver.cpp
+resuming_receiver_LDADD=$(lib_client)
+
check_PROGRAMS+=txshift
txshift_SOURCES=txshift.cpp TestOptions.h ConnectionOptions.h
txshift_LDADD=$(lib_client)
@@ -153,7 +169,7 @@ sender_LDADD=$(lib_client)
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test
system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest
-TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests
+TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests
EXTRA_DIST += \
run_test vg_check \
@@ -201,7 +217,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers)
# Longer running stability tests, not run by default check: target.
# Not run under valgrind, too slow
-LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest
+LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak
EXTRA_DIST+=$(LONG_TESTS) run_perftest
check-long:
$(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=
diff --git a/cpp/src/tests/declare_queues.cpp b/cpp/src/tests/declare_queues.cpp
new file mode 100644
index 0000000000..7f61bde12a
--- /dev/null
+++ b/cpp/src/tests/declare_queues.cpp
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/Exception.h>
+
+#include <cstdlib>
+#include <iostream>
+
+using namespace qpid::client;
+
+using namespace std;
+
+int main(int argc, char ** argv)
+{
+ ConnectionSettings settings;
+ if ( argc != 3 )
+ {
+ cerr << "Usage: declare_queues host port\n";
+ return 1;
+ }
+
+ settings.host = argv[1];
+ settings.port = atoi(argv[2]);
+
+ FailoverManager connection(settings);
+ try {
+ bool complete = false;
+ while (!complete) {
+ Session session = connection.connect().newSession();
+ try {
+ session.queueDeclare(arg::queue="message_queue");
+ complete = true;
+ } catch (const qpid::TransportFailure&) {}
+ }
+ connection.close();
+ return 0;
+ } catch (const exception& error) {
+ cerr << "declare_queues failed:" << error.what() << endl;
+ cerr << " host: " << settings.host
+ << " port: " << settings.port << endl;
+ return 1;
+ }
+
+}
+
+
+
+
+
diff --git a/cpp/src/tests/failover_soak.cpp b/cpp/src/tests/failover_soak.cpp
new file mode 100644
index 0000000000..6971bad8f9
--- /dev/null
+++ b/cpp/src/tests/failover_soak.cpp
@@ -0,0 +1,651 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/wait.h>
+#include <sys/time.h>
+#include <string.h>
+
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <vector>
+
+#include <ForkedBroker.h>
+
+
+
+
+using namespace std;
+
+
+typedef vector<ForkedBroker *> brokerVector;
+
+typedef enum
+{
+ NO_STATUS,
+ RUNNING,
+ COMPLETED
+}
+childStatus;
+
+
+
+struct child
+{
+ child ( string & name, pid_t pid )
+ : name(name), pid(pid), retval(-999), status(RUNNING)
+ {
+ gettimeofday ( & startTime, 0 );
+ }
+
+
+ void
+ done ( int _retval )
+ {
+ retval = _retval;
+ status = COMPLETED;
+ gettimeofday ( & stopTime, 0 );
+ }
+
+
+ string name;
+ pid_t pid;
+ int retval;
+ childStatus status;
+ struct timeval startTime,
+ stopTime;
+};
+
+
+
+
+struct children : public vector<child *>
+{
+ void
+ add ( string & name, pid_t pid )
+ {
+ push_back(new child ( name, pid ));
+ }
+
+
+ child *
+ get ( pid_t pid )
+ {
+ vector<child *>::iterator i;
+ for ( i = begin(); i != end(); ++ i )
+ if ( pid == (*i)->pid )
+ return *i;
+
+ return 0;
+ }
+
+
+ void
+ exited ( pid_t pid, int retval )
+ {
+ child * kid = get ( pid );
+ if(! kid)
+ {
+ if ( verbosity > 0 )
+ {
+ cerr << "children::exited warning: Can't find child with pid "
+ << pid
+ << endl;
+ }
+ return;
+ }
+
+ kid->done ( retval );
+ }
+
+
+ int
+ unfinished ( )
+ {
+ int count = 0;
+
+ vector<child *>::iterator i;
+ for ( i = begin(); i != end(); ++ i )
+ if ( COMPLETED != (*i)->status )
+ ++ count;
+
+ return count;
+ }
+
+
+ int
+ checkChildren ( )
+ {
+ vector<child *>::iterator i;
+ for ( i = begin(); i != end(); ++ i )
+ if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) )
+ return (*i)->retval;
+
+ return 0;
+ }
+
+
+ void
+ killEverybody ( )
+ {
+ vector<child *>::iterator i;
+ for ( i = begin(); i != end(); ++ i )
+ kill ( (*i)->pid, 9 );
+ }
+
+
+
+ void
+ print ( )
+ {
+ cout << "--- status of all children --------------\n";
+ vector<child *>::iterator i;
+ for ( i = begin(); i != end(); ++ i )
+ cout << "child: " << (*i)->name
+ << " status: " << (*i)->status
+ << endl;
+ cout << "\n\n\n\n";
+ }
+
+
+ /*
+ Only call this if you already know there is at least
+ one child still running. Supply a time in seconds.
+ If it has been at least that long since a shild stopped
+ running, we judge the system to have hung.
+ */
+ bool
+ hanging ( int hangTime )
+ {
+ struct timeval now,
+ duration;
+ gettimeofday ( &now, 0 );
+
+ vector<child *>::iterator i;
+ for ( i = begin(); i != end(); ++ i )
+ {
+ timersub ( & now, &((*i)->startTime), & duration );
+ if ( duration.tv_sec >= hangTime )
+ return true;
+ }
+
+ return false;
+ }
+
+
+ int verbosity;
+};
+
+
+
+children allMyChildren;
+
+
+
+
+void
+childExit ( int signalNumber )
+{
+ signalNumber ++; // Now maybe the compiler willleave me alone?
+ int childReturnCode;
+ pid_t pid = waitpid ( 0, & childReturnCode, WNOHANG);
+
+ if ( pid > 0 )
+ allMyChildren.exited ( pid, childReturnCode );
+}
+
+
+
+int
+mrand ( int maxDesiredVal ) {
+ double zeroToOne = (double) rand() / (double) RAND_MAX;
+ return (int) (zeroToOne * (double) maxDesiredVal);
+}
+
+
+
+int
+mrand ( int minDesiredVal, int maxDesiredVal ) {
+ int interval = maxDesiredVal - minDesiredVal;
+ return minDesiredVal + mrand ( interval );
+}
+
+
+
+void
+makeClusterName ( string & s, int & num ) {
+ num = mrand(1000);
+ stringstream ss;
+ ss << "soakTestCluster_" << num;
+ s = ss.str();
+}
+
+
+
+
+
+void
+printBrokers ( brokerVector & brokers )
+{
+ cout << "Broker List ------------ size: " << brokers.size() << "\n";
+ for ( brokerVector::iterator i = brokers.begin(); i != brokers.end(); ++ i) {
+ cout << "pid: "
+ << (*i)->getPID()
+ << " port: "
+ << (*i)->getPort()
+ << endl;
+ }
+ cout << "end Broker List ------------\n";
+}
+
+
+
+
+
+void
+startNewBroker ( brokerVector & brokers,
+ char const * srcRoot,
+ char const * moduleDir,
+ string const clusterName )
+{
+ stringstream path;
+ path << srcRoot << "/qpidd";
+
+ const char * const argv[] =
+ {
+ "qpidd",
+ "-p0",
+ "--module-dir",
+ moduleDir,
+ "--load-module=cluster.so",
+ "--cluster-name",
+ clusterName.c_str(),
+ "--auth=no",
+ "--no-data-dir",
+ "--mgmt-enable=no",
+ 0
+ };
+
+ size_t argc = sizeof(argv)/sizeof(argv[0]);
+ brokers.push_back ( new ForkedBroker ( argc, argv ) );
+}
+
+
+
+
+
+void
+killFrontBroker ( brokerVector & brokers, int verbosity )
+{
+ if ( verbosity > 0 )
+ cout << "killFrontBroker pid: " << brokers[0]->getPID() << " on port " << brokers[0]->getPort() << endl;
+ try { brokers[0]->kill(9); }
+ catch ( const exception& error ) {
+ if ( verbosity > 0 )
+ cout << "error killing broker: " << error.what() << endl;
+ }
+ delete brokers[0];
+ brokers.erase ( brokers.begin() );
+}
+
+
+
+
+
+void
+killAllBrokers ( brokerVector & brokers )
+{
+ for ( uint i = 0; i < brokers.size(); ++ i )
+ try { brokers[i]->kill(9); }
+ catch ( ... ) { }
+}
+
+
+
+
+
+pid_t
+runDeclareQueuesClient ( brokerVector brokers,
+ char const * host,
+ char const * path,
+ int verbosity
+ )
+{
+ string name("declareQueues");
+ int port = brokers[0]->getPort ( );
+
+ if ( verbosity > 0 )
+ cout << "startDeclareQueuesClient: host: "
+ << host
+ << " port: "
+ << port
+ << endl;
+ stringstream portSs;
+ portSs << port;
+
+ vector<const char*> argv;
+ argv.push_back ( "declareQueues" );
+ argv.push_back ( host );
+ argv.push_back ( portSs.str().c_str() );
+ argv.push_back ( 0 );
+ pid_t pid = fork();
+
+ if ( ! pid ) {
+ execv ( path, const_cast<char * const *>(&argv[0]) );
+ perror ( "error executing dq: " );
+ return 0;
+ }
+
+ allMyChildren.add ( name, pid );
+ return pid;
+}
+
+
+
+
+
+pid_t
+startReceivingClient ( brokerVector brokers,
+ char const * host,
+ char const * receiverPath,
+ char const * reportFrequency,
+ int verbosity
+ )
+{
+ string name("receiver");
+ int port = brokers[0]->getPort ( );
+
+ if ( verbosity > 0 )
+ cout << "startReceivingClient: port " << port << endl;
+ char portStr[100];
+ char verbosityStr[100];
+ sprintf(portStr, "%d", port);
+ sprintf(verbosityStr, "%d", verbosity);
+
+
+ vector<const char*> argv;
+ argv.push_back ( "resumingReceiver" );
+ argv.push_back ( host );
+ argv.push_back ( portStr );
+ argv.push_back ( reportFrequency );
+ argv.push_back ( verbosityStr );
+ argv.push_back ( 0 );
+
+ pid_t pid = fork();
+
+ if ( ! pid ) {
+ execv ( receiverPath, const_cast<char * const *>(&argv[0]) );
+ perror ( "error executing receiver: " );
+ return 0;
+ }
+
+ allMyChildren.add ( name, pid );
+ return pid;
+}
+
+
+
+
+
+pid_t
+startSendingClient ( brokerVector brokers,
+ char const * host,
+ char const * senderPath,
+ char const * nMessages,
+ char const * reportFrequency,
+ int verbosity
+ )
+{
+ string name("sender");
+ int port = brokers[0]->getPort ( );
+
+ if ( verbosity )
+ cout << "startSenderClient: port " << port << endl;
+ char portStr[100];
+ char verbosityStr[100];
+
+ sprintf ( portStr, "%d", port);
+ sprintf ( verbosityStr, "%d", verbosity);
+
+ vector<const char*> argv;
+ argv.push_back ( "replayingSender" );
+ argv.push_back ( host );
+ argv.push_back ( portStr );
+ argv.push_back ( nMessages );
+ argv.push_back ( reportFrequency );
+ argv.push_back ( verbosityStr );
+ argv.push_back ( 0 );
+
+ pid_t pid = fork();
+
+ if ( ! pid ) {
+ execv ( senderPath, const_cast<char * const *>(&argv[0]) );
+ perror ( "error executing sender: " );
+ return 0;
+ }
+
+ allMyChildren.add ( name, pid );
+ return pid;
+}
+
+
+
+#define HUNKY_DORY 0
+#define BAD_ARGS 1
+#define CANT_FORK_DQ 2
+#define CANT_FORK_RECEIVER 3
+#define DQ_FAILED 4
+#define ERROR_ON_CHILD 5
+#define HANGING 6
+
+
+int
+main ( int argc, char const ** argv )
+{
+ if ( argc < 9 ) {
+ cerr << "Usage: failoverSoak srcRoot moduleDir host senderPath receiverPath nMessages verbosity\n";
+ cerr << " ( argc was " << argc << " )\n";
+ return BAD_ARGS;
+ }
+
+ signal ( SIGCHLD, childExit );
+
+ char const * srcRoot = argv[1];
+ char const * moduleDir = argv[2];
+ char const * host = argv[3];
+ char const * declareQueuesPath = argv[4];
+ char const * senderPath = argv[5];
+ char const * receiverPath = argv[6];
+ char const * nMessages = argv[7];
+ char const * reportFrequency = argv[8];
+ int verbosity = atoi(argv[9]);
+
+ int maxBrokers = 50;
+
+ allMyChildren.verbosity = verbosity;
+
+ int clusterNum;
+ string clusterName;
+
+ srand ( getpid() );
+
+ makeClusterName ( clusterName, clusterNum );
+
+ brokerVector brokers;
+
+ if ( verbosity > 0 )
+ cout << "Starting initial cluster...\n";
+
+ int nBrokers = 3;
+ for ( int i = 0; i < nBrokers; ++ i ) {
+ startNewBroker ( brokers,
+ srcRoot,
+ moduleDir,
+ clusterName );
+ }
+
+
+ if ( verbosity > 0 )
+ printBrokers ( brokers );
+
+ // Run the declareQueues child.
+ int childStatus;
+ pid_t dqClientPid =
+ runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity );
+ if ( -1 == dqClientPid ) {
+ cerr << "failoverSoak error: Couldn't fork declareQueues.\n";
+ return CANT_FORK_DQ;
+ }
+
+ // Don't continue until declareQueues is finished.
+ pid_t retval = waitpid ( dqClientPid, & childStatus, 0);
+ if ( retval != dqClientPid) {
+ cerr << "failoverSoak error: waitpid on declareQueues returned value " << retval << endl;
+ return DQ_FAILED;
+ }
+ allMyChildren.exited ( dqClientPid, childStatus );
+
+
+
+ // Start the receiving client.
+ pid_t receivingClientPid =
+ startReceivingClient ( brokers,
+ host,
+ receiverPath,
+ reportFrequency,
+ verbosity );
+ if ( -1 == receivingClientPid ) {
+ cerr << "failoverSoak error: Couldn't fork receiver.\n";
+ return CANT_FORK_RECEIVER;
+ }
+
+
+ // Start the sending client.
+ pid_t sendingClientPid =
+ startSendingClient ( brokers,
+ host,
+ senderPath,
+ nMessages,
+ reportFrequency,
+ verbosity );
+ if ( -1 == sendingClientPid ) {
+ cerr << "failoverSoak error: Couldn't fork sender.\n";
+ return CANT_FORK_RECEIVER;
+ }
+
+
+ int minSleep = 3,
+ maxSleep = 6;
+
+
+ for ( int totalBrokers = 3;
+ totalBrokers < maxBrokers;
+ ++ totalBrokers
+ )
+ {
+ if ( verbosity > 0 )
+ cout << totalBrokers << " brokers have been added to the cluster.\n\n\n";
+
+ // Sleep for a while. -------------------------
+ int sleepyTime = mrand ( minSleep, maxSleep );
+ if ( verbosity > 0 )
+ cout << "Sleeping for " << sleepyTime << " seconds.\n";
+ sleep ( sleepyTime );
+
+ // Kill the oldest broker. --------------------------
+ killFrontBroker ( brokers, verbosity );
+
+ // Sleep for a while. -------------------------
+ sleepyTime = mrand ( minSleep, maxSleep );
+ if ( verbosity > 0 )
+ cerr << "Sleeping for " << sleepyTime << " seconds.\n";
+ sleep ( sleepyTime );
+
+ // Start a new broker. --------------------------
+ if ( verbosity > 0 )
+ cout << "Starting new broker.\n\n";
+
+ startNewBroker ( brokers,
+ srcRoot,
+ moduleDir,
+ clusterName );
+
+ if ( verbosity > 0 )
+ printBrokers ( brokers );
+
+ // If all children have exited, quit.
+ int unfinished = allMyChildren.unfinished();
+ if ( ! unfinished ) {
+ killAllBrokers ( brokers );
+
+ if ( verbosity > 0 )
+ cout << "failoverSoak: all children have exited.\n";
+ int retval = allMyChildren.checkChildren();
+ if ( verbosity > 0 )
+ std::cerr << "failoverSoak: checkChildren: " << retval << endl;
+ return retval ? ERROR_ON_CHILD : HUNKY_DORY;
+ }
+
+ // Even if some are still running, if there's an error, quit.
+ if ( allMyChildren.checkChildren() )
+ {
+ if ( verbosity > 0 )
+ cout << "failoverSoak: error on child.\n";
+ allMyChildren.killEverybody();
+ killAllBrokers ( brokers );
+ return ERROR_ON_CHILD;
+ }
+
+ // If one is hanging, quit.
+ if ( allMyChildren.hanging ( 120 ) )
+ {
+ if ( verbosity > 0 )
+ cout << "failoverSoak: child hanging.\n";
+ allMyChildren.killEverybody();
+ killAllBrokers ( brokers );
+ return HANGING;
+ }
+
+ if ( verbosity > 0 ) {
+ std::cerr << "------- next kill-broker loop --------\n";
+ allMyChildren.print();
+ }
+ }
+
+ retval = allMyChildren.checkChildren();
+ if ( verbosity > 0 )
+ std::cerr << "failoverSoak: checkChildren: " << retval << endl;
+
+ if ( verbosity > 0 )
+ cout << "failoverSoak: maxBrokers reached.\n";
+
+ allMyChildren.killEverybody();
+ killAllBrokers ( brokers );
+
+ return retval ? ERROR_ON_CHILD : HUNKY_DORY;
+}
+
+
+
diff --git a/cpp/src/tests/replaying_sender.cpp b/cpp/src/tests/replaying_sender.cpp
new file mode 100644
index 0000000000..7e148e277f
--- /dev/null
+++ b/cpp/src/tests/replaying_sender.cpp
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/MessageReplayTracker.h>
+#include <qpid/Exception.h>
+
+#include <iostream>
+#include <sstream>
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+class Sender : public FailoverManager::Command
+{
+ public:
+ Sender(const std::string& queue, uint count, uint reportFreq);
+ void execute(AsyncSession& session, bool isRetry);
+ uint getSent();
+
+ int verbosity;
+
+ private:
+ MessageReplayTracker sender;
+ const uint count;
+ uint sent;
+ const uint reportFrequency;
+ Message message;
+
+};
+
+Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : sender(10), count(count_), sent(0), reportFrequency(reportFreq)
+{
+ message.getDeliveryProperties().setRoutingKey(queue);
+}
+
+void Sender::execute(AsyncSession& session, bool isRetry)
+{
+ if (isRetry) sender.replay(session);
+ else sender.init(session);
+ while (sent < count) {
+ stringstream message_data;
+ message_data << ++sent;
+ message.setData(message_data.str());
+ message.getHeaders().setInt("sn", sent);
+ sender.send(message);
+ if (count > reportFrequency && !(sent % reportFrequency)) {
+ if ( verbosity > 0 )
+ std::cout << "sent " << sent << " of " << count << std::endl;
+ }
+ }
+ message.setData("That's all, folks!");
+ sender.send(message);
+
+ if ( verbosity > 0 )
+ std::cout << "SENDER COMPLETED\n";
+}
+
+uint Sender::getSent()
+{
+ return sent;
+}
+
+int main(int argc, char ** argv)
+{
+ ConnectionSettings settings;
+
+ if ( argc != 6 )
+ {
+ std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity\n";
+ return 1;
+ }
+
+ settings.host = argv[1];
+ settings.port = atoi(argv[2]);
+ int n_messages = atoi(argv[3]);
+ int reportFrequency = atoi(argv[4]);
+ int verbosity = atoi(argv[5]);
+
+ FailoverManager connection(settings);
+ Sender sender("message_queue", n_messages, reportFrequency );
+ sender.verbosity = verbosity;
+ try {
+ connection.execute ( sender );
+ if ( verbosity > 0 )
+ {
+ std::cout << "Sender finished. Sent "
+ << sender.getSent()
+ << " messages."
+ << endl;
+ }
+ connection.close();
+ return 0;
+ }
+ catch(const std::exception& error)
+ {
+ cerr << "Sender (host: "
+ << settings.host
+ << " port: "
+ << settings.port
+ << " ) "
+ << " Failed: "
+ << error.what()
+ << std::endl;
+ }
+ return 1;
+}
diff --git a/cpp/src/tests/resuming_receiver.cpp b/cpp/src/tests/resuming_receiver.cpp
new file mode 100644
index 0000000000..f49a115e1e
--- /dev/null
+++ b/cpp/src/tests/resuming_receiver.cpp
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/SubscriptionManager.h>
+
+#include <iostream>
+#include <fstream>
+
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+
+class Listener : public MessageListener,
+ public FailoverManager::Command,
+ public FailoverManager::ReconnectionStrategy
+{
+ public:
+ Listener ( int report_frequency = 1000, int verbosity = 0 );
+ void received(Message& message);
+ void execute(AsyncSession& session, bool isRetry);
+ void check();
+ void editUrlList(std::vector<Url>& urls);
+ private:
+ Subscription subscription;
+ uint count;
+ uint received_twice;
+ uint lastSn;
+ bool gaps;
+ uint reportFrequency;
+ int verbosity;
+};
+
+
+Listener::Listener(int freq, int verbosity)
+ : count(0),
+ received_twice(0),
+ lastSn(0),
+ gaps(false),
+ reportFrequency(freq),
+ verbosity(verbosity)
+{}
+
+
+void Listener::received(Message & message)
+{
+ if (message.getData() == "That's all, folks!")
+ {
+ if(verbosity > 0 )
+ {
+ std::cout << "Shutting down listener for "
+ << message.getDestination() << std::endl;
+
+ std::cout << "Listener received "
+ << count
+ << " messages ("
+ << received_twice
+ << " received_twice)"
+ << endl;
+ }
+ subscription.cancel();
+ if ( verbosity > 0 )
+ std::cout << "LISTENER COMPLETED\n";
+ } else {
+ uint sn = message.getHeaders().getAsInt("sn");
+ if (lastSn < sn) {
+ if (sn - lastSn > 1) {
+ std::cerr << "Error: gap in sequence between " << lastSn << " and " << sn << std::endl;
+ gaps = true;
+ }
+ lastSn = sn;
+ ++count;
+ if ( ! ( count % reportFrequency ) ) {
+ if ( verbosity > 0 )
+ std::cout << "Listener has received "
+ << count
+ << " messages.\n";
+ }
+ } else {
+ ++received_twice;
+ }
+ }
+}
+
+void Listener::check()
+{
+ if (gaps) throw Exception("Detected gaps in sequence; messages appear to have been lost.");
+}
+
+void Listener::execute(AsyncSession& session, bool isRetry)
+{
+ if (isRetry) {
+ // std::cout << "Resuming from " << count << std::endl;
+ }
+ SubscriptionManager subs(session);
+ subscription = subs.subscribe(*this, "message_queue");
+ subs.run();
+}
+
+void Listener::editUrlList(std::vector<Url>& urls)
+{
+ /**
+ * A more realistic algorithm would be to search through the list
+ * for prefered hosts and ensure they come first in the list.
+ */
+ if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, urls.end());
+}
+
+int main(int argc, char ** argv)
+{
+ ConnectionSettings settings;
+
+ if ( argc != 5 )
+ {
+ std::cerr << "Usage: resuming_receiver host port report_frequency verbosity\n";
+ return 1;
+ }
+
+ settings.host = argv[1];
+ settings.port = atoi(argv[2]);
+ int reportFrequency = atoi(argv[3]);
+ int verbosity = atoi(argv[4]);
+
+ Listener listener(reportFrequency, verbosity);
+ FailoverManager connection(settings, &listener);
+
+ try {
+ connection.execute(listener);
+ connection.close();
+ listener.check();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cerr << "Receiver failed: " << error.what() << std::endl;
+ }
+ return 1;
+}
+
+
+
diff --git a/cpp/src/tests/run_failover_soak b/cpp/src/tests/run_failover_soak
new file mode 100755
index 0000000000..d332cd42b9
--- /dev/null
+++ b/cpp/src/tests/run_failover_soak
@@ -0,0 +1,13 @@
+#!/bin/sh
+
+host=127.0.0.1
+
+src_root=..
+module_dir=$src_root/.libs
+n_messages=300000
+report_frequency=10000
+verbosity=1
+
+
+exec `dirname $0`/failover_soak $src_root $module_dir $host ./declare_queues ./replaying_sender ./resuming_receiver $n_messages $report_frequency $verbosity
+