diff options
Diffstat (limited to 'qpid/cpp/src')
112 files changed, 1971 insertions, 1564 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 3ac1992e2f..457463e72d 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -343,7 +343,6 @@ libqpidcommon_la_SOURCES = \ qpid/sys/AsynchIOHandler.cpp \ qpid/sys/Dispatcher.cpp \ qpid/sys/DispatchHandle.cpp \ - qpid/sys/LatencyMetric.cpp \ qpid/sys/Runnable.cpp \ qpid/sys/Shlib.cpp \ qpid/sys/Timer.cpp @@ -437,21 +436,26 @@ libqpidclient_la_SOURCES = \ qpid/client/Bounds.cpp \ qpid/client/Connection.cpp \ qpid/client/ConnectionHandler.cpp \ - qpid/client/ConnectionImpl.cpp \ + qpid/client/ConnectionImpl.cpp \ qpid/client/ConnectionSettings.cpp \ - qpid/client/Connector.cpp \ + qpid/client/Connector.cpp \ qpid/client/Demux.cpp \ qpid/client/Dispatcher.cpp \ - qpid/client/FailoverManager.cpp \ + qpid/client/FailoverManager.cpp \ qpid/client/FailoverListener.h \ qpid/client/FailoverListener.cpp \ qpid/client/Future.cpp \ qpid/client/FutureCompletion.cpp \ + qpid/client/Completion.cpp \ + qpid/client/CompletionImpl.h \ qpid/client/FutureResult.cpp \ qpid/client/HandlePrivate.h \ + qpid/client/PrivateImplPrivate.h \ qpid/client/LoadPlugins.cpp \ qpid/client/LocalQueue.cpp \ qpid/client/Message.cpp \ + qpid/client/MessageImpl.cpp \ + qpid/client/MessageImpl.h \ qpid/client/MessageListener.cpp \ qpid/client/MessageReplayTracker.cpp \ qpid/client/QueueOptions.cpp \ @@ -597,6 +601,7 @@ nobase_include_HEADERS = \ qpid/client/FutureCompletion.h \ qpid/client/FutureResult.h \ qpid/client/Handle.h \ + qpid/client/PrivateImpl.h \ qpid/client/LocalQueue.h \ qpid/client/QueueOptions.h \ qpid/client/Message.h \ @@ -697,7 +702,6 @@ nobase_include_HEADERS = \ qpid/sys/FileSysDir.h \ qpid/sys/IntegerTypes.h \ qpid/sys/IOHandle.h \ - qpid/sys/LatencyMetric.h \ qpid/sys/LockFile.h \ qpid/sys/LockPtr.h \ qpid/sys/Monitor.h \ diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index e2054d75e9..fdac229646 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -56,6 +56,8 @@ cluster_la_SOURCES = \ qpid/cluster/Dispatchable.h \ qpid/cluster/UpdateClient.cpp \ qpid/cluster/UpdateClient.h \ + qpid/cluster/ErrorCheck.cpp \ + qpid/cluster/ErrorCheck.h \ qpid/cluster/Event.cpp \ qpid/cluster/Event.h \ qpid/cluster/EventFrame.h \ diff --git a/qpid/cpp/src/common.vcproj b/qpid/cpp/src/common.vcproj index 96d67b9d54..a95e8483e2 100644 --- a/qpid/cpp/src/common.vcproj +++ b/qpid/cpp/src/common.vcproj @@ -456,6 +456,9 @@ RelativePath="gen\qpid\framing\ClusterConnectionTxStartBody.cpp">
</File>
<File
+ RelativePath="gen\qpid\framing\ClusterErrorCheckBody.cpp">
+ </File>
+ <File
RelativePath="gen\qpid\framing\ClusterMessageExpiredBody.cpp">
</File>
<File
@@ -972,9 +975,6 @@ RelativePath="qpid\sys\Dispatcher.cpp">
</File>
<File
- RelativePath="qpid\sys\LatencyMetric.cpp">
- </File>
- <File
RelativePath="qpid\sys\Runnable.cpp">
</File>
<File
@@ -1165,6 +1165,9 @@ RelativePath="gen\qpid\framing\ClusterConnectionTxStartBody.h">
</File>
<File
+ RelativePath="gen\qpid\framing\ClusterErrorCheckBody.h">
+ </File>
+ <File
RelativePath="gen\qpid\framing\ClusterMessageExpiredBody.h">
</File>
<File
@@ -1810,9 +1813,6 @@ RelativePath="qpid\sys\IOHandle.h">
</File>
<File
- RelativePath="qpid\sys\LatencyMetric.h">
- </File>
- <File
RelativePath="qpid\sys\LockFile.h">
</File>
<File
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index db957051d8..f927db09bb 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -88,6 +88,7 @@ void SessionHandler::handleIn(AMQFrame& f) { } catch(const SessionException& e) { QPID_LOG(error, "Execution exception: " << e.what()); + executionException(e.code, e.what()); // Let subclass handle this first. framing::AMQP_AllProxy::Execution execution(channel); AMQMethodBody* m = f.getMethod(); SequenceNumber commandId; @@ -98,6 +99,7 @@ void SessionHandler::handleIn(AMQFrame& f) { } catch(const ChannelException& e){ QPID_LOG(error, "Channel exception: " << e.what()); + channelException(e.code, e.what()); // Let subclass handle this first. peer.detached(name, e.code); } catch(const ConnectionException& e) { diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h index 0b158ec2b4..0d9c72ff02 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -87,8 +87,9 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, QPID_COMMON_EXTERN virtual void invoke(const framing::AMQMethodBody& m); virtual void setState(const std::string& sessionName, bool force) = 0; - virtual void channelException(framing::session::DetachCode code, const std::string& msg) = 0; virtual void connectionException(framing::connection::CloseCode code, const std::string& msg) = 0; + virtual void channelException(framing::session::DetachCode, const std::string& msg) = 0; + virtual void executionException(framing::execution::ErrorCode, const std::string& msg) = 0; virtual void detaching() = 0; // Notification of events diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index b06e06d353..365b3ccbeb 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -57,7 +57,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtObject(0), links(broker_.getLinks()), agent(0), - timer(broker_.getTimer()) + timer(broker_.getTimer()), + errorListener(0) { Manageable* parent = broker.GetVhostObject(); diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index b659fe6468..e67cdce681 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -66,6 +66,17 @@ class Connection : public sys::ConnectionInputHandler, public RefCounted { public: + /** + * Listener that can be registered with a Connection to be informed of errors. + */ + class ErrorListener + { + public: + virtual ~ErrorListener() {} + virtual void sessionError(uint16_t channel, const std::string&) = 0; + virtual void connectionError(const std::string&) = 0; + }; + Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0); ~Connection (); @@ -101,6 +112,9 @@ class Connection : public sys::ConnectionInputHandler, const std::string& getMgmtId() const { return mgmtId; } management::ManagementAgent* getAgent() const { return agent; } void setFederationLink(bool b); + /** Connection does not delete the listener. 0 resets. */ + void setErrorListener(ErrorListener* l) { errorListener=l; } + ErrorListener* getErrorListener() { return errorListener; } void setHeartbeatInterval(uint16_t heartbeat); void sendHeartbeat(); @@ -112,6 +126,7 @@ class Connection : public sys::ConnectionInputHandler, void sendClose(); void setSecureConnection(SecureConnection* secured); + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; @@ -128,6 +143,8 @@ class Connection : public sys::ConnectionInputHandler, management::ManagementAgent* agent; Timer& timer; boost::intrusive_ptr<TimerTask> heartbeatTimer; + ErrorListener* errorListener; + public: qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; } }; diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 63212c7794..8b70836da0 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -64,13 +64,16 @@ void ConnectionHandler::heartbeat() void ConnectionHandler::handle(framing::AMQFrame& frame) { AMQMethodBody* method=frame.getBody()->getMethod(); + Connection::ErrorListener* errorListener = handler->connection.getErrorListener(); try{ if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) { handler->connection.getChannel(frame.getChannel()).in(frame); } }catch(ConnectionException& e){ + if (errorListener) errorListener->connectionError(e.what()); handler->proxy.close(e.code, e.what()); }catch(std::exception& e){ + if (errorListener) errorListener->connectionError(e.what()); handler->proxy.close(541/*internal error*/, e.what()); } } diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp index 907f1e56e1..ffe0cc437b 100644 --- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp +++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp @@ -33,4 +33,6 @@ bool ExpiryPolicy::hasExpired(Message& m) { return m.getExpiration() < sys::AbsTime::now(); } +void ExpiryPolicy::forget(Message&) {} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h index cefe9b7552..eeb3ffda21 100644 --- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h +++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h @@ -39,6 +39,7 @@ class ExpiryPolicy : public RefCounted QPID_BROKER_EXTERN virtual ~ExpiryPolicy(); QPID_BROKER_EXTERN virtual void willExpire(Message&); QPID_BROKER_EXTERN virtual bool hasExpired(Message&); + QPID_BROKER_EXTERN virtual void forget(Message&); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 40b5515829..1e9eb9d386 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -53,6 +53,8 @@ Message::Message(const framing::SequenceNumber& id) : Message::~Message() { + if (expiryPolicy) + expiryPolicy->forget(*this); } void Message::forcePersistent() @@ -334,7 +336,7 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; - if (expiryPolicy) + if (expiryPolicy) expiryPolicy->willExpire(*this); } diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 442c3eb34b..ca1f875991 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -45,14 +45,20 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } } // namespace -void SessionHandler::channelException(framing::session::DetachCode, const std::string&) { - handleDetach(); -} - void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) { + // NOTE: must tell the error listener _before_ calling connection.close() + if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg); connection.close(code, msg); } +void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) { + if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg); +} + +void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) { + if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg); +} + ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index ffc032f64c..ca6d6bb193 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -73,8 +73,9 @@ class SessionHandler : public amqp_0_10::SessionHandler { virtual void setState(const std::string& sessionName, bool force); virtual qpid::SessionState* getState(); virtual framing::FrameHandler* getInHandler(); - virtual void channelException(framing::session::DetachCode code, const std::string& msg); virtual void connectionException(framing::connection::CloseCode code, const std::string& msg); + virtual void channelException(framing::session::DetachCode, const std::string& msg); + virtual void executionException(framing::execution::ErrorCode, const std::string& msg); virtual void detaching(); virtual void readyToSend(); diff --git a/qpid/cpp/src/tests/BasicP2PTest.h b/qpid/cpp/src/qpid/client/Completion.cpp index b2611f0301..e3676b2bde 100644 --- a/qpid/cpp/src/tests/BasicP2PTest.h +++ b/qpid/cpp/src/qpid/client/Completion.cpp @@ -1,5 +1,3 @@ -#ifndef _BasicP2PTest_ -#define _BasicP2PTest_ /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -21,26 +19,20 @@ * */ -#include <memory> -#include <sstream> - -#include "qpid/Exception.h" -#include "qpid/client/Channel.h" -#include "qpid/client/Message.h" -#include "qpid/client/Connection.h" -#include "qpid/client/MessageListener.h" -#include "SimpleTestCaseBase.h" - +#include "Completion.h" +#include "CompletionImpl.h" +#include "HandlePrivate.h" namespace qpid { +namespace client { -class BasicP2PTest : public SimpleTestCaseBase -{ - class Receiver; -public: - void assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options); -}; +Completion::Completion(CompletionImpl* i) : Handle<CompletionImpl>(i) {} +Completion::~Completion() {} +Completion::Completion(const Completion& c) : Handle<CompletionImpl>(c.impl) {} +Completion& Completion::operator=(const Completion& c) { Handle<CompletionImpl>::operator=(c); return *this; } -} +void Completion::wait() { impl->wait(); } +bool Completion::isComplete() { return impl->isComplete(); } +std::string Completion::getResult() { return impl->getResult(); } -#endif +}} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/Completion.h b/qpid/cpp/src/qpid/client/Completion.h index c4979d7934..0b246b7765 100644 --- a/qpid/cpp/src/qpid/client/Completion.h +++ b/qpid/cpp/src/qpid/client/Completion.h @@ -22,13 +22,14 @@ #ifndef _Completion_ #define _Completion_ -#include <boost/shared_ptr.hpp> -#include "Future.h" -#include "SessionImpl.h" +#include "Handle.h" +#include <string> namespace qpid { namespace client { +class CompletionImpl; + /** * Asynchronous commands that do not return a result will return a * Completion. You can use the completion to wait for that specific @@ -38,32 +39,26 @@ namespace client { * *\ingroup clientapi */ -class Completion +class Completion : public Handle<CompletionImpl> { -protected: - Future future; - shared_ptr<SessionImpl> session; - public: ///@internal - Completion() {} - - ///@internal - Completion(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {} + QPID_CLIENT_EXTERN Completion(CompletionImpl* =0); + QPID_CLIENT_EXTERN ~Completion(); + QPID_CLIENT_EXTERN Completion(const Completion&); + QPID_CLIENT_EXTERN Completion& operator=(const Completion&); /** Wait for the asynchronous command that returned this *Completion to complete. * - *@exception If the command returns an error, get() throws an exception. + *@exception If the command returns an error. */ - void wait() - { - future.wait(*session); - } + QPID_CLIENT_EXTERN void wait(); + + QPID_CLIENT_EXTERN bool isComplete(); - bool isComplete() { - return future.isComplete(*session); - } + protected: + QPID_CLIENT_EXTERN std::string getResult(); }; }} diff --git a/qpid/cpp/src/tests/BasicPubSubTest.h b/qpid/cpp/src/qpid/client/CompletionImpl.h index 242d2847d7..119abc093a 100644 --- a/qpid/cpp/src/tests/BasicPubSubTest.h +++ b/qpid/cpp/src/qpid/client/CompletionImpl.h @@ -1,5 +1,6 @@ -#ifndef _BasicPubSubTest_ -#define _BasicPubSubTest_ +#ifndef QPID_CLIENT_COMPLETIONIMPL_H +#define QPID_CLIENT_COMPLETIONIMPL_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -21,31 +22,29 @@ * */ -#include <memory> -#include <sstream> - -#include "qpid/Exception.h" -#include "qpid/client/Channel.h" -#include "qpid/client/Message.h" -#include "qpid/client/Connection.h" -#include "qpid/client/MessageListener.h" -#include "SimpleTestCaseBase.h" -#include <boost/ptr_container/ptr_vector.hpp> -#include <boost/format.hpp> - +#include "qpid/RefCounted.h" +#include "Future.h" namespace qpid { +namespace client { -using namespace qpid::client; - -class BasicPubSubTest : public SimpleTestCaseBase +///@internal +class CompletionImpl : public RefCounted { - class Receiver; - class MultiReceiver; public: - void assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options); + CompletionImpl() {} + CompletionImpl(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {} + + bool isComplete() { return future.isComplete(*session); } + void wait() { future.wait(*session); } + std::string getResult() { return future.getResult(*session); } + +protected: + Future future; + shared_ptr<SessionImpl> session; }; -} +}} // namespace qpid::client + -#endif +#endif /*!QPID_CLIENT_COMPLETIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp index cc62d724cb..e8415403ca 100644 --- a/qpid/cpp/src/qpid/client/Connection.cpp +++ b/qpid/cpp/src/qpid/client/Connection.cpp @@ -19,6 +19,7 @@ * */ #include "Connection.h" +#include "ConnectionImpl.h" #include "ConnectionSettings.h" #include "Message.h" #include "SessionImpl.h" diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h index 846ac33790..d898ea70d9 100644 --- a/qpid/cpp/src/qpid/client/Connection.h +++ b/qpid/cpp/src/qpid/client/Connection.h @@ -25,6 +25,7 @@ #include <string> #include "qpid/client/Session.h" #include "qpid/client/ClientImportExport.h" +#include "qpid/client/ConnectionSettings.h" namespace qpid { @@ -32,7 +33,6 @@ struct Url; namespace client { -struct ConnectionSettings; class ConnectionImpl; /** diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 745bdb63b5..b1e83025ab 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -111,9 +111,11 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) Mutex::ScopedLock l(lock); s = sessions[frame.getChannel()].lock(); } - if (!s) - throw NotAttachedException(QPID_MSG("Invalid channel: " << frame.getChannel())); - s->in(frame); + if (!s) { + QPID_LOG(info, "Dropping frame received on invalid channel: " << frame); + } else { + s->in(frame); + } } bool ConnectionImpl::isOpen() const diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp index 8d8574520a..5156031748 100644 --- a/qpid/cpp/src/qpid/client/Dispatcher.cpp +++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp @@ -26,6 +26,7 @@ #include "qpid/log/Statement.h" #include "qpid/sys/BlockingQueue.h" #include "Message.h" +#include "MessageImpl.h" #include <boost/state_saver.hpp> @@ -49,6 +50,9 @@ Dispatcher::Dispatcher(const Session& s, const std::string& q) session.getExecution().getDemux().get(q); } +Dispatcher::~Dispatcher() {} + + void Dispatcher::start() { worker = Thread(this); @@ -71,7 +75,7 @@ void Dispatcher::run() Mutex::ScopedUnlock u(lock); FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { - Message msg(*content); + Message msg(new MessageImpl(*content)); boost::intrusive_ptr<SubscriptionImpl> listener = find(msg.getDestination()); if (!listener) { QPID_LOG(error, "No listener found for destination " << msg.getDestination()); diff --git a/qpid/cpp/src/qpid/client/Dispatcher.h b/qpid/cpp/src/qpid/client/Dispatcher.h index e84f8f303d..4dbb75dcf2 100644 --- a/qpid/cpp/src/qpid/client/Dispatcher.h +++ b/qpid/cpp/src/qpid/client/Dispatcher.h @@ -30,7 +30,6 @@ #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "MessageListener.h" -#include "SubscriptionImpl.h" namespace qpid { namespace client { @@ -61,6 +60,7 @@ class Dispatcher : public sys::Runnable public: Dispatcher(const Session& session, const std::string& queue = ""); + ~Dispatcher(); void start(); void wait(); diff --git a/qpid/cpp/src/qpid/client/FailoverListener.cpp b/qpid/cpp/src/qpid/client/FailoverListener.cpp index 16370f8912..ed9354d528 100644 --- a/qpid/cpp/src/qpid/client/FailoverListener.cpp +++ b/qpid/cpp/src/qpid/client/FailoverListener.cpp @@ -20,6 +20,9 @@ */ #include "FailoverListener.h" #include "SessionBase_0_10Access.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/SubscriptionImpl.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" diff --git a/qpid/cpp/src/qpid/client/FailoverListener.h b/qpid/cpp/src/qpid/client/FailoverListener.h index fe73a26611..7afee736ac 100644 --- a/qpid/cpp/src/qpid/client/FailoverListener.h +++ b/qpid/cpp/src/qpid/client/FailoverListener.h @@ -33,6 +33,7 @@ namespace qpid { namespace client { class SubscriptionManager; +class ConnectionImpl; /** * @internal Listen for failover updates from the amq.failover exchange. diff --git a/qpid/cpp/src/qpid/client/Future.cpp b/qpid/cpp/src/qpid/client/Future.cpp index 6a0c78ae4b..fda40219ba 100644 --- a/qpid/cpp/src/qpid/client/Future.cpp +++ b/qpid/cpp/src/qpid/client/Future.cpp @@ -20,6 +20,7 @@ */ #include "Future.h" +#include "SessionImpl.h" namespace qpid { namespace client { diff --git a/qpid/cpp/src/qpid/client/Future.h b/qpid/cpp/src/qpid/client/Future.h index ea01522fe8..28c9a2bbbd 100644 --- a/qpid/cpp/src/qpid/client/Future.h +++ b/qpid/cpp/src/qpid/client/Future.h @@ -26,17 +26,15 @@ #include <boost/shared_ptr.hpp> #include "qpid/Exception.h" #include "qpid/framing/SequenceNumber.h" -#include "qpid/framing/StructHelper.h" #include "FutureCompletion.h" #include "FutureResult.h" -#include "SessionImpl.h" #include "ClientImportExport.h" namespace qpid { namespace client { /**@internal */ -class Future : private framing::StructHelper +class Future { framing::SequenceNumber command; boost::shared_ptr<FutureResult> result; @@ -46,13 +44,9 @@ public: Future() : complete(false) {} Future(const framing::SequenceNumber& id) : command(id), complete(false) {} - template <class T> void decodeResult(T& value, SessionImpl& session) - { - if (result) { - decode(value, result->getResult(session)); - } else { - throw Exception("Result not expected"); - } + std::string getResult(SessionImpl& session) { + if (result) return result->getResult(session); + else throw Exception("Result not expected"); } QPID_CLIENT_EXTERN void wait(SessionImpl& session); diff --git a/qpid/cpp/src/qpid/client/Handle.h b/qpid/cpp/src/qpid/client/Handle.h index d8b822d0f9..12fb4cf3c1 100644 --- a/qpid/cpp/src/qpid/client/Handle.h +++ b/qpid/cpp/src/qpid/client/Handle.h @@ -30,9 +30,11 @@ namespace client { template <class T> class HandlePrivate; /** - * A handle is like a pointer: it points to some underlying object. + * A handle is like a pointer: it points to some implementation object. + * Copying the handle does not copy the object. + * * Handles can be null, like a 0 pointer. Use isValid(), isNull() or the - * implicit conversion to bool to test for a null handle. + * conversion to bool to test for a null handle. */ template <class T> class Handle { public: @@ -46,8 +48,11 @@ template <class T> class Handle { /**@return true if handle is null. It is an error to call any function on a null handle. */ QPID_CLIENT_EXTERN bool isNull() const { return !impl; } + /** Conversion to bool supports idiom if (handle) { handle->... } */ QPID_CLIENT_EXTERN operator bool() const { return impl; } - QPID_CLIENT_EXTERN bool operator !() const { return impl; } + + /** Operator ! supports idiom if (!handle) { do_if_handle_is_null(); } */ + QPID_CLIENT_EXTERN bool operator !() const { return !impl; } QPID_CLIENT_EXTERN void swap(Handle<T>&); diff --git a/qpid/cpp/src/qpid/client/HandlePrivate.h b/qpid/cpp/src/qpid/client/HandlePrivate.h index 488ce48075..46e4bff808 100644 --- a/qpid/cpp/src/qpid/client/HandlePrivate.h +++ b/qpid/cpp/src/qpid/client/HandlePrivate.h @@ -21,14 +21,16 @@ * under the License. * */ +#include "Handle.h" +#include "qpid/RefCounted.h" #include <algorithm> +#include <boost/intrusive_ptr.hpp> namespace qpid { namespace client { /** @file - * Private implementation of handle, include in .cpp file of handle - * subclasses _after_ including the declaration of class T. + * Implementation of handle, include in .cpp file of handle subclasses. * T can be any class that can be used with boost::intrusive_ptr. */ @@ -52,9 +54,13 @@ void Handle<T>::swap(Handle<T>& h) { std::swap(impl, h.impl); } template <class T> class HandlePrivate { public: - static boost::intrusive_ptr<T> get(Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); } + static boost::intrusive_ptr<T> get(const Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); } + static void set(Handle<T>& h, const boost::intrusive_ptr<T>& p) { Handle<T>(p.get()).swap(h); } }; +template<class T> boost::intrusive_ptr<T> handleGetPtr(Handle<T>& h) { return HandlePrivate<T>::get(h); } +template<class T> boost::intrusive_ptr<const T> handleGetPtr(const Handle<T>& h) { return HandlePrivate<T>::get(h); } +template<class T> void handleSetPtr(Handle<T>& h, const boost::intrusive_ptr<T>& p) { HandlePrivate<T>::set(h, p); } }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/LocalQueue.cpp b/qpid/cpp/src/qpid/client/LocalQueue.cpp index e449c9f795..02fecf804f 100644 --- a/qpid/cpp/src/qpid/client/LocalQueue.cpp +++ b/qpid/cpp/src/qpid/client/LocalQueue.cpp @@ -19,11 +19,14 @@ * */ #include "LocalQueue.h" +#include "MessageImpl.h" #include "qpid/Exception.h" #include "qpid/framing/FrameSet.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/reply_exceptions.h" #include "HandlePrivate.h" #include "SubscriptionImpl.h" +#include "CompletionImpl.h" namespace qpid { namespace client { @@ -49,7 +52,7 @@ bool LocalQueue::get(Message& result, sys::Duration timeout) { bool ok = queue->pop(content, timeout); if (!ok) return false; if (content->isA<MessageTransferBody>()) { - result = Message(*content); + result = Message(new MessageImpl(*content)); boost::intrusive_ptr<SubscriptionImpl> si = HandlePrivate<SubscriptionImpl>::get(subscription); assert(si); if (si) si->received(result); diff --git a/qpid/cpp/src/qpid/client/Message.cpp b/qpid/cpp/src/qpid/client/Message.cpp index 13caaecefd..962ce26305 100644 --- a/qpid/cpp/src/qpid/client/Message.cpp +++ b/qpid/cpp/src/qpid/client/Message.cpp @@ -20,52 +20,37 @@ */ #include "Message.h" +#include "PrivateImplPrivate.h" +#include "MessageImpl.h" namespace qpid { namespace client { -Message::Message(const std::string& data, const std::string& routingKey) : TransferContent(data, routingKey) {} +template class PrivateImpl<MessageImpl>; -std::string Message::getDestination() const -{ - return method.getDestination(); -} +Message::Message(const std::string& data, const std::string& routingKey) : PrivateImpl<MessageImpl>(new MessageImpl(data, routingKey)) {} +Message::Message(MessageImpl* i) : PrivateImpl<MessageImpl>(i) {} +Message::~Message() {} -bool Message::isRedelivered() const -{ - return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); -} +std::string Message::getDestination() const { return impl->getDestination(); } +bool Message::isRedelivered() const { return impl->isRedelivered(); } +void Message::setRedelivered(bool redelivered) { impl->setRedelivered(redelivered); } +framing::FieldTable& Message::getHeaders() { return impl->getHeaders(); } +const framing::FieldTable& Message::getHeaders() const { return impl->getHeaders(); } +const framing::SequenceNumber& Message::getId() const { return impl->getId(); } -void Message::setRedelivered(bool redelivered) -{ - getDeliveryProperties().setRedelivered(redelivered); -} +void Message::setData(const std::string& s) { impl->setData(s); } +const std::string& Message::getData() const { return impl->getData(); } +std::string& Message::getData() { return impl->getData(); } -framing::FieldTable& Message::getHeaders() -{ - return getMessageProperties().getApplicationHeaders(); -} +void Message::appendData(const std::string& s) { impl->appendData(s); } -const framing::FieldTable& Message::getHeaders() const -{ - return getMessageProperties().getApplicationHeaders(); -} +bool Message::hasMessageProperties() const { return impl->hasMessageProperties(); } +framing::MessageProperties& Message::getMessageProperties() { return impl->getMessageProperties(); } +const framing::MessageProperties& Message::getMessageProperties() const { return impl->getMessageProperties(); } -const framing::MessageTransferBody& Message::getMethod() const -{ - return method; -} - -const framing::SequenceNumber& Message::getId() const -{ - return id; -} - -/**@internal for incoming messages */ -Message::Message(const framing::FrameSet& frameset) : - method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()) -{ - populate(frameset); -} +bool Message::hasDeliveryProperties() const { return impl->hasDeliveryProperties(); } +framing::DeliveryProperties& Message::getDeliveryProperties() { return impl->getDeliveryProperties(); } +const framing::DeliveryProperties& Message::getDeliveryProperties() const { return impl->getDeliveryProperties(); } }} diff --git a/qpid/cpp/src/qpid/client/Message.h b/qpid/cpp/src/qpid/client/Message.h index 235e20f97d..97238db647 100644 --- a/qpid/cpp/src/qpid/client/Message.h +++ b/qpid/cpp/src/qpid/client/Message.h @@ -1,5 +1,5 @@ -#ifndef _client_Message_h -#define _client_Message_h +#ifndef QPID_CLIENT_MESSAGE_H +#define QPID_CLIENT_MESSAGE_H /* * @@ -21,15 +21,24 @@ * under the License. * */ -#include <string> -#include "qpid/client/Session.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/TransferContent.h" + +#include "qpid/client/PrivateImpl.h" #include "qpid/client/ClientImportExport.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/DeliveryProperties.h" +#include <string> namespace qpid { + +namespace framing { +class FieldTable; +class SequenceNumber; // FIXME aconway 2009-04-17: remove with getID? +} + namespace client { +class MessageImpl; + /** * A message sent to or received from the broker. * @@ -104,8 +113,7 @@ namespace client { * * */ - -class Message : public framing::TransferContent +class Message : public PrivateImpl<MessageImpl> { public: /** Create a Message. @@ -115,6 +123,23 @@ public: QPID_CLIENT_EXTERN Message(const std::string& data=std::string(), const std::string& routingKey=std::string()); + QPID_CLIENT_EXTERN ~Message(); + + QPID_CLIENT_EXTERN void setData(const std::string&); + QPID_CLIENT_EXTERN const std::string& getData() const; + QPID_CLIENT_EXTERN std::string& getData(); + + QPID_CLIENT_EXTERN void appendData(const std::string&); + + QPID_CLIENT_EXTERN bool hasMessageProperties() const; + QPID_CLIENT_EXTERN framing::MessageProperties& getMessageProperties(); + QPID_CLIENT_EXTERN const framing::MessageProperties& getMessageProperties() const; + + QPID_CLIENT_EXTERN bool hasDeliveryProperties() const; + QPID_CLIENT_EXTERN framing::DeliveryProperties& getDeliveryProperties(); + QPID_CLIENT_EXTERN const framing::DeliveryProperties& getDeliveryProperties() const; + + /** The destination of messages sent to the broker is the exchange * name. The destination of messages received from the broker is * the delivery tag identifyig the local subscription (often this @@ -133,20 +158,14 @@ public: /** Get a non-modifyable reference to the message headers. */ QPID_CLIENT_EXTERN const framing::FieldTable& getHeaders() const; - ///@internal - QPID_CLIENT_EXTERN const framing::MessageTransferBody& getMethod() const; + // FIXME aconway 2009-04-17: does this need to be in public API? ///@internal QPID_CLIENT_EXTERN const framing::SequenceNumber& getId() const; - /**@internal for incoming messages */ - QPID_CLIENT_EXTERN Message(const framing::FrameSet& frameset); - -private: - //method and id are only set for received messages: - framing::MessageTransferBody method; - framing::SequenceNumber id; + ///@internal + Message(MessageImpl*); }; }} -#endif /*!_client_Message_h*/ +#endif /*!QPID_CLIENT_MESSAGE_H*/ diff --git a/qpid/cpp/src/qpid/client/MessageImpl.cpp b/qpid/cpp/src/qpid/client/MessageImpl.cpp new file mode 100644 index 0000000000..3d06fd1d8d --- /dev/null +++ b/qpid/cpp/src/qpid/client/MessageImpl.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "MessageImpl.h" + +namespace qpid { +namespace client { + +MessageImpl::MessageImpl(const std::string& data, const std::string& routingKey) : TransferContent(data, routingKey) {} + +std::string MessageImpl::getDestination() const +{ + return method.getDestination(); +} + +bool MessageImpl::isRedelivered() const +{ + return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); +} + +void MessageImpl::setRedelivered(bool redelivered) +{ + getDeliveryProperties().setRedelivered(redelivered); +} + +framing::FieldTable& MessageImpl::getHeaders() +{ + return getMessageProperties().getApplicationHeaders(); +} + +const framing::FieldTable& MessageImpl::getHeaders() const +{ + return getMessageProperties().getApplicationHeaders(); +} + +const framing::MessageTransferBody& MessageImpl::getMethod() const +{ + return method; +} + +const framing::SequenceNumber& MessageImpl::getId() const +{ + return id; +} + +/**@internal for incoming messages */ +MessageImpl::MessageImpl(const framing::FrameSet& frameset) : + method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()) +{ + populate(frameset); +} + +}} diff --git a/qpid/cpp/src/qpid/client/MessageImpl.h b/qpid/cpp/src/qpid/client/MessageImpl.h new file mode 100644 index 0000000000..c06d9b5afc --- /dev/null +++ b/qpid/cpp/src/qpid/client/MessageImpl.h @@ -0,0 +1,76 @@ +#ifndef QPID_CLIENT_MESSAGEIMPL_H +#define QPID_CLIENT_MESSAGEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/client/Session.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/TransferContent.h" + +namespace qpid { +namespace client { + +class MessageImpl : public framing::TransferContent +{ +public: + /** Create a Message. + *@param data Data for the message body. + *@param routingKey Passed to the exchange that routes the message. + */ + MessageImpl(const std::string& data=std::string(), + const std::string& routingKey=std::string()); + + /** The destination of messages sent to the broker is the exchange + * name. The destination of messages received from the broker is + * the delivery tag identifyig the local subscription (often this + * is the name of the subscribed queue.) + */ + std::string getDestination() const; + + /** Check the redelivered flag. */ + bool isRedelivered() const; + /** Set the redelivered flag. */ + void setRedelivered(bool redelivered); + + /** Get a modifyable reference to the message headers. */ + framing::FieldTable& getHeaders(); + + /** Get a non-modifyable reference to the message headers. */ + const framing::FieldTable& getHeaders() const; + + ///@internal + const framing::MessageTransferBody& getMethod() const; + ///@internal + const framing::SequenceNumber& getId() const; + + /**@internal for incoming messages */ + MessageImpl(const framing::FrameSet& frameset); + +private: + //method and id are only set for received messages: + framing::MessageTransferBody method; + framing::SequenceNumber id; +}; + +}} + +#endif /*!QPID_CLIENT_MESSAGEIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/PrivateImpl.h b/qpid/cpp/src/qpid/client/PrivateImpl.h new file mode 100644 index 0000000000..6e5ea35ce0 --- /dev/null +++ b/qpid/cpp/src/qpid/client/PrivateImpl.h @@ -0,0 +1,54 @@ +#ifndef QPID_CLIENT_PRIVATEIMPL_H +#define QPID_CLIENT_PRIVATEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/ClientImportExport.h" + +namespace qpid { +namespace client { + +template <class T> class PrivateImplPrivate; + +/** + * Base classes for objects with a private implementation. + * + * PrivateImpl objects have value semantics: copying the object also + * makes a copy of the implementation. + */ +template <class T> class PrivateImpl { + public: + QPID_CLIENT_EXTERN ~PrivateImpl(); + QPID_CLIENT_EXTERN PrivateImpl(const PrivateImpl&); + QPID_CLIENT_EXTERN PrivateImpl& operator=(const PrivateImpl&); + QPID_CLIENT_EXTERN void swap(PrivateImpl<T>&); + + protected: + QPID_CLIENT_EXTERN PrivateImpl(T*); + T* impl; + + friend class PrivateImplPrivate<T>; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_PRIVATEIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/PrivateImplPrivate.h b/qpid/cpp/src/qpid/client/PrivateImplPrivate.h new file mode 100644 index 0000000000..021456e085 --- /dev/null +++ b/qpid/cpp/src/qpid/client/PrivateImplPrivate.h @@ -0,0 +1,66 @@ +#ifndef QPID_CLIENT_PRIVATEIMPLPRIVATE_H +#define QPID_CLIENT_PRIVATEIMPLPRIVATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <algorithm> + +namespace qpid { +namespace client { + +/** @file + * Implementation of PrivateImpl functions, to include in .cpp file of handle subclasses. + * T can be any class with value semantics. + */ + +template <class T> +PrivateImpl<T>::PrivateImpl(T* p) : impl(p) { assert(impl); } + +template <class T> +PrivateImpl<T>::~PrivateImpl() { delete impl; } + +template <class T> +PrivateImpl<T>::PrivateImpl(const PrivateImpl& h) : impl(new T(*h.impl)) {} + +template <class T> +PrivateImpl<T>& PrivateImpl<T>::operator=(const PrivateImpl<T>& h) { PrivateImpl<T>(h).swap(*this); return *this; } + +template <class T> +void PrivateImpl<T>::swap(PrivateImpl<T>& h) { std::swap(impl, h.impl); } + + +/** Access to private impl of a PrivateImpl */ +template <class T> +class PrivateImplPrivate { + public: + static T* get(const PrivateImpl<T>& h) { return h.impl; } + static void set(PrivateImpl<T>& h, const T& p) { PrivateImpl<T>(p).swap(h); } +}; + +template<class T> T* privateImplGetPtr(PrivateImpl<T>& h) { return PrivateImplPrivate<T>::get(h); } +template<class T> T* privateImplGetPtr(const PrivateImpl<T>& h) { return PrivateImplPrivate<T>::get(h); } +template<class T> void privateImplSetPtr(PrivateImpl<T>& h, const T*& p) { PrivateImplPrivate<T>::set(h, p); } + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_PRIVATEIMPLPRIVATE_H*/ + diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp index e81b78ecd3..8a33c7393f 100644 --- a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp +++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp @@ -20,6 +20,8 @@ */ #include "SessionBase_0_10.h" #include "Connection.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/Future.h" #include "qpid/framing/all_method_bodies.h" namespace qpid { diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.h b/qpid/cpp/src/qpid/client/SessionBase_0_10.h index 3ae21936f6..d375b3ec2e 100644 --- a/qpid/cpp/src/qpid/client/SessionBase_0_10.h +++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.h @@ -23,14 +23,11 @@ */ #include "qpid/SessionId.h" +#include "qpid/client/SessionImpl.h" #include "qpid/framing/amqp_structs.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/framing/MethodContent.h" -#include "qpid/framing/TransferContent.h" +#include "qpid/client/Message.h" #include "qpid/client/Completion.h" -#include "qpid/client/ConnectionImpl.h" #include "qpid/client/Execution.h" -#include "qpid/client/SessionImpl.h" #include "qpid/client/TypedResult.h" #include "qpid/shared_ptr.h" #include "qpid/client/ClientImportExport.h" @@ -44,7 +41,6 @@ class Connection; using std::string; using framing::Content; using framing::FieldTable; -using framing::MethodContent; using framing::SequenceNumber; using framing::SequenceSet; using framing::SequenceNumberSet; @@ -62,8 +58,6 @@ enum CreditUnit { MESSAGE_CREDIT=0, BYTE_CREDIT=1, UNLIMITED_CREDIT=0xFFFFFFFF } */ class SessionBase_0_10 { public: - - typedef framing::TransferContent DefaultContent; ///@internal QPID_CLIENT_EXTERN SessionBase_0_10(); diff --git a/qpid/cpp/src/qpid/client/Subscription.cpp b/qpid/cpp/src/qpid/client/Subscription.cpp index 1f1b5ac6c6..37f5557d51 100644 --- a/qpid/cpp/src/qpid/client/Subscription.cpp +++ b/qpid/cpp/src/qpid/client/Subscription.cpp @@ -21,6 +21,7 @@ #include "Subscription.h" #include "SubscriptionImpl.h" +#include "CompletionImpl.h" #include "HandlePrivate.h" #include "qpid/framing/enum.h" diff --git a/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp index e09a4c142e..82c920cf47 100644 --- a/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp @@ -20,8 +20,12 @@ */ #include "SubscriptionImpl.h" +#include "MessageImpl.h" +#include "CompletionImpl.h" #include "SubscriptionManager.h" #include "SubscriptionSettings.h" +#include "HandlePrivate.h" +#include "PrivateImplPrivate.h" namespace qpid { namespace client { @@ -114,9 +118,9 @@ void SubscriptionImpl::cancel() { manager.cancel(name); } void SubscriptionImpl::received(Message& m) { Mutex::ScopedLock l(lock); - if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED) + if (privateImplGetPtr(m)->getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED) unacquired.add(m.getId()); - else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT) + else if (privateImplGetPtr(m)->getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT) unaccepted.add(m.getId()); if (listener) { diff --git a/qpid/cpp/src/qpid/client/TypedResult.h b/qpid/cpp/src/qpid/client/TypedResult.h index 5306997d74..2e54f9fdfc 100644 --- a/qpid/cpp/src/qpid/client/TypedResult.h +++ b/qpid/cpp/src/qpid/client/TypedResult.h @@ -23,6 +23,7 @@ #define _TypedResult_ #include "Completion.h" +#include "qpid/framing/StructHelper.h" namespace qpid { namespace client { @@ -39,7 +40,7 @@ template <class T> class TypedResult : public Completion public: ///@internal - TypedResult(Future f, shared_ptr<SessionImpl> s) : Completion(f, s), decoded(false) {} + TypedResult(CompletionImpl* c) : Completion(c), decoded(false) {} /** * Wait for the asynchronous command that returned this TypedResult to complete @@ -49,13 +50,12 @@ public: *@exception If the command returns an error, get() throws an exception. * */ - T& get() - { + T& get() { if (!decoded) { - future.decodeResult(result, *session); + framing::StructHelper helper; + helper.decode(result, getResult()); decoded = true; } - return result; } }; diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index f8e412f1e6..a17f54078c 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -36,6 +36,7 @@ #include "qpid/framing/ClusterConfigChangeBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" +#include "qpid/framing/ClusterErrorCheckBody.h" #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" @@ -46,7 +47,6 @@ #include "qpid/management/ManagementBroker.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" -#include "qpid/sys/LatencyMetric.h" #include "qpid/sys/Thread.h" #include <boost/bind.hpp> @@ -63,6 +63,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace std; using namespace qpid::cluster; +using namespace qpid::framing::cluster; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; @@ -77,9 +78,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } - void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } + void configChange(const std::string& current) { cluster.configChange(member, current, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } + void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -112,7 +114,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : discarding(true), state(INIT), lastSize(0), - lastBroker(false) + lastBroker(false), + error(*this) { mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ @@ -195,14 +198,19 @@ void Cluster::leave() { leave(l); } +#define LEAVE_TRY(STMT) try { STMT; } \ + catch (const std::exception& e) { \ + QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ + } do {} while(0) + void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); - try { broker.shutdown(); } - catch (const std::exception& e) { - QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); - } + // Finalize connections now now to avoid problems later in destructor. + LEAVE_TRY(localConnections.clear()); + LEAVE_TRY(connections.clear()); + LEAVE_TRY(broker.shutdown()); } } @@ -218,8 +226,6 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); - if (from == self) // Record self-deliveries for flow control. - mcast.selfDeliver(e); deliverEvent(e); } @@ -254,10 +260,22 @@ void Cluster::deliveredEvent(const Event& e) { QPID_LOG(trace, *this << " DROP: " << e); } +void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { + Mutex::ScopedLock l(lock); + error.error(connection, type, map.getFrameSeq(), map.getMembers()); +} + // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& e) { Mutex::ScopedLock l(lock); + // Process each frame through the error checker. + error.delivered(e); + while (error.canProcess()) // There is a frame ready to process. + processFrame(error.getNext(), l); +} + +void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { QPID_LOG(trace, *this << " DLVR: " << e); ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); @@ -265,7 +283,8 @@ void Cluster::deliveredFrame(const EventFrame& e) { throw Exception(QPID_MSG("Invalid cluster control")); } else if (state >= CATCHUP) { - QPID_LOG(trace, *this << " DLVR: " << e); + map.incrementFrameSeq(); + QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); ConnectionPtr connection = getConnection(e.connectionId, l); if (connection) connection->deliveredFrame(e); @@ -316,11 +335,11 @@ ostream& operator<<(ostream& o, const AddrList& a) { for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { const char* reasonString; switch (p->reason) { - case CPG_REASON_JOIN: reasonString = " (joined) "; break; - case CPG_REASON_LEAVE: reasonString = " (left) "; break; - case CPG_REASON_NODEDOWN: reasonString = " (node-down) "; break; - case CPG_REASON_NODEUP: reasonString = " (node-up) "; break; - case CPG_REASON_PROCDOWN: reasonString = " (process-down) "; break; + case CPG_REASON_JOIN: reasonString = "(joined) "; break; + case CPG_REASON_LEAVE: reasonString = "(left) "; break; + case CPG_REASON_NODEDOWN: reasonString = "(node-down) "; break; + case CPG_REASON_NODEUP: reasonString = "(node-up) "; break; + case CPG_REASON_PROCDOWN: reasonString = "(process-down) "; break; default: reasonString = " "; } qpid::cluster::MemberId member(*p); @@ -342,8 +361,8 @@ void Cluster::configChange ( broker.setRecovery(nCurrent == 1); initialized = true; } - QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) - << AddrList(left, nLeft, "( ", ")")); + QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) + << AddrList(left, nLeft, "left: ")); std::string addresses; for (cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); @@ -357,8 +376,8 @@ void Cluster::setReady(Lock&) { broker.getQueueEvents().enable(); } -void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) { - bool memberChange = map.configChange(addresses); +void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) { + bool memberChange = map.configChange(current); if (state == LEFT) return; if (!map.isAlive(self)) { // Final config change. @@ -589,19 +608,24 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_memberIDs(idstr); } - // Erase connections belonging to members that have left the cluster. + // Close connections belonging to members that have left the cluster. ConnectionMap::iterator i = connections.begin(); while (i != connections.end()) { ConnectionMap::iterator j = i++; MemberId m = j->second->getId().getMember(); if (m != self && !map.isMember(m)) - connections.erase(j); + j->second->deliverClose(); } } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { - static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; - return o << cluster.self << "(" << STATE[cluster.state] << ")"; + static const char* STATE[] = { + "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" + }; + assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); + o << cluster.self << "(" << STATE[cluster.state]; + if (cluster.error.isUnresolved()) o << "/error"; + return o << ")"; } MemberId Cluster::getId() const { @@ -635,4 +659,13 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } +void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) { + // If we receive an errorCheck here, it's because we have processed past the point + // of the error so respond with ERROR_TYPE_NONE + assert(map.getFrameSeq() >= frameSeq); + if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE. + mcast.mcastControl( + ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self); +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index b716e2d781..8a94fc79dd 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -23,6 +23,7 @@ #include "ClusterSettings.h" #include "Cpg.h" #include "Decoder.h" +#include "ErrorCheck.h" #include "Event.h" #include "EventFrame.h" #include "ExpiryPolicy.h" @@ -105,6 +106,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { void deliverFrame(const EventFrame&); + // Called in deliverFrame thread to indicate an error from the broker. + void flagError(Connection&, ErrorCheck::ErrorType); + void connectionError(); + // Called only during update by Connection::shadowReady Decoder& getDecoder() { return decoder; } @@ -132,13 +137,15 @@ class Cluster : private Cpg::Handler, public management::Manageable { // == Called in deliverFrameQueue thread void deliveredFrame(const EventFrame&); + void processFrame(const EventFrame&, Lock&); // Cluster controls implement XML methods from cluster.xml. void updateRequest(const MemberId&, const std::string&, Lock&); void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); - void configChange(const MemberId&, const std::string& addresses, Lock& l); + void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); + void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&); void shutdown(const MemberId&, Lock&); // Helper functions @@ -216,11 +223,13 @@ class Cluster : private Cpg::Handler, public management::Manageable { Decoder decoder; bool discarding; + // Remaining members are protected by lock. - // FIXME aconway 2009-03-06: Most of these members are also only used in + + // TODO aconway 2009-03-06: Most of these members are also only used in // deliverFrameQueue thread or during stall. Review and separate members // that require a lock, drop lock when not needed. - // + mutable sys::Monitor lock; @@ -243,7 +252,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool lastBroker; sys::Thread updateThread; boost::optional<ClusterMap> updatedMap; - + ErrorCheck error; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index 9e7232180d..0395ff6382 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -33,6 +33,13 @@ using namespace framing; namespace cluster { +ClusterMap::Set ClusterMap::decode(const std::string& s) { + Set set; + for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8) + set.insert(MemberId(std::string(i, i+8))); + return set; +} + namespace { void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) { @@ -54,9 +61,9 @@ void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) { } -ClusterMap::ClusterMap() {} +ClusterMap::ClusterMap() : frameSeq(0) {} -ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { +ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) { alive.insert(id); if (isMember) members[id] = url; @@ -64,7 +71,9 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { joiners[id] = url; } -ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) { +ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_) + : frameSeq(frameSeq_) +{ std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive))); std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive))); } @@ -78,22 +87,7 @@ void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const } b.getMembers().clear(); std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1)); -} - -bool ClusterMap::configChange( - cpg_address *current, int nCurrent, - cpg_address *left, int nLeft, - cpg_address */*joined*/, int /*nJoined*/) -{ - cpg_address* a; - bool memberChange=false; - for (a = left; a != left+nLeft; ++a) { - memberChange = memberChange || members.erase(*a); - joiners.erase(*a); - } - alive.clear(); - std::copy(current, current+nCurrent, std::inserter(alive, alive.end())); - return memberChange; + b.setFrameSeq(frameSeq); } Url ClusterMap::getUrl(const Map& map, const MemberId& id) { @@ -123,8 +117,13 @@ std::vector<Url> ClusterMap::memberUrls() const { return urls; } -ClusterMap::Set ClusterMap::getAlive() const { - return alive; +ClusterMap::Set ClusterMap::getAlive() const { return alive; } + +ClusterMap::Set ClusterMap::getMembers() const { + Set s; + std::transform(members.begin(), members.end(), std::inserter(s, s.begin()), + boost::bind(&Map::value_type::first, _1)); + return s; } std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) { @@ -158,7 +157,7 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) { bool ClusterMap::configChange(const std::string& addresses) { bool memberChange = false; - Set update; + Set update = decode(addresses); for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8) update.insert(MemberId(std::string(i, i+8))); Set removed; diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h index 4548441442..3359c7c1f3 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.h +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h @@ -38,26 +38,26 @@ namespace qpid { namespace cluster { +typedef std::set<MemberId> MemberSet; + /** - * Map of established cluster members and joiners waiting for an update. + * Map of established cluster members and joiners waiting for an update, + * along with other cluster state that must be updated. */ class ClusterMap { public: typedef std::map<MemberId, Url> Map; typedef std::set<MemberId> Set; + static Set decode(const std::string&); + ClusterMap(); ClusterMap(const MemberId& id, const Url& url, bool isReady); - ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states); + ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq); /** Update from config change. *@return true if member set changed. */ - bool configChange( - cpg_address *current, int nCurrent, - cpg_address *left, int nLeft, - cpg_address *joined, int nJoined); - bool configChange(const std::string& addresses); bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); } @@ -78,6 +78,7 @@ class ClusterMap { std::vector<std::string> memberIds() const; std::vector<Url> memberUrls() const; Set getAlive() const; + Set getMembers() const; bool updateRequest(const MemberId& id, const std::string& url); /** Return non-empty Url if accepted */ @@ -90,11 +91,16 @@ class ClusterMap { * Utility method to return intersection of two member sets */ static Set intersection(const Set& a, const Set& b); + + uint64_t getFrameSeq() { return frameSeq; } + uint64_t incrementFrameSeq() { return ++frameSeq; } + private: Url getUrl(const Map& map, const MemberId& id); Map joiners, members; Set alive; + uint64_t frameSeq; friend std::ostream& operator<<(std::ostream&, const Map&); friend std::ostream& operator<<(std::ostream&, const ClusterMap&); diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index aa7d082720..97cafbabaa 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -39,8 +39,6 @@ #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" -#include "qpid/sys/LatencyMetric.h" -#include "qpid/sys/AtomicValue.h" #include <boost/current_function.hpp> @@ -56,8 +54,16 @@ namespace qpid { namespace cluster { using namespace framing; +using namespace framing::cluster; + +qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); + +Connection::NullFrameHandler Connection::nullFrameHandler; + +struct NullFrameHandler : public framing::FrameHandler { + void handle(framing::AMQFrame&) {} +}; -NoOpConnectionOutputHandler Connection::discardHandler; namespace { sys::AtomicValue<uint64_t> idCounter; @@ -89,6 +95,8 @@ void Connection::init() { connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames connection.setClientThrottling(false); // Disable client throttling, done by active node. } + if (!isCatchUp()) + connection.setErrorListener(this); } void Connection::giveReadCredit(int credit) { @@ -97,6 +105,7 @@ void Connection::giveReadCredit(int credit) { } Connection::~Connection() { + connection.setErrorListener(0); QPID_LOG(debug, cluster << " deleted connection: " << *this); } @@ -126,7 +135,7 @@ void Connection::received(framing::AMQFrame& f) { cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection.getOutput().send(ok); - output.closeOutput(discardHandler); + output.closeOutput(); catchUp = false; } else @@ -156,8 +165,8 @@ void Connection::deliveredFrame(const EventFrame& f) { { if (f.type == DATA) // incoming data frames to broker::Connection connection.received(const_cast<AMQFrame&>(f.frame)); - else { // frame control, send frame via SessionState - broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession(); + else { // frame control, send frame via SessionState + broker::SessionState* ss = connection.getChannel(currentChannel).getSession(); if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); } } @@ -180,7 +189,7 @@ void Connection::closed() { // This was a local replicated connection. Multicast a deliver // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. - output.closeOutput(discardHandler); + output.closeOutput(); cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); } } @@ -275,13 +284,14 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); self = shadowId; connection.setUserId(username); - // OK to use decoder here because we are stalled for update. + // OK to use decoder here because cluster is stalled for update. cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); + connection.setErrorListener(this); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members)); + cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); self.second = 0; // Mark this as completed update connection. } @@ -305,7 +315,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { } broker::QueuedMessage Connection::getUpdateMessage() { - broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get(); + shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE); + assert(!updateq->isDurable()); + broker::QueuedMessage m = updateq->get(); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } @@ -342,15 +354,15 @@ void Connection::deliveryRecord(const string& qname, // If the message was unacked, the newbie broker must place // it in its messageStore. - if ( m.payload && m.payload->isPersistent() && !completed && !ended && !accepted && !cancelled ) + if ( m.payload && m.payload->isPersistent() && acquired && !ended) queue->enqueue ( 0, m.payload ); } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { - shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); - if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); - q->setPosition(position); -} + shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); + if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); + q->setPosition(position); + } void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); @@ -407,7 +419,14 @@ void Connection::queue(const std::string& encoded) { QPID_LOG(debug, cluster << " decoded queue " << q->getName()); } -qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); +void Connection::sessionError(uint16_t , const std::string& ) { + cluster.flagError(*this, ERROR_TYPE_SESSION); + +} + +void Connection::connectionError(const std::string& ) { + cluster.flagError(*this, ERROR_TYPE_CONNECTION); +} }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 6434f763a8..414e5c935f 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -25,7 +25,6 @@ #include "types.h" #include "WriteEstimate.h" #include "OutputInterceptor.h" -#include "NoOpConnectionOutputHandler.h" #include "EventFrame.h" #include "McastFrameHandler.h" @@ -58,7 +57,8 @@ class Event; class Connection : public RefCounted, public sys::ConnectionInputHandler, - public framing::AMQP_AllOperations::ClusterConnectionHandler + public framing::AMQP_AllOperations::ClusterConnectionHandler, + private broker::Connection::ErrorListener { public: @@ -120,7 +120,7 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); - void membership(const framing::FieldTable&, const framing::FieldTable&); + void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, @@ -151,14 +151,22 @@ class Connection : void giveReadCredit(int credit); + void deliverClose(); + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} }; + + static NullFrameHandler nullFrameHandler; + + // Error listener functions + void connectionError(const std::string&); + void sessionError(uint16_t channel, const std::string&); + void init(); bool checkUnsupported(const framing::AMQBody& body); - void deliverClose(); void deliverDoOutput(uint32_t requested); void sendDoOutput(); @@ -167,8 +175,6 @@ class Connection : broker::SemanticState& semanticState(); broker::QueuedMessage getUpdateMessage(); - static NoOpConnectionOutputHandler discardHandler; - Cluster& cluster; ConnectionId self; bool catchUp; @@ -181,7 +187,6 @@ class Connection : boost::shared_ptr<broker::TxBuffer> txBuffer; bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; - NullFrameHandler nullFrameHandler; static qpid::sys::AtomicValue<uint64_t> catchUpId; diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index 915a578989..f746eacea4 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -75,11 +75,14 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow ::memset(&callbacks, sizeof(callbacks), 0); callbacks.cpg_deliver_fn = &globalDeliver; callbacks.cpg_confchg_fn = &globalConfigChange; + + QPID_LOG(info, "Initializing CPG"); cpg_error_t err = cpg_initialize(&handle, &callbacks); - if (err == CPG_ERR_TRY_AGAIN) { - QPID_LOG(notice, "Waiting for CPG initialization."); - while (CPG_ERR_TRY_AGAIN == (err = cpg_initialize(&handle, &callbacks))) - sys::sleep(5); + int retries = 6; + while (err == CPG_ERR_TRY_AGAIN && --retries) { + QPID_LOG(notice, "Re-trying CPG initialization."); + sys::sleep(5); + err = cpg_initialize(&handle, &callbacks); } check(err, "Failed to initialize CPG."); check(cpg_context_set(handle, this), "Cannot set CPG context"); @@ -87,7 +90,6 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow // windows then this needs to be refactored into // qpid::sys::<platform> IOHandle::impl->fd = getFd(); - QPID_LOG(debug, "Initialized CPG handle 0x" << std::hex << handle); } Cpg::~Cpg() { diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp new file mode 100644 index 0000000000..6132d52126 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ErrorCheck.h" +#include "EventFrame.h" +#include "ClusterMap.h" +#include "Cluster.h" +#include "qpid/framing/ClusterErrorCheckBody.h" +#include "qpid/framing/ClusterConfigChangeBody.h" +#include "qpid/log/Statement.h" + +#include <algorithm> + +namespace qpid { +namespace cluster { + +using namespace std; +using namespace framing; +using namespace framing::cluster; + +ErrorCheck::ErrorCheck(Cluster& c) + : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0) +{} + +ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) { + copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " ")); + return o; +} + +void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms) +{ + // Detected a local error, inform cluster and set error state. + assert(t != ERROR_TYPE_NONE); // Must be an error. + assert(type == ERROR_TYPE_NONE); // Can only be called while processing + type = t; + unresolved = ms; + frameSeq = seq; + connection = &c; + QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection") + << " error " << frameSeq << " unresolved: " << unresolved); + mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId()); +} + +void ErrorCheck::delivered(const EventFrame& e) { + if (isUnresolved()) { + const ClusterErrorCheckBody* errorCheck = + dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod()); + const ClusterConfigChangeBody* configChange = + dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod()); + + if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error + if (errorCheck->getType() < type) { // my error is worse than his + QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId()); + throw Exception("Aborted by local failure that did not occur on all replicas"); + } + else { // his error is worse/same as mine. + QPID_LOG(debug, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId()); + unresolved.erase(e.getMemberId()); + checkResolved(); + } + } + else { + frames.push_back(e); // Only drop matching errorCheck controls. + if (configChange) { + MemberSet members(ClusterMap::decode(configChange->getCurrent())); + MemberSet result; + set_intersection(members.begin(), members.end(), + unresolved.begin(), unresolved.end(), + inserter(result, result.begin())); + unresolved.swap(result); + checkResolved(); + } + } + } + else + frames.push_back(e); +} + +void ErrorCheck::checkResolved() { + if (unresolved.empty()) { // No more potentially conflicted members, we're clear. + type = ERROR_TYPE_NONE; + QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved."); + } + else + QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved); +} + +EventFrame ErrorCheck::getNext() { + assert(canProcess()); + EventFrame e(frames.front()); + frames.pop_front(); + return e; +} + +bool ErrorCheck::canProcess() const { + return type == ERROR_TYPE_NONE && !frames.empty(); +} + +bool ErrorCheck::isUnresolved() const { + return type != ERROR_TYPE_NONE; +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.h b/qpid/cpp/src/qpid/cluster/ErrorCheck.h new file mode 100644 index 0000000000..97b5f2bffd --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.h @@ -0,0 +1,80 @@ +#ifndef QPID_CLUSTER_ERRORCHECK_H +#define QPID_CLUSTER_ERRORCHECK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "Multicaster.h" +#include "qpid/framing/enum.h" +#include <boost/function.hpp> +#include <deque> +#include <set> + +namespace qpid { +namespace cluster { + +class EventFrame; +class ClusterMap; +class Cluster; +class Multicaster; +class Connection; + +/** + * Error checking logic. + * + * When an error occurs stop processing frames and queue them until we + * can determine if all nodes experienced the error. If not, we shut down. + */ +class ErrorCheck +{ + public: + typedef std::set<MemberId> MemberSet; + typedef framing::cluster::ErrorType ErrorType; + + ErrorCheck(Cluster&); + + /** A local error has occured */ + void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&); + + /** Called when a frame is delivered */ + void delivered(const EventFrame&); + + EventFrame getNext(); + + bool canProcess() const; + bool isUnresolved() const; + + private: + void checkResolved(); + + Cluster& cluster; + Multicaster& mcast; + std::deque<EventFrame> frames; + std::set<MemberId> unresolved; + uint64_t frameSeq; + ErrorType type; + Connection* connection; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_ERRORCHECK_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h index e05ad60bcf..f0e445a08c 100644 --- a/qpid/cpp/src/qpid/cluster/Event.h +++ b/qpid/cpp/src/qpid/cluster/Event.h @@ -25,7 +25,6 @@ #include "types.h" #include "qpid/RefCountedBuffer.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/sys/LatencyMetric.h" #include <sys/uio.h> // For iovec #include <iosfwd> @@ -42,7 +41,7 @@ class Buffer; namespace cluster { /** Header data for a multicast event */ -class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { +class EventHeader { public: EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0); void decode(const MemberId& m, framing::Buffer&); diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.cpp b/qpid/cpp/src/qpid/cluster/EventFrame.cpp index 9350c801f5..a48d134f1b 100644 --- a/qpid/cpp/src/qpid/cluster/EventFrame.cpp +++ b/qpid/cpp/src/qpid/cluster/EventFrame.cpp @@ -28,9 +28,7 @@ EventFrame::EventFrame() {} EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc) : connectionId(e.getConnectionId()), frame(f), readCredit(rc), type(e.getType()) -{ - QPID_LATENCY_INIT(frame); -} +{} std::ostream& operator<<(std::ostream& o, const EventFrame& e) { if (e.frame.getBody()) o << e.frame; diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.h b/qpid/cpp/src/qpid/cluster/EventFrame.h index d6ff58dd38..e275aac7aa 100644 --- a/qpid/cpp/src/qpid/cluster/EventFrame.h +++ b/qpid/cpp/src/qpid/cluster/EventFrame.h @@ -25,7 +25,6 @@ #include "types.h" #include "Event.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/sys/LatencyMetric.h" #include <boost/intrusive_ptr.hpp> #include <iosfwd> @@ -45,6 +44,7 @@ struct EventFrame bool isCluster() const { return connectionId.getNumber() == 0; } bool isConnection() const { return connectionId.getNumber() != 0; } bool isLastInEvent() const { return readCredit; } + MemberId getMemberId() const { return connectionId.getMember(); } ConnectionId connectionId; diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp index 409180c499..348963f901 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -50,6 +50,13 @@ void ExpiryPolicy::willExpire(broker::Message& m) { timer.add(new ExpiryTask(this, id, m.getExpiration())); } +void ExpiryPolicy::forget(broker::Message& m) { + MessageIdMap::iterator i = unexpiredByMessage.find(&m); + assert(i != unexpiredByMessage.end()); + unexpiredById.erase(i->second); + unexpiredByMessage.erase(i); +} + bool ExpiryPolicy::hasExpired(broker::Message& m) { return unexpiredByMessage.find(&m) == unexpiredByMessage.end(); } diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h index 9f8b1a9236..c147e54796 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -49,8 +49,8 @@ class ExpiryPolicy : public broker::ExpiryPolicy ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&); void willExpire(broker::Message&); - bool hasExpired(broker::Message&); + void forget(broker::Message&); // Send expiration notice to cluster. void sendExpire(uint64_t); diff --git a/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h index 8b2f6dae8e..4df742d6c2 100644 --- a/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h +++ b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h @@ -52,6 +52,8 @@ class LockedConnectionMap return 0; } + void clear() { sys::Mutex::ScopedLock l(lock); map.clear(); } + private: typedef std::map<ConnectionId, ConnectionPtr> Map; mutable sys::Mutex lock; diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp index f0738ab08f..3b9d3ac990 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp +++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp @@ -22,7 +22,6 @@ #include "Multicaster.h" #include "Cpg.h" #include "qpid/log/Statement.h" -#include "qpid/sys/LatencyMetric.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/AMQFrame.h" @@ -64,7 +63,6 @@ void Multicaster::mcast(const Event& e) { return; } } - QPID_LATENCY_INIT(e); queue.push(e); } @@ -73,7 +71,6 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { try { PollableEventQueue::Queue::iterator i = values.begin(); while( i != values.end()) { - QPID_LATENCY_RECORD("mcast send queue", *i); iovec iov = i->toIovec(); if (!cpg.mcast(&iov, 1)) { // cpg didn't send because of CPG flow control. @@ -97,9 +94,4 @@ void Multicaster::release() { holdingQueue.clear(); } -void Multicaster::selfDeliver(const Event& e) { - sys::Mutex::ScopedLock l(lock); - QPID_LATENCY_RECORD("cpg self deliver", e); -} - }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.h b/qpid/cpp/src/qpid/cluster/Multicaster.h index d1c3115977..baa5b87f38 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.h +++ b/qpid/cpp/src/qpid/cluster/Multicaster.h @@ -55,8 +55,6 @@ class Multicaster void mcast(const Event& e); /** End holding mode, held events are mcast */ void release(); - /** Call when events are self-delivered to manage flow control. */ - void selfDeliver(const Event&); private: typedef sys::PollableQueue<Event> PollableEventQueue; diff --git a/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h index 74a376a657..6a30bddf06 100644 --- a/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h +++ b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h @@ -30,8 +30,7 @@ namespace framing { class AMQFrame; } namespace cluster { /** - * Output handler for frames sent to noop connections. - * Simply discards frames. + * Output handler shadow connections, simply discards frames. */ class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler { diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index cd42446016..6af114a662 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -32,8 +32,9 @@ namespace cluster { using namespace framing; -OutputInterceptor::OutputInterceptor( - cluster::Connection& p, sys::ConnectionOutputHandler& h) +NoOpConnectionOutputHandler OutputInterceptor::discardHandler; + +OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h) : parent(p), closing(false), next(&h), sent(), writeEstimate(p.getCluster().getWriteEstimate()), moreOutput(), doingOutput() @@ -47,7 +48,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) { } if (!parent.isCatchUp()) sent += f.encodedSize(); - QPID_LATENCY_RECORD("up to write queue", f); } void OutputInterceptor::activateOutput() { @@ -98,7 +98,6 @@ void OutputInterceptor::deliverDoOutput(size_t requested) { // Send a doOutput request if one is not already in flight. void OutputInterceptor::sendDoOutput() { if (!parent.isLocal()) return; - QPID_LATENCY_INIT(*this); doingOutput = true; size_t request = writeEstimate.sending(getBuffered()); @@ -111,10 +110,10 @@ void OutputInterceptor::sendDoOutput() { QPID_LOG(trace, parent << "Send doOutput request for " << request); } -void OutputInterceptor::closeOutput(sys::ConnectionOutputHandler& h) { +void OutputInterceptor::closeOutput() { sys::Mutex::ScopedLock l(lock); closing = true; - next = &h; + next = &discardHandler; } void OutputInterceptor::close() { diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h index c080a419e1..61e246bb89 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h @@ -23,9 +23,9 @@ */ #include "WriteEstimate.h" +#include "NoOpConnectionOutputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/broker/ConnectionFactory.h" -#include "qpid/sys/LatencyMetric.h" #include <boost/function.hpp> namespace qpid { @@ -37,7 +37,7 @@ class Connection; /** * Interceptor for connection OutputHandler, manages outgoing message replication. */ -class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetricTimestamp { +class OutputInterceptor : public sys::ConnectionOutputHandler { public: OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h); @@ -53,7 +53,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri // Intercept doOutput requests on Connection. bool doOutput(); - void closeOutput(sys::ConnectionOutputHandler& h); + void closeOutput(); cluster::Connection& parent; @@ -70,6 +70,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri WriteEstimate writeEstimate; bool moreOutput; bool doingOutput; + static NoOpConnectionOutputHandler discardHandler; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 97eae7efa3..bb4df8890a 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -26,6 +26,9 @@ #include "ExpiryPolicy.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/ConnectionAccess.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/Future.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" @@ -98,10 +101,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con expiry(expiry_), connections(cons), decoder(decoder_), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail), connectionSettings(cs) -{ - connection.open(url, cs); - session = connection.newSession(UPDATE); -} +{} UpdateClient::~UpdateClient() {} @@ -110,6 +110,8 @@ const std::string UpdateClient::UPDATE("qpid.cluster-update"); void UpdateClient::run() { try { + connection.open(updateeUrl, connectionSettings); + session = connection.newSession(UPDATE); update(); done(); } catch (const std::exception& e) { @@ -126,15 +128,19 @@ void UpdateClient::update() { // Update queue is used to transfer acquired messages that are no longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); - session.close(); std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); + session.queueDelete(arg::queue=UPDATE); + session.close(); + + ClusterConnectionProxy(session).expiryId(expiry.getId()); ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); + connection.close(); QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl); } @@ -203,7 +209,6 @@ class MessageUpdater { sb.get()->send(transfer, message.payload->getFrames()); if (message.payload->isContentReleased()){ uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; - uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index 23d061b7e4..96e2479955 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -24,6 +24,7 @@ #include "ClusterMap.h" #include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" #include "qpid/client/AsyncSession.h" #include "qpid/broker/SemanticState.h" #include "qpid/sys/Runnable.h" diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h index 34319e7ed4..2f0a01d6ca 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.h +++ b/qpid/cpp/src/qpid/framing/AMQFrame.h @@ -26,7 +26,6 @@ #include "AMQContentBody.h" #include "AMQHeartbeatBody.h" #include "ProtocolVersion.h" -#include "qpid/sys/LatencyMetric.h" #include <boost/intrusive_ptr.hpp> #include <boost/cast.hpp> #include "qpid/CommonImportExport.h" @@ -34,7 +33,7 @@ namespace qpid { namespace framing { -class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp +class AMQFrame : public AMQDataBlock { public: QPID_COMMON_EXTERN AMQFrame(const boost::intrusive_ptr<AMQBody>& b=0); diff --git a/qpid/cpp/src/qpid/sys/LatencyMetric.cpp b/qpid/cpp/src/qpid/sys/LatencyMetric.cpp deleted file mode 100644 index caa221def4..0000000000 --- a/qpid/cpp/src/qpid/sys/LatencyMetric.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifdef QPID_LATENCY_METRIC - -#include "LatencyMetric.h" -#include "Time.h" -#include <iostream> - -namespace qpid { -namespace sys { - -void LatencyMetricTimestamp::initialize(const LatencyMetricTimestamp& ts) { - const_cast<int64_t&>(ts.latency_metric_timestamp) = Duration(now()); -} - -void LatencyMetricTimestamp::clear(const LatencyMetricTimestamp& ts) { - const_cast<int64_t&>(ts.latency_metric_timestamp) = 0; -} - -LatencyMetric::LatencyMetric(const char* msg, int64_t skip_) : - message(msg), count(0), total(0), skipped(0), skip(skip_) -{} - -LatencyMetric::~LatencyMetric() { report(); } - -void LatencyMetric::record(const LatencyMetricTimestamp& start) { - if (!start.latency_metric_timestamp) return; // Ignore 0 timestamps. - if (skip) { - if (++skipped < skip) return; - else skipped = 0; - } - ++count; - int64_t now_ = Duration(now()); - total += now_ - start.latency_metric_timestamp; - // Set start time for next leg of the journey - const_cast<int64_t&>(start.latency_metric_timestamp) = now_; -} - -void LatencyMetric::report() { - using namespace std; - if (count) { - cout << "LATENCY: " << message << ": " - << total / (count * TIME_USEC) << " microseconds" << endl; - } - else { - cout << "LATENCY: " << message << ": no data." << endl; - } - count = 0; - total = 0; -} - - -}} // namespace qpid::sys - -#endif diff --git a/qpid/cpp/src/qpid/sys/LatencyMetric.h b/qpid/cpp/src/qpid/sys/LatencyMetric.h deleted file mode 100644 index 63b5020db4..0000000000 --- a/qpid/cpp/src/qpid/sys/LatencyMetric.h +++ /dev/null @@ -1,85 +0,0 @@ -#ifndef QPID_SYS_LATENCYMETRIC_H -#define QPID_SYS_LATENCYMETRIC_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifdef QPID_LATENCY_METRIC - -#include "qpid/sys/IntegerTypes.h" - -namespace qpid { -namespace sys { - -/** Use this base class to add a timestamp for latency to an object */ -struct LatencyMetricTimestamp { - LatencyMetricTimestamp() : latency_metric_timestamp(0) {} - static void initialize(const LatencyMetricTimestamp&); - static void clear(const LatencyMetricTimestamp&); - int64_t latency_metric_timestamp; -}; - -/** - * Record average latencies, report on destruction. - * - * For debugging only, use via macros below so it can be compiled out - * of production code. - */ -class LatencyMetric { - public: - /** msg should be a string literal. */ - LatencyMetric(const char* msg, int64_t skip_=0); - ~LatencyMetric(); - - void record(const LatencyMetricTimestamp& start); - - private: - void report(); - const char* message; - int64_t count, total, skipped, skip; -}; - -}} // namespace qpid::sys - -#define QPID_LATENCY_INIT(x) ::qpid::sys::LatencyMetricTimestamp::initialize(x) -#define QPID_LATENCY_CLEAR(x) ::qpid::sys::LatencyMetricTimestamp::clear(x) -#define QPID_LATENCY_RECORD(msg, x) do { \ - static ::qpid::sys::LatencyMetric metric__(msg); metric__.record(x); \ - } while (false) -#define QPID_LATENCY_RECORD_SKIP(msg, x, skip) do { \ - static ::qpid::sys::LatencyMetric metric__(msg, skip); metric__.record(x); \ - } while (false) - - -#else /* defined QPID_LATENCY_METRIC */ - -namespace qpid { namespace sys { -class LatencyMetricTimestamp {}; -}} - -#define QPID_LATENCY_INIT(x) (void)x -#define QPID_LATENCY_CLEAR(x) (void)x -#define QPID_LATENCY_RECORD(msg, x) (void)x -#define QPID_LATENCY_RECORD_SKIP(msg, x, skip) (void)x - -#endif /* defined QPID_LATENCY_METRIC */ - -#endif /*!QPID_SYS_LATENCYMETRIC_H*/ diff --git a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h index 067c0fe6b7..cd89f39292 100755 --- a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h +++ b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h @@ -23,6 +23,7 @@ */ #include "AsynchIoResult.h" +#include "qpid/CommonImportExport.h" #include <winsock2.h> @@ -49,7 +50,7 @@ public: AsynchIO::RequestCallback cbRequest; }; -SOCKET toFd(const IOHandlePrivate* h); +QPID_COMMON_EXTERN SOCKET toFd(const IOHandlePrivate* h); }} diff --git a/qpid/cpp/src/tests/AsyncCompletion.cpp b/qpid/cpp/src/tests/AsyncCompletion.cpp index e33b2dc35d..41423d8245 100644 --- a/qpid/cpp/src/tests/AsyncCompletion.cpp +++ b/qpid/cpp/src/tests/AsyncCompletion.cpp @@ -24,6 +24,8 @@ #include "qpid/sys/BlockingQueue.h" #include "qpid/client/AsyncSession.h" #include "qpid/sys/Time.h" +#include "qpid/framing/QueueQueryResult.h" +#include "qpid/client/TypedResult.h" using namespace std; using namespace qpid; @@ -97,4 +99,15 @@ QPID_AUTO_TEST_CASE(testWaitTillComplete) { sync.wait(); // Should complete now, all messages are completed. } +QPID_AUTO_TEST_CASE(testGetResult) { + SessionFixture fix; + AsyncSession s = fix.session; + + s.queueDeclare("q", arg::durable=true); + TypedResult<QueueQueryResult> tr = s.queueQuery("q"); + QueueQueryResult qq = tr.get(); + BOOST_CHECK_EQUAL(qq.getQueue(), "q"); + BOOST_CHECK_EQUAL(qq.getMessageCount(), 0U); +} + QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/BasicP2PTest.cpp b/qpid/cpp/src/tests/BasicP2PTest.cpp deleted file mode 100644 index f4a4cce7f2..0000000000 --- a/qpid/cpp/src/tests/BasicP2PTest.cpp +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "BasicP2PTest.h" - -using namespace qpid; -using namespace qpid::client; - -class BasicP2PTest::Receiver : public Worker, public MessageListener -{ - const std::string queue; - std::string tag; -public: - Receiver(ConnectionOptions& options, const std::string& _queue, const int _messages) - : Worker(options, _messages), queue(_queue){} - void init() - { - Queue q(queue, true); - channel.declareQueue(q); - framing::FieldTable args; - channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, q, queue, args); - channel.consume(q, tag, this); - channel.start(); - } - - void start() - { - } - - void received(Message&) - { - count++; - } -}; - -void BasicP2PTest::assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options) -{ - std::string queue = params.getString("P2P_QUEUE_AND_KEY_NAME"); - int messages = params.getInt("P2P_NUM_MESSAGES"); - if (role == "SENDER") { - worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_DIRECT_EXCHANGE, queue, messages)); - } else if(role == "RECEIVER"){ - worker = std::auto_ptr<Worker>(new Receiver(options, queue, messages)); - } else { - throw Exception("unrecognised role"); - } - worker->init(); -} diff --git a/qpid/cpp/src/tests/BasicPubSubTest.cpp b/qpid/cpp/src/tests/BasicPubSubTest.cpp deleted file mode 100644 index 1e9ff364f1..0000000000 --- a/qpid/cpp/src/tests/BasicPubSubTest.cpp +++ /dev/null @@ -1,121 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "BasicPubSubTest.h" - -using namespace qpid; - -class BasicPubSubTest::Receiver : public Worker, public MessageListener -{ - const Exchange& exchange; - const std::string queue; - const std::string key; - std::string tag; -public: - Receiver(ConnectionOptions& options, const Exchange& _exchange, const std::string& _queue, const std::string& _key, const int _messages) - : Worker(options, _messages), exchange(_exchange), queue(_queue), key(_key){} - - void init() - { - Queue q(queue, true); - channel.declareQueue(q); - framing::FieldTable args; - channel.bind(exchange, q, key, args); - channel.consume(q, tag, this); - channel.start(); - } - - void start(){ - } - - void received(Message&) - { - count++; - } -}; - -class BasicPubSubTest::MultiReceiver : public Worker, public MessageListener -{ - typedef boost::ptr_vector<Receiver> ReceiverList; - ReceiverList receivers; - -public: - MultiReceiver(ConnectionOptions& options, const Exchange& exchange, const std::string& key, const int _messages, int receiverCount) - : Worker(options, _messages) - { - for (int i = 0; i != receiverCount; i++) { - std::string queue = (boost::format("%1%_%2%") % options.clientid % i).str(); - receivers.push_back(new Receiver(options, exchange, queue, key, _messages)); - } - } - - void init() - { - for (ReceiverList::size_type i = 0; i != receivers.size(); i++) { - receivers[i].init(); - } - } - - void start() - { - for (ReceiverList::size_type i = 0; i != receivers.size(); i++) { - receivers[i].start(); - } - } - - void received(Message& msg) - { - for (ReceiverList::size_type i = 0; i != receivers.size(); i++) { - receivers[i].received(msg); - } - } - - virtual int getCount() - { - count = 0; - for (ReceiverList::size_type i = 0; i != receivers.size(); i++) { - count += receivers[i].getCount(); - } - return count; - } - virtual void stop() - { - for (ReceiverList::size_type i = 0; i != receivers.size(); i++) { - receivers[i].stop(); - } - } -}; - -void BasicPubSubTest::assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options) -{ - std::string key = params.getString("PUBSUB_KEY"); - int messages = params.getInt("PUBSUB_NUM_MESSAGES"); - int receivers = params.getInt("PUBSUB_NUM_RECEIVERS"); - if (role == "SENDER") { - worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_TOPIC_EXCHANGE, key, messages)); - } else if(role == "RECEIVER"){ - worker = std::auto_ptr<Worker>(new MultiReceiver(options, Exchange::STANDARD_TOPIC_EXCHANGE, key, messages, receivers)); - } else { - throw Exception("unrecognised role"); - } - worker->init(); -} - diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h index f55560739d..b32b7f44ba 100644 --- a/qpid/cpp/src/tests/BrokerFixture.h +++ b/qpid/cpp/src/tests/BrokerFixture.h @@ -114,10 +114,12 @@ struct ClientT { SessionType session; qpid::client::SubscriptionManager subs; qpid::client::LocalQueue lq; - ClientT(uint16_t port, const std::string& name=std::string()) - : connection(port), session(connection.newSession(name)), subs(session) {} - ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name=std::string()) - : connection(settings), session(connection.newSession(name)), subs(session) {} + std::string name; + + ClientT(uint16_t port, const std::string& name_=std::string()) + : connection(port), session(connection.newSession(name_)), subs(session), name(name_) {} + ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string()) + : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {} ~ClientT() { connection.close(); } }; diff --git a/qpid/cpp/src/tests/ClientMessageTest.cpp b/qpid/cpp/src/tests/ClientMessageTest.cpp new file mode 100644 index 0000000000..bc0945674f --- /dev/null +++ b/qpid/cpp/src/tests/ClientMessageTest.cpp @@ -0,0 +1,46 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +/**@file Unit tests for the client::Message class. */ + +#include "unit_test.h" +#include "qpid/client/Message.h" + +using namespace qpid::client; + +QPID_AUTO_TEST_SUITE(ClientMessageTestSuite) + +QPID_AUTO_TEST_CASE(MessageCopyAssign) { + // Verify that message has normal copy semantics. + Message m("foo"); + BOOST_CHECK_EQUAL("foo", m.getData()); + Message c(m); + BOOST_CHECK_EQUAL("foo", c.getData()); + Message a; + BOOST_CHECK_EQUAL("", a.getData()); + a = m; + BOOST_CHECK_EQUAL("foo", a.getData()); + a.setData("a"); + BOOST_CHECK_EQUAL("a", a.getData()); + c.setData("c"); + BOOST_CHECK_EQUAL("c", c.getData()); + BOOST_CHECK_EQUAL("foo", m.getData()); +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index 589e1154e1..1c719d16dc 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -28,7 +28,7 @@ #include "qpid/sys/Runnable.h" #include "qpid/sys/Time.h" #include "qpid/client/Session.h" -#include "qpid/framing/TransferContent.h" +#include "qpid/client/Message.h" #include "qpid/framing/reply_exceptions.h" #include <boost/optional.hpp> @@ -121,7 +121,7 @@ QPID_AUTO_TEST_CASE(testDispatcher) fix.session =fix.connection.newSession(); size_t count = 100; for (size_t i = 0; i < count; ++i) - fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue")); + fix.session.messageTransfer(arg::content=Message(boost::lexical_cast<string>(i), "my-queue")); DummyListener listener(fix.session, "my-queue", count); listener.run(); BOOST_CHECK_EQUAL(count, listener.messages.size()); @@ -137,7 +137,7 @@ QPID_AUTO_TEST_CASE(testDispatcherThread) DummyListener listener(fix.session, "my-queue", count); sys::Thread t(listener); for (size_t i = 0; i < count; ++i) { - fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue")); + fix.session.messageTransfer(arg::content=Message(boost::lexical_cast<string>(i), "my-queue")); } t.join(); BOOST_CHECK_EQUAL(count, listener.messages.size()); @@ -173,7 +173,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1) fix.session.suspend(); // Make sure we are still subscribed after resume. fix.connection.resume(fix.session); - fix.session.messageTransfer(arg::content=TransferContent("my-message", "my-queue")); + fix.session.messageTransfer(arg::content=Message("my-message", "my-queue")); FrameSet::shared_ptr msg = fix.session.get(); BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); } diff --git a/qpid/cpp/src/tests/ClusterFailover.cpp b/qpid/cpp/src/tests/ClusterFailover.cpp new file mode 100644 index 0000000000..db2392b296 --- /dev/null +++ b/qpid/cpp/src/tests/ClusterFailover.cpp @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/**@file Tests for partial failure in a cluster. + * Partial failure means some nodes experience a failure while others do not. + * In this case the failed nodes must shut down. + */ + +#include "test_tools.h" +#include "unit_test.h" +#include "ClusterFixture.h" +#include "qpid/client/FailoverManager.h" +#include <boost/assign.hpp> +#include <boost/algorithm/string.hpp> +#include <boost/bind.hpp> + +QPID_AUTO_TEST_SUITE(ClusterFailoverTestSuite) + +using namespace std; +using namespace qpid; +using namespace qpid::cluster; +using namespace qpid::framing; +using namespace qpid::client; +using namespace qpid::client::arg; +using namespace boost::assign; +using broker::Broker; +using boost::shared_ptr; + +// Timeout for tests that wait for messages +const sys::Duration TIMEOUT=sys::TIME_SEC/4; + + +// Test re-connecting with same session name after a failure. +QPID_AUTO_TEST_CASE(testReconnectSameSessionName) { + ClusterFixture cluster(2, -1); + Client c0(cluster[0], "foo"); + cluster.kill(0, 9); + Client c1(cluster[1], "foo"); // Using same name, should be cleaned up. +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/ClusterFixture.cpp b/qpid/cpp/src/tests/ClusterFixture.cpp index 5658957b48..70d60b10b4 100644 --- a/qpid/cpp/src/tests/ClusterFixture.cpp +++ b/qpid/cpp/src/tests/ClusterFixture.cpp @@ -61,8 +61,14 @@ using boost::assign::list_of; #include "ClusterFixture.h" -ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_) - : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_) +ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_, const string& clusterLib_) + : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_), clusterLib(clusterLib_) +{ + add(n); +} + +ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_, const string& clusterLib_) + : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_), clusterLib(clusterLib_) { add(n); } @@ -70,13 +76,14 @@ ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_) const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS = list_of<string>("--auth=no")("--no-data-dir"); -ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix) { - Args args = list_of<string>("qpidd " __FILE__) +ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) { + Args args = list_of<string>("qpidd ") ("--no-module-dir") - ("--load-module=../.libs/cluster.so") - ("--cluster-name")(name) + ("--load-module")(clusterLib) + ("--cluster-name")(name) ("--log-prefix")(prefix); args.insert(args.end(), userArgs.begin(), userArgs.end()); + if (updateArgs) updateArgs(args, index); return args; } @@ -84,7 +91,7 @@ void ClusterFixture::add() { if (size() != size_t(localIndex)) { // fork a broker process. std::ostringstream os; os << "fork" << size(); std::string prefix = os.str(); - forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix)))); + forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix, size())))); push_back(forkedBrokers.back()->getPort()); } else { // Run in this process @@ -106,7 +113,7 @@ void ClusterFixture::addLocal() { assert(int(size()) == localIndex); ostringstream os; os << "local" << localIndex; string prefix = os.str(); - Args args(makeArgs(prefix)); + Args args(makeArgs(prefix, localIndex)); vector<const char*> argv(args.size()); transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1)); qpid::log::Logger::instance().setPrefix(prefix); @@ -116,7 +123,7 @@ void ClusterFixture::addLocal() { } bool ClusterFixture::hasLocal() const { return localIndex >= 0 && size_t(localIndex) < size(); } - + /** Kill a forked broker with sig, or shutdown localBroker if n==0. */ void ClusterFixture::kill(size_t n, int sig) { if (n == size_t(localIndex)) @@ -131,3 +138,22 @@ void ClusterFixture::killWithSilencer(size_t n, client::Connection& c, int sig) kill(n,sig); try { c.close(); } catch(...) {} } + +/** + * Get the known broker ports from a Connection. + *@param n if specified wait for the cluster size to be n, up to a timeout. + */ +std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n) { + std::vector<qpid::Url> urls = source.getKnownBrokers(); + if (n >= 0 && unsigned(n) != urls.size()) { + // Retry up to 10 secs in .1 second intervals. + for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) { + qpid::sys::usleep(1000*100); // 0.1 secs + urls = source.getKnownBrokers(); + } + } + std::set<int> s; + for (std::vector<qpid::Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) + s.insert((*i)[0].get<qpid::TcpAddress>()->port); + return s; +} diff --git a/qpid/cpp/src/tests/ClusterFixture.h b/qpid/cpp/src/tests/ClusterFixture.h index 84fb9f2202..353ec0c88d 100644 --- a/qpid/cpp/src/tests/ClusterFixture.h +++ b/qpid/cpp/src/tests/ClusterFixture.h @@ -38,6 +38,7 @@ #include "qpid/log/Logger.h" #include <boost/bind.hpp> +#include <boost/function.hpp> #include <boost/shared_ptr.hpp> #include <string> @@ -59,43 +60,55 @@ using qpid::broker::Broker; using boost::shared_ptr; using qpid::cluster::Cluster; - +#define DEFAULT_CLUSTER_LIB "../.libs/cluster.so" /** Cluster fixture is a vector of ports for the replicas. - * + * * At most one replica (by default replica 0) is in the current * process, all others are forked as children. */ class ClusterFixture : public vector<uint16_t> { public: typedef std::vector<std::string> Args; + static const Args DEFAULT_ARGS; + /** @param localIndex can be -1 meaning don't automatically start a local broker. * A local broker can be started with addLocal(). */ - ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS); + ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS, const string& clusterLib = DEFAULT_CLUSTER_LIB); + + /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */ + ClusterFixture(size_t n, int localIndex, boost::function<void (Args&, size_t)> updateArgs, const string& clusterLib = DEFAULT_CLUSTER_LIB); + void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } void add(); // Add a broker. void setup(); bool hasLocal() const; - - /** Kill a forked broker with sig, or shutdown localBroker if n==0. */ + + /** Kill a forked broker with sig, or shutdown localBroker. */ void kill(size_t n, int sig=SIGINT); /** Kill a broker and suppressing errors from closing connection c. */ void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT); private: - static const Args DEFAULT_ARGS; - + void addLocal(); // Add a local broker. - Args makeArgs(const std::string& prefix); + Args makeArgs(const std::string& prefix, size_t index); string name; std::auto_ptr<BrokerFixture> localBroker; int localIndex; std::vector<shared_ptr<ForkedBroker> > forkedBrokers; Args userArgs; + boost::function<void (Args&, size_t)> updateArgs; + string clusterLib; }; +/** + * Get the known broker ports from a Connection. + *@param n if specified wait for the cluster size to be n, up to a timeout. + */ +std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n=-1); #endif /*!CLUSTER_FIXTURE_H*/ diff --git a/qpid/cpp/src/tests/ForkedBroker.cpp b/qpid/cpp/src/tests/ForkedBroker.cpp index f90f76aeb2..12175d3287 100644 --- a/qpid/cpp/src/tests/ForkedBroker.cpp +++ b/qpid/cpp/src/tests/ForkedBroker.cpp @@ -20,20 +20,39 @@ */ #include "ForkedBroker.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> +#include <boost/algorithm/string.hpp> #include <algorithm> #include <stdlib.h> #include <sys/types.h> #include <signal.h> -ForkedBroker::ForkedBroker(const Args& args) { init(args); } +using namespace std; +using qpid::ErrnoException; -ForkedBroker::ForkedBroker(int argc, const char* const argv[]) { init(Args(argv, argc+argv)); } +ForkedBroker::ForkedBroker(const Args& constArgs) { + Args args(constArgs); + Args::iterator i = find(args.begin(), args.end(), string("TMP_DATA_DIR")); + if (i != args.end()) { + args.erase(i); + char dd[] = "/tmp/ForkedBroker.XXXXXX"; + if (!mkdtemp(dd)) + throw qpid::ErrnoException("Can't create data dir"); + dataDir = dd; + args.push_back("--data-dir"); + args.push_back(dataDir); + } + init(args); +} ForkedBroker::~ForkedBroker() { - try { kill(); } catch(const std::exception& e) { + try { kill(); } + catch (const std::exception& e) { QPID_LOG(error, QPID_MSG("Killing forked broker: " << e.what())); } + if (!dataDir.empty()) + ::system(("rm -rf "+dataDir).c_str()); } void ForkedBroker::kill(int sig) { @@ -42,14 +61,25 @@ void ForkedBroker::kill(int sig) { pid = 0; // Reset pid here in case of an exception. using qpid::ErrnoException; if (::kill(savePid, sig) < 0) - throw ErrnoException("kill failed"); + throw ErrnoException("kill failed"); int status; if (::waitpid(savePid, &status, 0) < 0 && sig != 9) throw ErrnoException("wait for forked process failed"); if (WEXITSTATUS(status) != 0 && sig != 9) throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status))); } + +namespace std { +static ostream& operator<<(ostream& o, const ForkedBroker::Args& a) { + copy(a.begin(), a.end(), ostream_iterator<string>(o, " ")); + return o; +} +bool isLogOption(const std::string& s) { + return boost::starts_with(s, "--log-enable") || boost::starts_with(s, "--trace"); +} + +} void ForkedBroker::init(const Args& userArgs) { using qpid::ErrnoException; @@ -70,17 +100,20 @@ void ForkedBroker::init(const Args& userArgs) { } else { // child ::close(pipeFds[0]); - // FIXME aconway 2009-02-12: int fd = ::dup2(pipeFds[1], 1); // pipe stdout to the parent. if (fd < 0) throw ErrnoException("dup2 failed"); - const char* prog = "../qpidd"; + const char* prog = ::getenv("QPID_FORKED_BROKER"); + if (!prog) prog = "../qpidd"; Args args(userArgs); args.push_back("--port=0"); - if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) - args.push_back("--log-enable=error+"); // Keep quiet except for errors. + // Keep quiet except for errors. + if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE") + && find_if(userArgs.begin(), userArgs.end(), isLogOption) == userArgs.end()) + args.push_back("--log-enable=error+"); std::vector<const char*> argv(args.size()); std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1)); argv.push_back(0); + QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args); execv(prog, const_cast<char* const*>(&argv[0])); throw ErrnoException("execv failed"); } diff --git a/qpid/cpp/src/tests/ForkedBroker.h b/qpid/cpp/src/tests/ForkedBroker.h index 6f97fbdc09..45b522068c 100644 --- a/qpid/cpp/src/tests/ForkedBroker.h +++ b/qpid/cpp/src/tests/ForkedBroker.h @@ -48,19 +48,26 @@ class ForkedBroker { public: typedef std::vector<std::string> Args; + // argv args are passed to broker. + // + // Special value "TMP_DATA_DIR" is substituted with a temporary + // data directory for the broker. + // ForkedBroker(const Args& argv); - ForkedBroker(int argc, const char* const argv[]); ~ForkedBroker(); void kill(int sig=SIGINT); + int wait(); // Wait for exit, return exit status. uint16_t getPort() { return port; } pid_t getPID() { return pid; } private: + void init(const Args& args); pid_t pid; int port; + std::string dataDir; }; #endif /*!TESTS_FORKEDBROKER_H*/ diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 9a81ef18b3..161428fcad 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -96,7 +96,9 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ RetryList.cpp \ RateFlowcontrolTest.cpp \ FrameDecoder.cpp \ - ReplicationTest.cpp + ReplicationTest.cpp \ + ClientMessageTest.cpp \ + PollableCondition.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp @@ -110,11 +112,17 @@ endif # amqp_0_10/Map.cpp \ # amqp_0_10/handlers.cpp +TESTLIBFLAGS = -module -rpath $(abs_builddir) check_LTLIBRARIES += libshlibtest.la -libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir) +libshlibtest_la_LDFLAGS = $(TESTLIBFLAGS) libshlibtest_la_SOURCES = shlibtest.cpp +check_LTLIBRARIES += test_store.la +test_store_la_SOURCES = test_store.cpp +test_store_la_LIBADD = $(lib_broker) # FIXME aconway 2009-04-03: required? +test_store_la_LDFLAGS = $(TESTLIBFLAGS) + include cluster.mk if SSL include ssl.mk @@ -236,24 +244,6 @@ libdlclose_noop_la_SOURCES = dlclose_noop.c CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers) -# FIXME aconway 2008-05-23: Disabled interop_runner because it uses -# the obsolete Channel class. Convert to Session and re-enable. -# -# check_PROGRAMS += interop_runner - -# interop_runner_SOURCES = \ -# interop_runner.cpp \ -# SimpleTestCaseBase.cpp \ -# BasicP2PTest.cpp \ -# BasicPubSubTest.cpp \ -# SimpleTestCaseBase.h \ -# BasicP2PTest.h \ -# BasicPubSubTest.h \ -# TestCase.h \ -# TestOptions.h ConnectionOptions.h -# interop_runner_LDADD = $(lib_client) $(lib_common) $(extra_libs) - - # Longer running stability tests, not run by default check: target. # Not run under valgrind, too slow diff --git a/qpid/cpp/src/tests/PartialFailure.cpp b/qpid/cpp/src/tests/PartialFailure.cpp new file mode 100644 index 0000000000..5137672e7d --- /dev/null +++ b/qpid/cpp/src/tests/PartialFailure.cpp @@ -0,0 +1,219 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/**@file Tests for partial failure in a cluster. + * Partial failure means some nodes experience a failure while others do not. + * In this case the failed nodes must shut down. + */ + +#include "test_tools.h" +#include "unit_test.h" +#include "ClusterFixture.h" +#include <boost/assign.hpp> +#include <boost/algorithm/string.hpp> +#include <boost/bind.hpp> + +QPID_AUTO_TEST_SUITE(PartialFailureTestSuite) + + using namespace std; +using namespace qpid; +using namespace qpid::cluster; +using namespace qpid::framing; +using namespace qpid::client; +using namespace qpid::client::arg; +using namespace boost::assign; +using broker::Broker; +using boost::shared_ptr; + +// Timeout for tests that wait for messages +const sys::Duration TIMEOUT=sys::TIME_SEC/4; + +static bool isLogOption(const std::string& s) { return boost::starts_with(s, "--log-enable"); } + +void updateArgs(ClusterFixture::Args& args, size_t index) { + ostringstream os; + os << "--test-store-name=s" << index; + args.push_back(os.str()); + args.push_back("--load-module=.libs/test_store.so"); + args.push_back("--auth=no"); + args.push_back("TMP_DATA_DIR"); + + // These tests generate errors deliberately, disable error logging unless a log env var is set. + if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) { + remove_if(args.begin(), args.end(), isLogOption); + args.push_back("--log-enable=critical+:DISABLED"); // hacky way to disable logs. + } +} + +Message pMessage(string data, string q) { + Message msg(data, q); + msg.getDeliveryProperties().setDeliveryMode(PERSISTENT); + return msg; +} + +void queueAndSub(Client& c) { + c.session.queueDeclare(c.name, durable=true); + c.subs.subscribe(c.lq, c.name); +} + +// Verify normal cluster-wide errors. +QPID_AUTO_TEST_CASE(testNormalErrors) { + // FIXME aconway 2009-04-10: Would like to put a scope just around + // the statements expected to fail (in BOOST_CHECK_THROW) but that + // sproadically lets out messages, possibly because they're in + // Connection thread. + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(3, -1, updateArgs); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + Client c2(cluster[2], "c2"); + + queueAndSub(c0); + c0.session.messageTransfer(content=Message("x", "c0")); + BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x"); + + // Session error. + BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException); + c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead. + + // Connection error, kill c1 on all members. + queueAndSub(c1); + BOOST_CHECK_THROW( + c1.session.messageTransfer( + content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")), + ConnectionException); + c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead. + + BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size()); + BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay"); + BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay"); +} + + +// Test errors after a new member joins to verify frame-sequence-numbers are ok in update. +QPID_AUTO_TEST_CASE(testErrorAfterJoin) { + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(1, -1, updateArgs); + Client c0(cluster[0]); + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + + // Kill the new guy + cluster.add(); + Client c1(cluster[1]); + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q")); + BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure); + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size()); + + // Kill the old guy + cluster.add(); + Client c2(cluster[2]); + c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q")); + BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure); + + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size()); +} + +// Test that if one member fails and others do not, the failure leaves the cluster. +QPID_AUTO_TEST_CASE(testSinglePartialFailure) { + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(3, -1, updateArgs); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + Client c2(cluster[2], "c2"); + + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + // Cause partial failure on c1 + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q")); + BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure); + + c0.session.messageTransfer(content=pMessage("b", "q")); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); + + // Cause partial failure on c2 + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q")); + BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure); + + c0.session.messageTransfer(content=pMessage("c", "q")); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u); + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size()); +} + +// Test multiple partial falures: 2 fail 2 pass +QPID_AUTO_TEST_CASE(testMultiPartialFailure) { + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(4, -1, updateArgs); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + Client c2(cluster[2], "c2"); + Client c3(cluster[3], "c3"); + + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + + // Cause partial failure on c1, c2 + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q")); + BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure); + BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure); + + c0.session.messageTransfer(content=pMessage("b", "q")); + c3.session.messageTransfer(content=pMessage("c", "q")); + BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); +} + +/** FIXME aconway 2009-04-10: + * The current approach to shutting down a process in test_store + * sometimes leads to assertion failures and errors in the shut-down + * process. Need a cleaner solution + */ +#if 0 +QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) { + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(2, -1, updateArgs); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + + // Cause failure on member 0 and simultaneous crash on member 1. + BOOST_CHECK_THROW( + c0.session.messageTransfer( + content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")), + ConnectionException); + cluster.wait(1); + + Client c00(cluster[0], "c00"); // Old connection is dead. + BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size()); +} +#endif + + +QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/PollableCondition.cpp b/qpid/cpp/src/tests/PollableCondition.cpp new file mode 100644 index 0000000000..33664d43fc --- /dev/null +++ b/qpid/cpp/src/tests/PollableCondition.cpp @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "test_tools.h" +#include "unit_test.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/PollableCondition.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Thread.h" +#include <boost/bind.hpp> + + +QPID_AUTO_TEST_SUITE(PollableConditionTest) + +using namespace qpid::sys; + +const Duration SHORT = TIME_SEC/100; +const Duration LONG = TIME_SEC/10; + +class Callback { + public: + enum Action { NONE, DISARM, CLEAR, DISARM_CLEAR }; + + Callback() : count(), action(NONE) {} + + void call(PollableCondition& pc) { + Mutex::ScopedLock l(lock); + ++count; + switch(action) { + case NONE: break; + case DISARM: pc.disarm(); break; + case CLEAR: pc.clear(); break; + case DISARM_CLEAR: pc.disarm(); pc.clear(); break; + } + action = NONE; + lock.notify(); + } + + bool isCalling() { Mutex::ScopedLock l(lock); return wait(LONG); } + + bool isNotCalling() { Mutex::ScopedLock l(lock); return !wait(SHORT); } + + bool nextCall(Action a=NONE) { + Mutex::ScopedLock l(lock); + action = a; + return wait(LONG); + } + + private: + bool wait(Duration timeout) { + int n = count; + AbsTime deadline(now(), timeout); + while (n == count && lock.wait(deadline)) + ; + return n != count; + } + + Monitor lock; + int count; + Action action; +}; + +QPID_AUTO_TEST_CASE(testPollableCondition) { + boost::shared_ptr<Poller> poller(new Poller()); + Callback callback; + PollableCondition pc(boost::bind(&Callback::call, &callback, _1), poller); + + Thread runner = Thread(*poller); + + BOOST_CHECK(callback.isNotCalling()); // condition is not set or armed. + + pc.rearm(); + BOOST_CHECK(callback.isNotCalling()); // Armed but not set + + pc.set(); + BOOST_CHECK(callback.isCalling()); // Armed and set. + BOOST_CHECK(callback.isCalling()); // Still armed and set. + + callback.nextCall(Callback::DISARM); + BOOST_CHECK(callback.isNotCalling()); // set but not armed + + pc.rearm(); + BOOST_CHECK(callback.isCalling()); // Armed and set. + callback.nextCall(Callback::CLEAR); + BOOST_CHECK(callback.isNotCalling()); // armed but not set + + pc.set(); + BOOST_CHECK(callback.isCalling()); // Armed and set. + callback.nextCall(Callback::DISARM_CLEAR); + BOOST_CHECK(callback.isNotCalling()); // not armed or set. + + poller->shutdown(); + runner.join(); +} + +QPID_AUTO_TEST_SUITE_END() + + diff --git a/qpid/cpp/src/tests/SimpleTestCaseBase.cpp b/qpid/cpp/src/tests/SimpleTestCaseBase.cpp deleted file mode 100644 index 2739734731..0000000000 --- a/qpid/cpp/src/tests/SimpleTestCaseBase.cpp +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "SimpleTestCaseBase.h" - -using namespace qpid; - -void SimpleTestCaseBase::start() -{ - if (worker.get()) { - worker->start(); - } -} - -void SimpleTestCaseBase::stop() -{ - if (worker.get()) { - worker->stop(); - } -} - -void SimpleTestCaseBase::report(client::Message& report) -{ - if (worker.get()) { - report.getHeaders().setInt("MESSAGE_COUNT", worker->getCount()); - //add number of messages sent or received - std::stringstream reportstr; - reportstr << worker->getCount(); - report.setData(reportstr.str()); - } -} - -SimpleTestCaseBase::Sender::Sender(ConnectionOptions& options, - const Exchange& _exchange, - const std::string& _key, - const int _messages) - : Worker(options, _messages), exchange(_exchange), key(_key) {} - -void SimpleTestCaseBase::Sender::init() -{ - channel.start(); -} - -void SimpleTestCaseBase::Sender::start(){ - Message msg; - while (count < messages) { - channel.publish(msg, exchange, key); - count++; - } - stop(); -} - -SimpleTestCaseBase::Worker::Worker(ConnectionOptions& options, const int _messages) : - messages(_messages), count(0) -{ - connection.open(options.host, options.port); - connection.openChannel(channel); -} - -void SimpleTestCaseBase::Worker::stop() -{ - channel.close(); - connection.close(); -} - -int SimpleTestCaseBase::Worker::getCount() -{ - return count; -} - diff --git a/qpid/cpp/src/tests/SimpleTestCaseBase.h b/qpid/cpp/src/tests/SimpleTestCaseBase.h deleted file mode 100644 index 0c1052d0c2..0000000000 --- a/qpid/cpp/src/tests/SimpleTestCaseBase.h +++ /dev/null @@ -1,89 +0,0 @@ -#ifndef _SimpleTestCaseBase_ -#define _SimpleTestCaseBase_ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <memory> -#include <sstream> - -#include "qpid/Exception.h" -#include "qpid/client/Channel.h" -#include "qpid/client/Message.h" -#include "qpid/client/Connection.h" -#include "ConnectionOptions.h" -#include "qpid/client/MessageListener.h" -#include "TestCase.h" - - -namespace qpid { - -using namespace qpid::client; - -class SimpleTestCaseBase : public TestCase -{ -protected: - class Worker - { - protected: - client::Connection connection; - client::Channel channel; - const int messages; - int count; - - public: - - Worker(ConnectionOptions& options, const int messages); - virtual ~Worker(){} - - virtual void stop(); - virtual int getCount(); - virtual void init() = 0; - virtual void start() = 0; - }; - - class Sender : public Worker - { - const Exchange& exchange; - const std::string key; - public: - Sender(ConnectionOptions& options, - const Exchange& exchange, - const std::string& key, - const int messages); - void init(); - void start(); - }; - - std::auto_ptr<Worker> worker; - -public: - virtual void assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options) = 0; - - virtual ~SimpleTestCaseBase() {} - - void start(); - void stop(); - void report(client::Message& report); -}; - -} - -#endif diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index d2a93c902b..ccce3c8842 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -21,45 +21,58 @@ * */ +#include "qpid/sys/IOHandle.h" +#ifdef _WIN32 +# include "qpid/sys/windows/IoHandlePrivate.h" + typedef SOCKET FdType; +#else +# include "qpid/sys/posix/PrivatePosix.h" + typedef int FdType; +#endif #include "qpid/sys/Socket.h" -#include "qpid/sys/Poller.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Mutex.h" -#include "qpid/client/Connection.h" #include "qpid/log/Statement.h" -#include <algorithm> - /** * A simple socket proxy that forwards to another socket. * Used between client & local broker to simulate network failures. */ class SocketProxy : private qpid::sys::Runnable { + // Need a Socket we can get the fd from + class LowSocket : public qpid::sys::Socket { + public: + FdType getFd() { return toFd(impl); } + }; + public: /** Connect to connectPort on host, start a forwarding thread. * Listen for connection on getPort(). */ SocketProxy(int connectPort, const std::string host="localhost") - : closed(false), port(listener.listen()), dropClient(), dropServer() + : closed(false), joined(true), + port(listener.listen()), dropClient(), dropServer() { client.connect(host, connectPort); + joined = false; thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this)); } - ~SocketProxy() { close(); } + ~SocketProxy() { close(); if (!joined) thread.join(); } /** Simulate a network disconnect. */ void close() { { qpid::sys::Mutex::ScopedLock l(lock); - if (closed) return; + if (closed) { return; } closed=true; } - poller.shutdown(); - if (thread.id() != qpid::sys::Thread::current().id()) - thread.join(); + if (thread.id() != qpid::sys::Thread::current().id()) { + thread.join(); + joined = true; + } client.close(); } @@ -85,56 +98,72 @@ class SocketProxy : private qpid::sys::Runnable } void run() { - std::auto_ptr<qpid::sys::Socket> server; + std::auto_ptr<LowSocket> server; try { - qpid::sys::PollerHandle listenerHandle(listener); - poller.addFd(listenerHandle, qpid::sys::Poller::INPUT); - qpid::sys::Poller::Event event = poller.wait(); - throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()"); - throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed"); - - poller.delFd(listenerHandle); - server.reset(listener.accept()); - - // Pump data between client & server sockets - qpid::sys::PollerHandle clientHandle(client); - qpid::sys::PollerHandle serverHandle(*server); - poller.addFd(clientHandle, qpid::sys::Poller::INPUT); - poller.addFd(serverHandle, qpid::sys::Poller::INPUT); + fd_set socks; + FdType maxFd = listener.getFd(); + struct timeval tmo; + for (;;) { + FD_ZERO(&socks); + FD_SET(maxFd, &socks); + tmo.tv_sec = 0; + tmo.tv_usec = 500 * 1000; + if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) { + qpid::sys::Mutex::ScopedLock l(lock); + throwIf(closed, "SocketProxy: Closed by close()"); + continue; + } + throwIf(!FD_ISSET(maxFd, &socks), "SocketProxy: Accept failed"); + break; // Accept ready... go to next step + } + server.reset(reinterpret_cast<LowSocket *>(listener.accept())); + maxFd = server->getFd(); + if (client.getFd() > maxFd) + maxFd = client.getFd(); char buffer[1024]; for (;;) { - qpid::sys::Poller::Event event = poller.wait(); - throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()"); - throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "SocketProxy: client/server disconnected"); - if (event.handle == &serverHandle) { + FD_ZERO(&socks); + tmo.tv_sec = 0; + tmo.tv_usec = 500 * 1000; + FD_SET(client.getFd(), &socks); + FD_SET(server->getFd(), &socks); + if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) { + qpid::sys::Mutex::ScopedLock l(lock); + throwIf(closed, "SocketProxy: Closed by close()"); + continue; + } + // Something is set; relay data as needed until something closes + if (FD_ISSET(server->getFd(), &socks)) { ssize_t n = server->read(buffer, sizeof(buffer)); + throwIf(n <= 0, "SocketProxy: server disconnected"); if (!dropServer) client.write(buffer, n); - poller.rearmFd(serverHandle); - } else if (event.handle == &clientHandle) { + } + if (FD_ISSET(client.getFd(), &socks)) { ssize_t n = client.read(buffer, sizeof(buffer)); - if (!dropClient) server->write(buffer, n); - poller.rearmFd(clientHandle); - } else { - throwIf(true, "SocketProxy: No handle ready"); + throwIf(n <= 0, "SocketProxy: client disconnected"); + if (!dropServer) server->write(buffer, n); } + if (!FD_ISSET(client.getFd(), &socks) && + !FD_ISSET(server->getFd(), &socks)) + throwIf(true, "SocketProxy: No handle ready"); } } catch (const std::exception& e) { QPID_LOG(debug, "SocketProxy::run exception: " << e.what()); } try { - if (server.get()) server->close(); - close(); - } + if (server.get()) server->close(); + close(); + } catch (const std::exception& e) { QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what()); } } mutable qpid::sys::Mutex lock; - bool closed; - qpid::sys::Poller poller; - qpid::sys::Socket client, listener; + mutable bool closed; + bool joined; + LowSocket client, listener; uint16_t port; qpid::sys::Thread thread; bool dropClient, dropServer; diff --git a/qpid/cpp/src/tests/TestCase.h b/qpid/cpp/src/tests/TestCase.h deleted file mode 100644 index ba3330c951..0000000000 --- a/qpid/cpp/src/tests/TestCase.h +++ /dev/null @@ -1,64 +0,0 @@ -#ifndef _TestCase_ -#define _TestCase_ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ConnectionOptions.h" -#include "qpid/client/Message.h" - - -namespace qpid { - -/** - * Interface to be implemented by test cases for use with the test - * runner. - */ -class TestCase -{ -public: - /** - * Directs the test case to act in a particular role. Some roles - * may be 'activated' at this stage others may require an explicit - * start request. - */ - virtual void assign(const std::string& role, framing::FieldTable& params, client::ConnectionOptions& options) = 0; - /** - * Each test will be started on its own thread, which should block - * until the test completes (this may or may not require an - * explicit stop() request). - */ - virtual void start() = 0; - /** - * Requests that the test be stopped if still running. - */ - virtual void stop() = 0; - /** - * Allows the test to fill in details on the final report - * message. Will be called only after start has returned. - */ - virtual void report(client::Message& report) = 0; - - virtual ~TestCase() {} -}; - -} - -#endif diff --git a/qpid/cpp/src/tests/XmlClientSessionTest.cpp b/qpid/cpp/src/tests/XmlClientSessionTest.cpp index 98558f0a76..aeb13c292f 100644 --- a/qpid/cpp/src/tests/XmlClientSessionTest.cpp +++ b/qpid/cpp/src/tests/XmlClientSessionTest.cpp @@ -26,7 +26,7 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" -#include "qpid/framing/TransferContent.h" +#include "qpid/client/Message.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/client/Connection.h" #include "qpid/client/SubscriptionManager.h" diff --git a/qpid/cpp/src/tests/client_test.cpp b/qpid/cpp/src/tests/client_test.cpp index 204c2c4b71..05b42f620c 100644 --- a/qpid/cpp/src/tests/client_test.cpp +++ b/qpid/cpp/src/tests/client_test.cpp @@ -32,8 +32,8 @@ #include "qpid/client/Connection.h" #include "qpid/client/Message.h" #include "qpid/client/Session.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/framing/all_method_bodies.h" +#include "qpid/client/SubscriptionManager.h" + using namespace qpid; using namespace qpid::client; @@ -113,35 +113,18 @@ int main(int argc, char** argv) session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut, arg::acceptMode=1); if (opts.verbose) print("Published message: ", msgOut); - //subscribe to the queue, add sufficient credit and then get - //incoming 'frameset', check that its a message transfer and - //then convert it to a message (see Dispatcher and - //SubscriptionManager utilties for common reusable patterns at - //a higher level) - session.messageSubscribe(arg::queue="MyQueue", arg::destination="MyId"); - session.messageFlow(arg::destination="MyId", arg::unit=0, arg::value=1); //credit for one message - session.messageFlow(arg::destination="MyId", arg::unit=1, arg::value=0xFFFFFFFF); //credit for infinite bytes - if (opts.verbose) std::cout << "Subscribed to queue." << std::endl; - FrameSet::shared_ptr incoming = session.get(); - if (incoming->isA<MessageTransferBody>()) { - Message msgIn(*incoming); - if (msgIn.getData() == msgOut.getData()) { - if (opts.verbose) std::cout << "Received the exepected message." << std::endl; - session.messageAccept(SequenceSet(msgIn.getId())); - session.markCompleted(msgIn.getId(), true, true); - } else { - print("Received an unexepected message: ", msgIn); - } - } else { - throw Exception("Unexpected command received"); - } - + // Using the SubscriptionManager, get the message from the queue. + SubscriptionManager subs(session); + Message msgIn = subs.get("MyQueue"); + if (msgIn.getData() == msgOut.getData()) + if (opts.verbose) std::cout << "Received the exepected message." << std::endl; + //close the session & connection session.close(); if (opts.verbose) std::cout << "Closed session." << std::endl; connection.close(); if (opts.verbose) std::cout << "Closed connection." << std::endl; - return 0; + return 0; } catch(const std::exception& e) { std::cout << e.what() << std::endl; } diff --git a/qpid/cpp/src/tests/client_test.vcproj b/qpid/cpp/src/tests/client_test.vcproj index 1623c2961e..32c90bce04 100644 --- a/qpid/cpp/src/tests/client_test.vcproj +++ b/qpid/cpp/src/tests/client_test.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="client_test"
- ProjectGUID="{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="client_test"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index 5d115de5a5..e5e803003a 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -34,8 +34,10 @@ EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_ federated_cluster_test clustered_replication_test check_PROGRAMS+=cluster_test -cluster_test_SOURCES=unit_test.cpp cluster_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp -cluster_test_LDADD=$(lib_client) ../cluster.la -lboost_unit_test_framework +cluster_test_SOURCES=unit_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp \ + cluster_test.cpp PartialFailure.cpp ClusterFailover.cpp + +cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework unit_test_LDADD+=../cluster.la diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index eee2df58cc..d38d84025b 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -35,6 +35,7 @@ #include "qpid/framing/Uuid.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/enum.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Logger.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" @@ -73,7 +74,7 @@ const sys::Duration TIMEOUT=sys::TIME_SEC/4; ostream& operator<<(ostream& o, const cpg_name* n) { - return o << cluster::Cpg::str(*n); + return o << Cpg::str(*n); } ostream& operator<<(ostream& o, const cpg_address& a) { @@ -89,29 +90,12 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -template <class C> set<uint16_t> makeSet(const C& c) { - set<uint16_t> s; +template <class C> set<int> makeSet(const C& c) { + set<int> s; copy(c.begin(), c.end(), inserter(s, s.begin())); return s; } -template <class T> set<uint16_t> knownBrokerPorts(T& source, int n=-1) { - vector<Url> urls = source.getKnownBrokers(); - if (n >= 0 && unsigned(n) != urls.size()) { - BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls); - // Retry up to 10 secs in .1 second intervals. - for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) { - sys::usleep(1000*100); // 0.1 secs - urls = source.getKnownBrokers(); - } - } - BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls); - set<uint16_t> s; - for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) - s.insert((*i)[0].get<TcpAddress>()->port); - return s; -} - class Sender { public: Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {} @@ -175,7 +159,6 @@ ConnectionSettings aclSettings(int port, const std::string& id) { QPID_AUTO_TEST_CASE(testAcl) { ofstream policyFile("cluster_test.acl"); - // FIXME aconway 2009-02-12: guest -> qpidd? policyFile << "acl allow foo@QPID create queue name=foo" << endl << "acl allow foo@QPID create queue name=foo2" << endl << "acl deny foo@QPID create queue name=bar" << endl @@ -446,13 +429,13 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { ClusterFixture cluster(1); Client c0(cluster[0], "c0"); - set<uint16_t> kb0 = knownBrokerPorts(c0.connection); + set<int> kb0 = knownBrokerPorts(c0.connection); BOOST_CHECK_EQUAL(kb0.size(), 1u); BOOST_CHECK_EQUAL(kb0, makeSet(cluster)); cluster.add(); Client c1(cluster[1], "c1"); - set<uint16_t> kb1 = knownBrokerPorts(c1.connection); + set<int> kb1 = knownBrokerPorts(c1.connection); kb0 = knownBrokerPorts(c0.connection, 2); BOOST_CHECK_EQUAL(kb1.size(), 2u); BOOST_CHECK_EQUAL(kb1, makeSet(cluster)); @@ -460,7 +443,7 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { cluster.add(); Client c2(cluster[2], "c2"); - set<uint16_t> kb2 = knownBrokerPorts(c2.connection); + set<int> kb2 = knownBrokerPorts(c2.connection); kb1 = knownBrokerPorts(c1.connection, 3); kb0 = knownBrokerPorts(c0.connection, 3); BOOST_CHECK_EQUAL(kb2.size(), 3u); diff --git a/qpid/cpp/src/tests/clustered_replication_test b/qpid/cpp/src/tests/clustered_replication_test index 2a3e742632..7afda87733 100755 --- a/qpid/cpp/src/tests/clustered_replication_test +++ b/qpid/cpp/src/tests/clustered_replication_test @@ -23,6 +23,7 @@ # failures: srcdir=`dirname $0` PYTHON_DIR=$srcdir/../../../python +export PYTHONPATH=$PYTHON_DIR trap stop_brokers INT EXIT diff --git a/qpid/cpp/src/tests/consume.vcproj b/qpid/cpp/src/tests/consume.vcproj index 2e1f8e3e56..1fb72e2fb3 100644 --- a/qpid/cpp/src/tests/consume.vcproj +++ b/qpid/cpp/src/tests/consume.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="consume"
- ProjectGUID="{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="consume"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/declare_queues.cpp b/qpid/cpp/src/tests/declare_queues.cpp index 7f61bde12a..d17a72b940 100644 --- a/qpid/cpp/src/tests/declare_queues.cpp +++ b/qpid/cpp/src/tests/declare_queues.cpp @@ -33,14 +33,15 @@ using namespace std; int main(int argc, char ** argv) { ConnectionSettings settings; - if ( argc != 3 ) + if ( argc != 4 ) { - cerr << "Usage: declare_queues host port\n"; + cerr << "Usage: declare_queues host port durability\n"; return 1; } settings.host = argv[1]; settings.port = atoi(argv[2]); + int durability = atoi(argv[3]); FailoverManager connection(settings); try { @@ -48,7 +49,10 @@ int main(int argc, char ** argv) while (!complete) { Session session = connection.connect().newSession(); try { - session.queueDeclare(arg::queue="message_queue"); + if ( durability ) + session.queueDeclare(arg::queue="message_queue", arg::durable=true); + else + session.queueDeclare(arg::queue="message_queue"); complete = true; } catch (const qpid::TransportFailure&) {} } diff --git a/qpid/cpp/src/tests/echotest.vcproj b/qpid/cpp/src/tests/echotest.vcproj index 2848894228..d41a0df4e6 100644 --- a/qpid/cpp/src/tests/echotest.vcproj +++ b/qpid/cpp/src/tests/echotest.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="echotest"
- ProjectGUID="{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="echotest"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/failover_soak.cpp b/qpid/cpp/src/tests/failover_soak.cpp index 4f16e469b8..c8f67aadd8 100644 --- a/qpid/cpp/src/tests/failover_soak.cpp +++ b/qpid/cpp/src/tests/failover_soak.cpp @@ -220,63 +220,13 @@ struct children : public vector<child *> cout << "\n\n\n\n"; } - - /* - Only call this if you already know there is at least - one child still running. Supply a time in seconds. - If it has been at least that long since a shild stopped - running, we judge the system to have hung. - */ - int - hanging ( int hangTime ) - { - struct timeval now, - duration; - gettimeofday ( &now, 0 ); - - int how_many_hanging = 0; - - vector<child *>::iterator i; - for ( i = begin(); i != end(); ++ i ) - { - //Not in POSIX - //timersub ( & now, &((*i)->startTime), & duration ); - duration.tv_sec = now.tv_sec - (*i)->startTime.tv_sec; - duration.tv_usec = now.tv_usec - (*i)->startTime.tv_usec; - if (duration.tv_usec < 0) { - --duration.tv_sec; - duration.tv_usec += 1000000; - } - - if ( (COMPLETED != (*i)->status) // child isn't done running - && - ( duration.tv_sec >= hangTime ) // it's been too long - ) - { - std::cerr << "Child of type " - << (*i)->type - << " hanging. " - << "PID is " - << (*i)->pid - << endl; - ++ how_many_hanging; - } - } - - return how_many_hanging; - } - - int verbosity; }; - children allMyChildren; - - void childExit ( int ) { @@ -361,36 +311,45 @@ wait_for_newbie ( ) } } - +bool endsWith(const char* str, const char* suffix) { + return (strlen(suffix) < strlen(str) && 0 == strcmp(str+strlen(str)-strlen(suffix), suffix)); +} void startNewBroker ( brokerVector & brokers, - char const * srcRoot, - char const * moduleDir, + char const * moduleOrDir, string const clusterName, - int verbosity ) + int verbosity, + int durable ) { static int brokerId = 0; - stringstream path, prefix, module; - module << moduleDir << "/cluster.so"; - path << srcRoot << "/qpidd"; + stringstream path, prefix; prefix << "soak-" << brokerId; std::vector<std::string> argv = list_of<string> ("qpidd") - ("--no-module-dir") - ("--load-module=cluster.so") - ("--cluster-name") - (clusterName) + ("--cluster-name")(clusterName) ("--auth=no") - ("--no-data-dir") ("--mgmt-enable=no") - ("--log-prefix") - (prefix.str()) - ("--log-to-file") - (prefix.str()+".log"); + ("--log-prefix")(prefix.str()) + ("--log-to-file")(prefix.str()+".log") + ("--log-enable=error+") + ("TMP_DATA_DIR"); + + if (endsWith(moduleOrDir, "cluster.so")) { + // Module path specified, load only that module. + argv.push_back(string("--load-module=")+moduleOrDir); + argv.push_back("--no-module-dir"); + if ( durable ) { + std::cerr << "failover_soak warning: durable arg hass no effect. Use \"dir\" option of \"moduleOrDir\".\n"; + } + } + else { + // Module directory specified, load all modules in dir. + argv.push_back(string("--module-dir=")+moduleOrDir); + } - newbie = new ForkedBroker ( argv ); + newbie = new ForkedBroker (argv); newbie_port = newbie->getPort(); ForkedBroker * broker = newbie; @@ -473,7 +432,8 @@ pid_t runDeclareQueuesClient ( brokerVector brokers, char const * host, char const * path, - int verbosity + int verbosity, + int durable ) { string name("declareQueues"); @@ -492,6 +452,10 @@ runDeclareQueuesClient ( brokerVector brokers, argv.push_back ( "declareQueues" ); argv.push_back ( host ); argv.push_back ( portSs.str().c_str() ); + if ( durable ) + argv.push_back ( "1" ); + else + argv.push_back ( "0" ); argv.push_back ( 0 ); pid_t pid = fork(); @@ -562,7 +526,8 @@ startSendingClient ( brokerVector brokers, char const * senderPath, char const * nMessages, char const * reportFrequency, - int verbosity + int verbosity, + int durability ) { string name("sender"); @@ -586,6 +551,10 @@ startSendingClient ( brokerVector brokers, argv.push_back ( nMessages ); argv.push_back ( reportFrequency ); argv.push_back ( verbosityStr ); + if ( durability ) + argv.push_back ( "1" ); + else + argv.push_back ( "0" ); argv.push_back ( 0 ); pid_t pid = fork(); @@ -613,27 +582,33 @@ startSendingClient ( brokerVector brokers, #define ERROR_KILLING_BROKER 8 +// If you want durability, use the "dir" option of "moduleOrDir" . + + int main ( int argc, char const ** argv ) { - if ( argc < 9 ) { - cerr << "Usage: failoverSoak srcRoot moduleDir host senderPath receiverPath nMessages verbosity\n"; - cerr << " ( argc was " << argc << " )\n"; + if ( argc != 9 ) { + cerr << "Usage: " + << argv[0] + << "moduleOrDir declareQueuesPath senderPath receiverPath nMessages reportFrequency verbosity durable" + << endl; + cerr << "\tverbosity is an integer, durable is 0 or 1\n"; return BAD_ARGS; } - signal ( SIGCHLD, childExit ); - char const * srcRoot = argv[1]; - char const * moduleDir = argv[2]; - char const * host = argv[3]; - char const * declareQueuesPath = argv[4]; - char const * senderPath = argv[5]; - char const * receiverPath = argv[6]; - char const * nMessages = argv[7]; - char const * reportFrequency = argv[8]; - int verbosity = atoi(argv[9]); - + int i = 1; + char const * moduleOrDir = argv[i++]; + char const * declareQueuesPath = argv[i++]; + char const * senderPath = argv[i++]; + char const * receiverPath = argv[i++]; + char const * nMessages = argv[i++]; + char const * reportFrequency = argv[i++]; + int verbosity = atoi(argv[i++]); + int durable = atoi(argv[i++]); + + char const * host = "127.0.0.1"; int maxBrokers = 50; allMyChildren.verbosity = verbosity; @@ -652,10 +627,10 @@ main ( int argc, char const ** argv ) int nBrokers = 3; for ( int i = 0; i < nBrokers; ++ i ) { startNewBroker ( brokers, - srcRoot, - moduleDir, + moduleOrDir, clusterName, - verbosity ); + verbosity, + durable ); } @@ -665,7 +640,7 @@ main ( int argc, char const ** argv ) // Run the declareQueues child. int childStatus; pid_t dqClientPid = - runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity ); + runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity, durable ); if ( -1 == dqClientPid ) { cerr << "END_OF_TEST ERROR_START_DECLARE_1\n"; return CANT_FORK_DQ; @@ -701,7 +676,8 @@ main ( int argc, char const ** argv ) senderPath, nMessages, reportFrequency, - verbosity ); + verbosity, + durable ); if ( -1 == sendingClientPid ) { cerr << "END_OF_TEST ERROR_START_SENDER\n"; return CANT_FORK_SENDER; @@ -745,10 +721,10 @@ main ( int argc, char const ** argv ) cout << "Starting new broker.\n\n"; startNewBroker ( brokers, - srcRoot, - moduleDir, + moduleOrDir, clusterName, - verbosity ); + verbosity, + durable ); if ( verbosity > 1 ) printBrokers ( brokers ); @@ -787,16 +763,6 @@ main ( int argc, char const ** argv ) return ERROR_ON_CHILD; } - // If one is hanging, quit. - if ( allMyChildren.hanging ( 120 ) ) - { - /* - * Don't kill any processes. Leave alive for questioning. - * */ - std::cerr << "END_OF_TEST ERROR_HANGING\n"; - return HANGING; - } - if ( verbosity > 1 ) { std::cerr << "------- next kill-broker loop --------\n"; allMyChildren.print(); diff --git a/qpid/cpp/src/tests/header_test.vcproj b/qpid/cpp/src/tests/header_test.vcproj index da761a6edb..cdfc984f46 100644 --- a/qpid/cpp/src/tests/header_test.vcproj +++ b/qpid/cpp/src/tests/header_test.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="header_test"
- ProjectGUID="{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="header_test"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/interop_runner.cpp b/qpid/cpp/src/tests/interop_runner.cpp deleted file mode 100644 index 8c6e0a6991..0000000000 --- a/qpid/cpp/src/tests/interop_runner.cpp +++ /dev/null @@ -1,251 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/Options.h" -#include "qpid/ptr_map.h" -#include "qpid/Exception.h" -#include "qpid/client/Channel.h" -#include "qpid/client/Connection.h" -#include "qpid/client/ConnectionOptions.h" -#include "qpid/client/Exchange.h" -#include "qpid/client/MessageListener.h" -#include "qpid/client/Queue.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Time.h" -#include <iostream> -#include <memory> -#include "BasicP2PTest.h" -#include "BasicPubSubTest.h" -#include "TestCase.h" -#include <boost/ptr_container/ptr_map.hpp> - -/** - * Framework for interop tests. - * - * [see http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification for details]. - */ - -using namespace qpid::client; -using namespace qpid::sys; -using qpid::TestCase; -using qpid::framing::FieldTable; -using qpid::framing::ReplyTo; -using namespace std; - -class DummyRun : public TestCase -{ -public: - DummyRun() {} - void assign(const string&, FieldTable&, ConnectionOptions&) {} - void start() {} - void stop() {} - void report(qpid::client::Message&) {} -}; - -string parse_next_word(const string& input, const string& delims, string::size_type& position); - -/** - */ -class Listener : public MessageListener, private Runnable{ - typedef boost::ptr_map<string, TestCase> TestMap; - - Channel& channel; - ConnectionOptions& options; - TestMap tests; - const string name; - const string topic; - TestCase* test; - auto_ptr<Thread> runner; - ReplyTo reportTo; - string reportCorrelator; - - void shutdown(); - bool invite(const string& name); - void run(); - - void sendResponse(Message& response, ReplyTo replyTo); - void sendResponse(Message& response, Message& request); - void sendSimpleResponse(const string& type, Message& request); - void sendReport(); -public: - Listener(Channel& channel, ConnectionOptions& options); - void received(Message& msg); - void bindAndConsume(); - void registerTest(string name, TestCase* test); -}; - -struct TestSettings : ConnectionOptions -{ - bool help; - - TestSettings() : help(false) - { - addOptions() - ("help", qpid::optValue(help), "print this usage statement"); - } -}; - -int main(int argc, char** argv) { - try { - TestSettings options; - options.parse(argc, argv); - if (options.help) { - cout << options; - } else { - Connection connection; - connection.open(options.host, options.port, "guest", "guest", options.virtualhost); - - Channel channel; - connection.openChannel(channel); - - Listener listener(channel, options); - listener.registerTest("TC1_DummyRun", new DummyRun()); - listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest()); - listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest()); - - listener.bindAndConsume(); - - channel.run(); - connection.close(); - } - } catch(const exception& error) { - cout << error.what() << endl << "Type " << argv[0] << " --help for help" << endl; - } -} - -Listener::Listener(Channel& _channel, ConnectionOptions& _options) : channel(_channel), options(_options), name(options.clientid), topic("iop.control." + name) -{} - -void Listener::registerTest(string name, TestCase* test) -{ - tests.insert(name, test); -} - -void Listener::bindAndConsume() -{ - Queue control(name, true); - channel.declareQueue(control); - qpid::framing::FieldTable bindArgs; - //replace these separate binds with a wildcard once that is supported on java broker - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "iop.control", bindArgs); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, topic, bindArgs); - - string tag; - channel.consume(control, tag, this); -} - -void Listener::sendSimpleResponse(const string& type, Message& request) -{ - Message response; - response.getHeaders().setString("CONTROL_TYPE", type); - response.getHeaders().setString("CLIENT_NAME", name); - response.getHeaders().setString("CLIENT_PRIVATE_CONTROL_KEY", topic); - response.getMessageProperties().setCorrelationId(request.getMessageProperties().getCorrelationId()); - sendResponse(response, request); -} - -void Listener::sendResponse(Message& response, Message& request) -{ - sendResponse(response, request.getMessageProperties().getReplyTo()); -} - -void Listener::sendResponse(Message& response, ReplyTo replyTo) -{ - string exchange = replyTo.getExchange(); - string routingKey = replyTo.getRoutingKey(); - channel.publish(response, exchange, routingKey); -} - -void Listener::received(Message& message) -{ - string type(message.getHeaders().getString("CONTROL_TYPE")); - - if (type == "INVITE") { - string name(message.getHeaders().getString("TEST_NAME")); - if (name.empty() || invite(name)) { - sendSimpleResponse("ENLIST", message); - } else { - cout << "Can't take part in '" << name << "'" << endl; - } - } else if (type == "ASSIGN_ROLE") { - test->assign(message.getHeaders().getString("ROLE"), message.getHeaders(), options); - sendSimpleResponse("ACCEPT_ROLE", message); - } else if (type == "START") { - reportTo = message.getMessageProperties().getReplyTo(); - reportCorrelator = message.getMessageProperties().getCorrelationId(); - runner = auto_ptr<Thread>(new Thread(this)); - } else if (type == "STATUS_REQUEST") { - reportTo = message.getMessageProperties().getReplyTo(); - reportCorrelator = message.getMessageProperties().getCorrelationId(); - test->stop(); - sendReport(); - } else if (type == "TERMINATE") { - if (test) test->stop(); - shutdown(); - } else { - cerr <<"ERROR!: Received unknown control message: " << type << endl; - shutdown(); - } -} - -void Listener::shutdown() -{ - channel.close(); -} - -bool Listener::invite(const string& name) -{ - TestMap::iterator i = tests.find(name); - test = (i != tests.end()) ? qpid::ptr_map_ptr(i) : 0; - return test; -} - -void Listener::run() -{ - //NB: this method will be called in its own thread - //start test and when start returns... - test->start(); - sendReport(); -} - -void Listener::sendReport() -{ - Message report; - report.getHeaders().setString("CONTROL_TYPE", "REPORT"); - test->report(report); - report.getMessageProperties().setCorrelationId(reportCorrelator); - sendResponse(report, reportTo); -} - -string parse_next_word(const string& input, const string& delims, string::size_type& position) -{ - string::size_type start = input.find_first_not_of(delims, position); - if (start == string::npos) { - return ""; - } else { - string::size_type end = input.find_first_of(delims, start); - if (end == string::npos) { - end = input.length(); - } - position = end; - return input.substr(start, end - start); - } -} diff --git a/qpid/cpp/src/tests/latencytest.vcproj b/qpid/cpp/src/tests/latencytest.vcproj index 758302029a..da48e682bf 100644 --- a/qpid/cpp/src/tests/latencytest.vcproj +++ b/qpid/cpp/src/tests/latencytest.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="latencytest"
- ProjectGUID="{4A809018-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{4A809018-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="latencytest"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/perftest.vcproj b/qpid/cpp/src/tests/perftest.vcproj index 3a2397ac9b..83cd550df5 100644 --- a/qpid/cpp/src/tests/perftest.vcproj +++ b/qpid/cpp/src/tests/perftest.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="perftest"
- ProjectGUID="{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="perftest"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/publish.vcproj b/qpid/cpp/src/tests/publish.vcproj index 7422606a32..849b845025 100644 --- a/qpid/cpp/src/tests/publish.vcproj +++ b/qpid/cpp/src/tests/publish.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="publish"
- ProjectGUID="{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="publish"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/receiver.vcproj b/qpid/cpp/src/tests/receiver.vcproj index ce6fad366e..8744ac3234 100644 --- a/qpid/cpp/src/tests/receiver.vcproj +++ b/qpid/cpp/src/tests/receiver.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="receiver"
- ProjectGUID="{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{7D314A98-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="receiver"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/replaying_sender.cpp b/qpid/cpp/src/tests/replaying_sender.cpp index ea2a13bd54..3ee69eec14 100644 --- a/qpid/cpp/src/tests/replaying_sender.cpp +++ b/qpid/cpp/src/tests/replaying_sender.cpp @@ -40,9 +40,10 @@ class Sender : public FailoverManager::Command public: Sender(const std::string& queue, uint count, uint reportFreq); void execute(AsyncSession& session, bool isRetry); - uint getSent(); + uint getSent(); - int verbosity; + void setVerbosity ( int v ) { verbosity = v; } + void setPersistence ( int p ) { persistence = p; } private: MessageReplayTracker sender; @@ -51,9 +52,11 @@ class Sender : public FailoverManager::Command const uint reportFrequency; Message message; + int verbosity; + int persistence; }; -Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : sender(10), count(count_), sent(0), reportFrequency(reportFreq) +Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : sender(10), count(count_), sent(0), reportFrequency(reportFreq), verbosity(0), persistence(0) { message.getDeliveryProperties().setRoutingKey(queue); } @@ -69,6 +72,9 @@ void Sender::execute(AsyncSession& session, bool isRetry) message_data << ++sent; message.setData(message_data.str()); message.getHeaders().setInt("sn", sent); + if ( persistence ) + message.getDeliveryProperties().setDeliveryMode(PERSISTENT); + sender.send(message); if (count > reportFrequency && !(sent % reportFrequency)) { if ( verbosity > 0 ) @@ -91,9 +97,9 @@ int main(int argc, char ** argv) { ConnectionSettings settings; - if ( argc != 6 ) + if ( argc != 7 ) { - std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity\n"; + std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity persistence\n"; return 1; } @@ -102,10 +108,12 @@ int main(int argc, char ** argv) int n_messages = atoi(argv[3]); int reportFrequency = atoi(argv[4]); int verbosity = atoi(argv[5]); + int persistence = atoi(argv[6]); FailoverManager connection(settings); Sender sender("message_queue", n_messages, reportFrequency ); - sender.verbosity = verbosity; + sender.setVerbosity ( verbosity ); + sender.setPersistence ( persistence ); try { connection.execute ( sender ); if ( verbosity > 0 ) diff --git a/qpid/cpp/src/tests/run_failover_soak b/qpid/cpp/src/tests/run_failover_soak index 36dfed79a6..3c9a5589c4 100755 --- a/qpid/cpp/src/tests/run_failover_soak +++ b/qpid/cpp/src/tests/run_failover_soak @@ -45,12 +45,12 @@ fi host=127.0.0.1 -src_root=.. -module_dir=$src_root/.libs - +MODULES=${MODULES:-../.libs} MESSAGES=${MESSAGES:-300000} REPORT_FREQUENCY=${REPORT_FREQUENCY:-`expr $MESSAGES / 20`} VERBOSITY=${VERBOSITY:-1} +DURABILITY=${DURABILITY:-1} -exec ./failover_soak $src_root $module_dir $host ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY +rm -f soak-*.log +exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY diff --git a/qpid/cpp/src/tests/sender.vcproj b/qpid/cpp/src/tests/sender.vcproj index 616b665406..fe564cb3c6 100644 --- a/qpid/cpp/src/tests/sender.vcproj +++ b/qpid/cpp/src/tests/sender.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="sender"
- ProjectGUID="{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{09714CB8-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="sender"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/shlibtest.vcproj b/qpid/cpp/src/tests/shlibtest.vcproj index 4a139a2cb5..2052230740 100644 --- a/qpid/cpp/src/tests/shlibtest.vcproj +++ b/qpid/cpp/src/tests/shlibtest.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="shlibtest"
- ProjectGUID="{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="shlibtest"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/start_cluster b/qpid/cpp/src/tests/start_cluster index 053b23da33..585ba082d5 100755 --- a/qpid/cpp/src/tests/start_cluster +++ b/qpid/cpp/src/tests/start_cluster @@ -28,15 +28,17 @@ with_ais_group() { echo $* | newgrp ais } -rm -f cluster*.log -SIZE=${1:-1}; shift +rm -f cluster*.log cluster.ports qpidd.port + +SIZE=${1:-3}; shift CLUSTER=`pwd` # Cluster name=pwd, avoid clashes. -OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $@" +OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --auth=no $@" for (( i=0; i<SIZE; ++i )); do - PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS` || exit 1 + DDIR=`mktemp -d /tmp/start_cluster.XXXXXXXXXX` + PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS --data-dir=$DDIR` || exit 1 echo $PORT >> cluster.ports done -head cluster.ports > qpidd.port # First member's port for tests. +head -n 1 cluster.ports > qpidd.port # First member's port for tests. diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp new file mode 100644 index 0000000000..e5c3522852 --- /dev/null +++ b/qpid/cpp/src/tests/test_store.cpp @@ -0,0 +1,147 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/**@file + * Plug-in message store for tests. + * + * Add functionality as required, build up a comprehensive set of + * features to support persistent behavior tests. + * + * Current features special "action" messages can: + * - raise exception from enqueue. + * - force host process to exit. + * - do async completion after a delay. + */ + +#include "qpid/broker/NullMessageStore.h" +#include "qpid/broker/Broker.h" +#include "qpid/log/Statement.h" +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include <boost/algorithm/string.hpp> +#include <boost/cast.hpp> +#include <boost/lexical_cast.hpp> + +using namespace qpid; +using namespace broker; +using namespace std; +using namespace boost; +using namespace qpid::sys; + +struct TestStoreOptions : public Options { + + string name; + + TestStoreOptions() : Options("Test Store Options") { + addOptions() + ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance."); + } +}; + +struct Completer : public Runnable { + intrusive_ptr<PersistableMessage> message; + int usecs; + Completer(intrusive_ptr<PersistableMessage> m, int u) : message(m), usecs(u) {} + void run() { + qpid::sys::usleep(usecs); + message->enqueueComplete(); + delete this; + } +}; + +class TestStore : public NullMessageStore { + public: + TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {} + + ~TestStore() { + for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1)); + } + + void enqueue(TransactionContext* , + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& ) + { + string data = polymorphic_downcast<Message*>(msg.get())->getFrames().getContent(); + + // Check the message for special instructions. + size_t i = string::npos; + size_t j = string::npos; + if (starts_with(data, TEST_STORE_DO) + && (i = data.find(name+"[")) != string::npos + && (j = data.find("]", i)) != string::npos) + { + size_t start = i+name.size()+1; + string action = data.substr(start, j-start); + + if (action == EXCEPTION) { + throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data)); + } + else if (action == EXIT_PROCESS) { + // FIXME aconway 2009-04-10: this is a dubious way to + // close the process at best, it can cause assertions or seg faults + // rather than clean exit. + QPID_LOG(critical, "TestStore " << name << " forcing process exit for: " << data); + exit(0); + } + else if (starts_with(action, ASYNC)) { + std::string delayStr(action.substr(ASYNC.size())); + int delay = lexical_cast<int>(delayStr); + threads.push_back(Thread(*new Completer(msg, delay))); + } + else { + QPID_LOG(error, "TestStore " << name << " unknown action " << action); + msg->enqueueComplete(); + } + } + else + msg->enqueueComplete(); + } + + private: + static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; + string name; + Broker& broker; + vector<Thread> threads; +}; + +const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; +const string TestStore::EXCEPTION = "exception"; +const string TestStore::EXIT_PROCESS = "exit_process"; +const string TestStore::ASYNC="async "; + +struct TestStorePlugin : public Plugin { + + TestStoreOptions options; + + Options* getOptions() { return &options; } + + void earlyInitialize (Plugin::Target& target) + { + Broker* broker = dynamic_cast<Broker*>(&target); + if (!broker) return; + broker->setStore (new TestStore(options.name, *broker)); + } + + void initialize(qpid::Plugin::Target&) {} +}; + +static TestStorePlugin pluginInstance; diff --git a/qpid/cpp/src/tests/tests.sln b/qpid/cpp/src/tests/tests.sln index 273e90d1c8..9e24487820 100644 --- a/qpid/cpp/src/tests/tests.sln +++ b/qpid/cpp/src/tests/tests.sln @@ -8,37 +8,37 @@ Microsoft Visual Studio Solution File, Format Version 10.00 #
# MPC Command:
# C:\ace\MPC\mwc.pl -type vc9 -features boost=1 tests.mwc
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "client_test", "client_test.vcproj", "{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "client_test", "client_test.vcproj", "{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "consume", "consume.vcproj", "{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "consume", "consume.vcproj", "{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "echotest", "echotest.vcproj", "{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "echotest", "echotest.vcproj", "{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "header_test", "header_test.vcproj", "{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "header_test", "header_test.vcproj", "{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "latencytest", "latencytest.vcproj", "{4A809018-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "latencytest", "latencytest.vcproj", "{4A809018-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "perftest", "perftest.vcproj", "{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "perftest", "perftest.vcproj", "{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "publish", "publish.vcproj", "{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "publish", "publish.vcproj", "{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "receiver", "receiver.vcproj", "{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "receiver", "receiver.vcproj", "{7D314A98-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "sender", "sender.vcproj", "{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "sender", "sender.vcproj", "{09714CB8-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "shlibtest", "shlibtest.vcproj", "{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "shlibtest", "shlibtest.vcproj", "{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "topic_listener", "topic_listener.vcproj", "{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "topic_listener", "topic_listener.vcproj", "{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "topic_publisher", "topic_publisher.vcproj", "{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "topic_publisher", "topic_publisher.vcproj", "{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txjob", "txjob.vcproj", "{09034A53-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txjob", "txjob.vcproj", "{09034A53-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txshift", "txshift.vcproj", "{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txshift", "txshift.vcproj", "{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txtest", "txtest.vcproj", "{697786BE-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txtest", "txtest.vcproj", "{697786BE-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "unit_test", "unit_test.vcproj", "{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "unit_test", "unit_test.vcproj", "{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -48,134 +48,134 @@ Global Release|x64 = Release|x64
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/qpid/cpp/src/tests/topic_listener.vcproj b/qpid/cpp/src/tests/topic_listener.vcproj index 0be31b8348..3f338bfb62 100644 --- a/qpid/cpp/src/tests/topic_listener.vcproj +++ b/qpid/cpp/src/tests/topic_listener.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="topic_listener"
- ProjectGUID="{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="topic_listener"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/topic_publisher.vcproj b/qpid/cpp/src/tests/topic_publisher.vcproj index 016c6d4a38..5b0fd21ef4 100644 --- a/qpid/cpp/src/tests/topic_publisher.vcproj +++ b/qpid/cpp/src/tests/topic_publisher.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="topic_publisher"
- ProjectGUID="{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="topic_publisher"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/txjob.vcproj b/qpid/cpp/src/tests/txjob.vcproj index 19fe3fba12..e002d58ae9 100644 --- a/qpid/cpp/src/tests/txjob.vcproj +++ b/qpid/cpp/src/tests/txjob.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="txjob"
- ProjectGUID="{09034A53-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{09034A53-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="txjob"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/txshift.vcproj b/qpid/cpp/src/tests/txshift.vcproj index 3212881351..75ccfc6d4f 100644 --- a/qpid/cpp/src/tests/txshift.vcproj +++ b/qpid/cpp/src/tests/txshift.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="txshift"
- ProjectGUID="{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="txshift"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/txtest.vcproj b/qpid/cpp/src/tests/txtest.vcproj index 663291a9d5..3549ba97aa 100644 --- a/qpid/cpp/src/tests/txtest.vcproj +++ b/qpid/cpp/src/tests/txtest.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="txtest"
- ProjectGUID="{697786BE-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{697786BE-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="txtest"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/unit_test.vcproj b/qpid/cpp/src/tests/unit_test.vcproj index 8710b617f8..9653290013 100644 --- a/qpid/cpp/src/tests/unit_test.vcproj +++ b/qpid/cpp/src/tests/unit_test.vcproj @@ -3,7 +3,7 @@ ProjectType="Visual C++"
Version="9.00"
Name="unit_test"
- ProjectGUID="{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="unit_test"
Keyword="Win32Proj"
SignManifests="true"
@@ -76,7 +76,7 @@ />
<Tool
Name="VCLinkerTool"
- AdditionalDependencies="qpidcommond.lib qpidclientd.lib qpidbrokerd.lib qmfconsoled.lib"
+ AdditionalDependencies="qpidcommond.lib qpidclientd.lib qpidbrokerd.lib qmfconsoled.lib ws2_32.lib"
OutputFile="$(OutDir)\unit_test.exe"
LinkIncremental="2"
SuppressStartupBanner="true"
@@ -155,7 +155,7 @@ />
<Tool
Name="VCLinkerTool"
- AdditionalDependencies="qpidcommon.lib qpidclient.lib qpidbroker.lib qmfconsole.lib"
+ AdditionalDependencies="qpidcommon.lib qpidclient.lib qpidbroker.lib qmfconsole.lib ws2_32.lib"
OutputFile="$(OutDir)\unit_test.exe"
LinkIncremental="1"
SuppressStartupBanner="true"
@@ -240,7 +240,7 @@ <Tool
Name="VCLinkerTool"
AdditionalOptions="/machine:AMD64"
- AdditionalDependencies="qpidcommond.lib qpidclientd.lib qpidbrokerd.lib qmfconsoled.lib"
+ AdditionalDependencies="qpidcommond.lib qpidclientd.lib qpidbrokerd.lib qmfconsoled.lib ws2_32.lib"
OutputFile="$(OutDir)\unit_test.exe"
LinkIncremental="2"
SuppressStartupBanner="true"
@@ -320,7 +320,7 @@ <Tool
Name="VCLinkerTool"
AdditionalOptions="/machine:AMD64"
- AdditionalDependencies="qpidcommon.lib qpidclient.lib qpidbroker.lib qmfconsole.lib"
+ AdditionalDependencies="qpidcommon.lib qpidclient.lib qpidbroker.lib qmfconsole.lib ws2_32.lib"
OutputFile="$(OutDir)\unit_test.exe"
LinkIncremental="1"
SuppressStartupBanner="true"
|