summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-17 16:45:24 +0000
committerAlan Conway <aconway@apache.org>2008-10-17 16:45:24 +0000
commit7db0c0970eac260626263314c30f0e20d4ef6c21 (patch)
tree231024436b5b7185f63972d90318acce97816c22 /cpp
parenta039e57108ed06586e73a255dc824ed27fc6de2a (diff)
downloadqpid-python-7db0c0970eac260626263314c30f0e20d4ef6c21.tar.gz
QPID-1367 Mick Goulish: improvements to client-side failover.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705668 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/examples/direct/direct_producer.cpp6
-rw-r--r--cpp/examples/failover/direct_producer.cpp33
-rw-r--r--cpp/examples/failover/listener.cpp176
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp8
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp1
-rw-r--r--cpp/src/qpid/client/FailoverConnection.cpp24
-rw-r--r--cpp/src/qpid/client/FailoverConnection.h4
-rw-r--r--cpp/src/qpid/client/FailoverSession.cpp28
-rw-r--r--cpp/src/qpid/client/FailoverSession.h4
-rw-r--r--cpp/src/qpid/client/FailoverSubscriptionManager.cpp1
-rw-r--r--cpp/src/qpid/client/FailoverSubscriptionManager.h3
-rwxr-xr-xcpp/src/tests/ssl_test16
12 files changed, 102 insertions, 202 deletions
diff --git a/cpp/examples/direct/direct_producer.cpp b/cpp/examples/direct/direct_producer.cpp
index 40fc644bf3..baa8d9092b 100644
--- a/cpp/examples/direct/direct_producer.cpp
+++ b/cpp/examples/direct/direct_producer.cpp
@@ -64,6 +64,7 @@ using std::string;
int main(int argc, char** argv) {
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
+ int count = argc>3 ? atoi(argv[3]) : 10;
Connection connection;
Message message;
try {
@@ -81,14 +82,15 @@ int main(int argc, char** argv) {
// Now send some messages ...
- for (int i=0; i<10; i++) {
+ for (int i=0; i<count; i++) {
stringstream message_data;
message_data << "Message " << i;
message.setData(message_data.str());
// Asynchronous transfer sends messages as quickly as
// possible without waiting for confirmation.
- async(session).messageTransfer(arg::content=message, arg::destination="amq.direct");
+ // async(session).messageTransfer(arg::content=message, arg::destination="amq.direct");
+ session.messageTransfer(arg::content=message, arg::destination="amq.direct");
}
// And send a final message to indicate termination.
diff --git a/cpp/examples/failover/direct_producer.cpp b/cpp/examples/failover/direct_producer.cpp
index 1bee56e164..513971197e 100644
--- a/cpp/examples/failover/direct_producer.cpp
+++ b/cpp/examples/failover/direct_producer.cpp
@@ -36,12 +36,13 @@ using namespace std;
int
main ( int argc, char ** argv)
{
+
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
int count = argc>3 ? atoi(argv[3]) : 30;
- int delayMs = argc>4 ? atoi(argv[4]) : 1000;
string program_name = "PRODUCER";
+
try {
FailoverConnection connection;
FailoverSession * session;
@@ -49,14 +50,23 @@ main ( int argc, char ** argv)
connection.open ( host, port );
session = connection.newSession();
+ bool report = true;
int sent = 0;
while ( sent < count ) {
+
message.getDeliveryProperties().setRoutingKey("routing_key");
- std::cout << "sending message "
- << sent
- << " of "
- << count
- << ".\n";
+
+
+ if ( count > 1000 )
+ report = !(sent % 1000);
+
+ if ( report )
+ {
+ std::cout << "sending message "
+ << sent
+ << ".\n";
+ }
+
stringstream message_data;
message_data << sent;
message.setData(message_data.str());
@@ -70,12 +80,12 @@ main ( int argc, char ** argv)
0,
message
);
- usleep ( 1000*delayMs );
+
++ sent;
}
message.setData ( "That's all, folks!" );
- /* MICK FIXME
+ /* FIXME mgoulish 16 Oct 08
session.messageTransfer ( arg::content=message,
arg::destination="amq.direct"
);
@@ -88,10 +98,17 @@ main ( int argc, char ** argv)
session->sync();
connection.close();
+ std::cout << program_name
+ << " sent "
+ << sent
+ << " messages.\n";
+
std::cout << program_name << ": " << " completed without error." << std::endl;
return 0;
} catch(const std::exception& error) {
std::cout << program_name << ": " << error.what() << std::endl;
+ std::cout << program_name << "Exiting.\n";
+ return 1;
}
return 1;
}
diff --git a/cpp/examples/failover/listener.cpp b/cpp/examples/failover/listener.cpp
index 1c47127389..d8cb78c9ce 100644
--- a/cpp/examples/failover/listener.cpp
+++ b/cpp/examples/failover/listener.cpp
@@ -34,150 +34,26 @@ using namespace qpid::framing;
using namespace std;
-struct Recorder
-{
- unsigned int max_messages;
- unsigned int * messages_received;
-
- Recorder ( )
- {
- max_messages = 1000;
- messages_received = new unsigned int [ max_messages ];
- memset ( messages_received, 0, max_messages * sizeof(int) );
- }
-
-
- void
- received ( int i )
- {
- messages_received[i] ++;
- }
-
-
-
- void
- report ( )
- {
- int i;
-
- int last_received_message = 0;
-
- vector<unsigned int> missed_messages,
- multiple_messages;
-
- /*----------------------------------------------------
- Collect indices of missed and multiple messages.
- ----------------------------------------------------*/
- bool seen_first_message = false;
- for ( i = max_messages - 1; i >= 0; -- i )
- {
- if ( ! seen_first_message )
- {
- if ( messages_received [i] > 0 )
- {
- seen_first_message = true;
- last_received_message = i;
- }
- }
- else
- {
- if ( messages_received [i] == 0 )
- missed_messages.push_back ( i );
- else
- if ( messages_received [i] > 1 )
- {
- multiple_messages.push_back ( i );
- }
- }
- }
-
- /*--------------------------------------------
- Report missed messages.
- --------------------------------------------*/
- char const * verb = ( missed_messages.size() == 1 )
- ? " was "
- : " were ";
-
- char const * plural = ( missed_messages.size() == 1 )
- ? "."
- : "s.";
-
- std::cerr << "Listener::shutdown: There"
- << verb
- << missed_messages.size()
- << " missed message"
- << plural
- << endl;
-
- for ( i = 0; i < int(missed_messages.size()); ++ i )
- {
- std::cerr << " " << i << " was missed.\n";
- }
-
-
- /*--------------------------------------------
- Report multiple messages.
- --------------------------------------------*/
- verb = ( multiple_messages.size() == 1 )
- ? " was "
- : " were ";
-
- plural = ( multiple_messages.size() == 1 )
- ? "."
- : "s.";
-
- std::cerr << "Listener::shutdown: There"
- << verb
- << multiple_messages.size()
- << " multiple message"
- << plural
- << endl;
-
- for ( i = 0; i < int(multiple_messages.size()); ++ i )
- {
- std::cerr << " "
- << multiple_messages[i]
- << " was received "
- << messages_received [ multiple_messages[i] ]
- << " times.\n";
- }
-
- /*
- for ( i = 0; i < last_received_message; ++ i )
- {
- std::cerr << "Message " << i << ": " << messages_received[i] << std::endl;
- }
- */
- }
-
-};
-
-
-
-
struct Listener : public MessageListener
{
FailoverSubscriptionManager & subscriptionManager;
- Recorder & recorder;
+ Listener ( FailoverSubscriptionManager& subs );
- Listener ( FailoverSubscriptionManager& subs,
- Recorder & recorder
- );
-
- void shutdown() { recorder.report(); }
- void parse_message ( std::string const & msg );
+ void shutdown() { subscriptionManager.stop(); }
virtual void received ( Message & message );
+
+ int count;
};
-Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) :
+Listener::Listener ( FailoverSubscriptionManager & s ) :
subscriptionManager(s),
- recorder(r)
+ count(0)
{
}
@@ -188,18 +64,19 @@ Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) :
void
Listener::received ( Message & message )
{
- std::cerr << "Listener received: " << message.getData() << std::endl;
+ if(! (count%1000))
+ std::cerr << "\t\tListener received: " << message.getData() << std::endl;
+
+ ++ count;
+
if (message.getData() == "That's all, folks!")
{
std::cout << "Shutting down listener for " << message.getDestination()
<< std::endl;
- subscriptionManager.cancel(message.getDestination());
- shutdown();
- }
- else
- {
- parse_message ( message.getData() );
+ std::cout << "Listener received " << count << " messages.\n";
+ subscriptionManager.cancel(message.getDestination());
+ shutdown ( );
}
}
@@ -207,21 +84,6 @@ Listener::received ( Message & message )
-void
-Listener::parse_message ( const std::string & msg )
-{
- int msg_number;
- if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) )
- {
- std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n";
- return;
- }
- recorder.received ( msg_number );
-}
-
-
-
-
int
@@ -235,17 +97,12 @@ main ( int argc, char ** argv )
FailoverConnection connection;
FailoverSession * session;
- Recorder recorder;
-
- connection.name = program_name;
connection.open ( host, port );
session = connection.newSession();
- session->name = program_name;
FailoverSubscriptionManager subscriptions ( session );
- subscriptions.name = program_name;
- Listener listener ( subscriptions, recorder );
+ Listener listener ( subscriptions );
subscriptions.subscribe ( listener, "message_queue" );
subscriptions.run ( );
@@ -256,7 +113,8 @@ main ( int argc, char ** argv )
} catch(const std::exception& error) {
std::cout << program_name << ": " << error.what() << std::endl;
}
- return 1;
+
+ return 0;
}
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 652d59f448..92cf756580 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -151,8 +151,11 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) {
static const std::string CONN_CLOSED("Connection closed by broker");
void ConnectionImpl::shutdown() {
-
Mutex::ScopedLock l(lock);
+
+ if ( failureCallback )
+ failureCallback();
+
if (handler.isClosed()) return;
// FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
@@ -161,9 +164,6 @@ void ConnectionImpl::shutdown() {
closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
handler.fail(CONN_CLOSED);
-
- if ( failureCallback )
- failureCallback();
}
void ConnectionImpl::erase(uint16_t ch) {
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index af805a3808..c26dba188d 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -42,6 +42,7 @@ Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a)
void Subscriber::received(Message& msg)
{
+
if (listener) {
listener->received(msg);
autoAck.ack(msg, session);
diff --git a/cpp/src/qpid/client/FailoverConnection.cpp b/cpp/src/qpid/client/FailoverConnection.cpp
index e98de868de..33b06a6a1a 100644
--- a/cpp/src/qpid/client/FailoverConnection.cpp
+++ b/cpp/src/qpid/client/FailoverConnection.cpp
@@ -36,7 +36,6 @@ namespace client {
FailoverConnection::FailoverConnection ( ) :
- name(),
failoverCompleteTime(0)
{
connection.registerFailureCallback
@@ -59,7 +58,6 @@ FailoverConnection::open ( const std::string& host,
settings.host = host;
settings.port = port;
settings.username = uid;
- settings.username = uid;
settings.password = pwd;
settings.virtualhost = virtualhost;
settings.maxFrameSize = maxFrameSize;
@@ -124,9 +122,19 @@ FailoverConnection::registerFailureCallback ( boost::function<void ()> /*fn*/ )
void
FailoverConnection::failover ( )
{
+ std::vector<FailoverSession *>::iterator sessions_iterator;
+
+ for ( sessions_iterator = sessions.begin();
+ sessions_iterator != sessions.end();
+ ++ sessions_iterator )
+ {
+ FailoverSession * fs = * sessions_iterator;
+ fs->failover_in_progress = true;
+ }
+
std::vector<Url> knownBrokers = connection.getKnownBrokers();
if (knownBrokers.empty())
- throw Exception(QPID_MSG("FailoverConnection::failover " << name << " no known brokers."));
+ throw Exception(QPID_MSG("FailoverConnection::failover no known brokers."));
Connection newConnection;
for (std::vector<Url>::iterator i = knownBrokers.begin(); i != knownBrokers.end(); ++i) {
@@ -148,7 +156,6 @@ FailoverConnection::failover ( )
*/
// FIXME aconway 2008-10-10: thread unsafe, possible race with concurrent newSession
- std::vector<FailoverSession *>::iterator sessions_iterator;
for ( sessions_iterator = sessions.begin();
sessions_iterator < sessions.end();
++ sessions_iterator
@@ -173,6 +180,15 @@ FailoverConnection::failover ( )
FailoverSession * fs = * sessions_iterator;
fs->failover ( );
}
+
+ for ( sessions_iterator = sessions.begin();
+ sessions_iterator < sessions.end();
+ ++ sessions_iterator
+ )
+ {
+ FailoverSession * fs = * sessions_iterator;
+ fs->failover_in_progress = false;
+ }
}
diff --git a/cpp/src/qpid/client/FailoverConnection.h b/cpp/src/qpid/client/FailoverConnection.h
index 09e9c8bfa4..a84f0c2189 100644
--- a/cpp/src/qpid/client/FailoverConnection.h
+++ b/cpp/src/qpid/client/FailoverConnection.h
@@ -71,10 +71,6 @@ class FailoverConnection
void registerFailureCallback ( boost::function<void ()> fn );
- // If you have more than 1 connection and you want to give them
- // separate names for debugging...
- std::string name;
-
void failover ( );
struct timeval * failoverCompleteTime;
diff --git a/cpp/src/qpid/client/FailoverSession.cpp b/cpp/src/qpid/client/FailoverSession.cpp
index a088a8c91b..25867c2a24 100644
--- a/cpp/src/qpid/client/FailoverSession.cpp
+++ b/cpp/src/qpid/client/FailoverSession.cpp
@@ -38,7 +38,7 @@ namespace qpid {
namespace client {
FailoverSession::FailoverSession ( ) :
- name("no_name")
+ failover_in_progress(false)
{
// The session is created by FailoverConnection::newSession
failoverSubscriptionManager = 0;
@@ -170,11 +170,26 @@ FailoverSession::messageTransfer ( const string& destination,
)
{
- session.messageTransfer ( destination,
- acceptMode,
- acquireMode,
- content
- );
+ while ( 1 )
+ {
+ try
+ {
+ session.messageTransfer ( destination,
+ acceptMode,
+ acquireMode,
+ content
+ );
+ break;
+ }
+ catch ( ... )
+ {
+ // Take special action only if there is a failover in progress.
+ if ( ! failover_in_progress )
+ break;
+
+ usleep ( 1000 );
+ }
+ }
}
@@ -583,7 +598,6 @@ FailoverSession::prepareForFailover ( Connection newConnection )
if ( failoverSubscriptionManager )
{
- //
failoverSubscriptionManager->prepareForFailover ( newSession );
}
}
diff --git a/cpp/src/qpid/client/FailoverSession.h b/cpp/src/qpid/client/FailoverSession.h
index 7ff26f8079..b301353968 100644
--- a/cpp/src/qpid/client/FailoverSession.h
+++ b/cpp/src/qpid/client/FailoverSession.h
@@ -59,8 +59,6 @@ class FailoverSession
FailoverSession ( );
~FailoverSession ( );
- std::string name;
-
framing::FrameSet::shared_ptr get();
SessionId getId() const;
@@ -82,6 +80,8 @@ class FailoverSession
void sendCompletion ( );
+ bool failover_in_progress;
+
// Wrapped functions from Session ----------------------------
diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
index c1ef7d00c4..d12d976ef5 100644
--- a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
+++ b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
@@ -33,7 +33,6 @@ namespace client {
FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
- name("no_name"),
newSessionIsValid(false),
no_failover(false)
{
diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.h b/cpp/src/qpid/client/FailoverSubscriptionManager.h
index 651e2549c2..b7631d3a98 100644
--- a/cpp/src/qpid/client/FailoverSubscriptionManager.h
+++ b/cpp/src/qpid/client/FailoverSubscriptionManager.h
@@ -114,9 +114,6 @@ class FailoverSubscriptionManager
void prepareForFailover ( Session newSession );
void failover ( );
- std::string name;
-
-
private:
sys::Monitor lock;
diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test
index ed06f19fd3..a357a38c0a 100755
--- a/cpp/src/tests/ssl_test
+++ b/cpp/src/tests/ssl_test
@@ -1,8 +1,8 @@
#!/bin/sh
# Run a simple test over SSL
-MY_DIR=$(dirname $(which $0))
-CERT_DIR=${MY_DIR}/test_cert_db
-CERT_PW_FILE=${MY_DIR}/cert.password
+
+CERT_DIR=`pwd`/test_cert_db
+CERT_PW_FILE=`pwd`/cert.password
HOSTNAME=`hostname`
COUNT=10000
@@ -18,13 +18,13 @@ create_certs() {
}
start_broker() {
- ${MY_DIR}/../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no\
- --load-module ${MY_DIR}/../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
+ ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no\
+ --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
PORT=`cat qpidd.port`
}
stop_broker() {
- ${MY_DIR}/../qpidd -q --port $PORT
+ ../qpidd -q --port $PORT
}
if [[ !(-e ${CERT_PW_FILE}) ]] ; then
@@ -36,8 +36,8 @@ fi
start_broker || error "Could not start broker"
echo "Running SSL test on port $PORT"
-export QPID_LOAD_MODULE=${MY_DIR}/../.libs/sslconnector.so
+export QPID_LOAD_MODULE=../.libs/sslconnector.so
export QPID_SSL_CERT_DB=${CERT_DIR}
export QPID_SSL_CERT_PASSWORD_FILE=${CERT_PW_FILE}
-${MY_DIR}/perftest --count ${COUNT} --port ${PORT} -P ssl -b $HOSTNAME --summary
+./perftest --count ${COUNT} --port ${PORT} -P ssl -b $HOSTNAME --summary