summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2009-04-23 21:06:35 +0000
committerStephen D. Huston <shuston@apache.org>2009-04-23 21:06:35 +0000
commita7259adba14345898e78b483b7620340ffa5cfc5 (patch)
treee8d26c0981a666442ad4aa2fff5ddb87c5ce5866
parent8d32b03448e8e1ba6319fc0ac484d0ab54b29b38 (diff)
downloadqpid-python-cmake.tar.gz
Merge in trunk changes from r758432:768028cmake
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/cmake@768053 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/INSTALL3
-rw-r--r--qpid/cpp/rubygen/framing.0-10/Session.rb14
-rw-r--r--qpid/cpp/src/Makefile.am14
-rw-r--r--qpid/cpp/src/cluster.mk2
-rw-r--r--qpid/cpp/src/common.vcproj12
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h17
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/ExpiryPolicy.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h3
-rw-r--r--qpid/cpp/src/qpid/client/Completion.cpp (renamed from qpid/cpp/src/tests/BasicP2PTest.h)32
-rw-r--r--qpid/cpp/src/qpid/client/Completion.h35
-rw-r--r--qpid/cpp/src/qpid/client/CompletionImpl.h (renamed from qpid/cpp/src/tests/BasicPubSubTest.h)43
-rw-r--r--qpid/cpp/src/qpid/client/Connection.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/Dispatcher.cpp6
-rw-r--r--qpid/cpp/src/qpid/client/Dispatcher.h2
-rw-r--r--qpid/cpp/src/qpid/client/FailoverListener.cpp3
-rw-r--r--qpid/cpp/src/qpid/client/FailoverListener.h1
-rw-r--r--qpid/cpp/src/qpid/client/Future.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/Future.h14
-rw-r--r--qpid/cpp/src/qpid/client/Handle.h11
-rw-r--r--qpid/cpp/src/qpid/client/HandlePrivate.h12
-rw-r--r--qpid/cpp/src/qpid/client/LocalQueue.cpp5
-rw-r--r--qpid/cpp/src/qpid/client/Message.cpp59
-rw-r--r--qpid/cpp/src/qpid/client/Message.h55
-rw-r--r--qpid/cpp/src/qpid/client/MessageImpl.cpp71
-rw-r--r--qpid/cpp/src/qpid/client/MessageImpl.h76
-rw-r--r--qpid/cpp/src/qpid/client/PrivateImpl.h54
-rw-r--r--qpid/cpp/src/qpid/client/PrivateImplPrivate.h66
-rw-r--r--qpid/cpp/src/qpid/client/SessionBase_0_10.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/SessionBase_0_10.h10
-rw-r--r--qpid/cpp/src/qpid/client/Subscription.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/SubscriptionImpl.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/TypedResult.h10
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp79
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h17
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.cpp43
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.h20
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp53
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h19
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/ErrorCheck.cpp120
-rw-r--r--qpid/cpp/src/qpid/cluster/ErrorCheck.h80
-rw-r--r--qpid/cpp/src/qpid/cluster/Event.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/EventFrame.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/EventFrame.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp7
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/LockedConnectionMap.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/Multicaster.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/Multicaster.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp17
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h1
-rw-r--r--qpid/cpp/src/qpid/framing/AMQFrame.h3
-rw-r--r--qpid/cpp/src/qpid/sys/LatencyMetric.cpp74
-rw-r--r--qpid/cpp/src/qpid/sys/LatencyMetric.h85
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h3
-rw-r--r--qpid/cpp/src/tests/AsyncCompletion.cpp13
-rw-r--r--qpid/cpp/src/tests/BasicP2PTest.cpp66
-rw-r--r--qpid/cpp/src/tests/BasicPubSubTest.cpp121
-rw-r--r--qpid/cpp/src/tests/BrokerFixture.h10
-rw-r--r--qpid/cpp/src/tests/ClientMessageTest.cpp46
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp8
-rw-r--r--qpid/cpp/src/tests/ClusterFailover.cpp59
-rw-r--r--qpid/cpp/src/tests/ClusterFixture.cpp44
-rw-r--r--qpid/cpp/src/tests/ClusterFixture.h29
-rw-r--r--qpid/cpp/src/tests/ForkedBroker.cpp49
-rw-r--r--qpid/cpp/src/tests/ForkedBroker.h9
-rw-r--r--qpid/cpp/src/tests/Makefile.am30
-rw-r--r--qpid/cpp/src/tests/PartialFailure.cpp219
-rw-r--r--qpid/cpp/src/tests/PollableCondition.cpp117
-rw-r--r--qpid/cpp/src/tests/SimpleTestCaseBase.cpp87
-rw-r--r--qpid/cpp/src/tests/SimpleTestCaseBase.h89
-rw-r--r--qpid/cpp/src/tests/SocketProxy.h111
-rw-r--r--qpid/cpp/src/tests/TestCase.h64
-rw-r--r--qpid/cpp/src/tests/XmlClientSessionTest.cpp2
-rw-r--r--qpid/cpp/src/tests/client_test.cpp35
-rw-r--r--qpid/cpp/src/tests/client_test.vcproj2
-rw-r--r--qpid/cpp/src/tests/cluster.mk6
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp31
-rwxr-xr-xqpid/cpp/src/tests/clustered_replication_test1
-rw-r--r--qpid/cpp/src/tests/consume.vcproj2
-rw-r--r--qpid/cpp/src/tests/declare_queues.cpp10
-rw-r--r--qpid/cpp/src/tests/echotest.vcproj2
-rw-r--r--qpid/cpp/src/tests/failover_soak.cpp168
-rw-r--r--qpid/cpp/src/tests/header_test.vcproj2
-rw-r--r--qpid/cpp/src/tests/interop_runner.cpp251
-rw-r--r--qpid/cpp/src/tests/latencytest.vcproj2
-rw-r--r--qpid/cpp/src/tests/perftest.vcproj2
-rw-r--r--qpid/cpp/src/tests/publish.vcproj2
-rw-r--r--qpid/cpp/src/tests/receiver.vcproj2
-rw-r--r--qpid/cpp/src/tests/replaying_sender.cpp20
-rwxr-xr-xqpid/cpp/src/tests/run_failover_soak8
-rw-r--r--qpid/cpp/src/tests/sender.vcproj2
-rw-r--r--qpid/cpp/src/tests/shlibtest.vcproj2
-rwxr-xr-xqpid/cpp/src/tests/start_cluster12
-rw-r--r--qpid/cpp/src/tests/test_store.cpp147
-rw-r--r--qpid/cpp/src/tests/tests.sln288
-rw-r--r--qpid/cpp/src/tests/topic_listener.vcproj2
-rw-r--r--qpid/cpp/src/tests/topic_publisher.vcproj2
-rw-r--r--qpid/cpp/src/tests/txjob.vcproj2
-rw-r--r--qpid/cpp/src/tests/txshift.vcproj2
-rw-r--r--qpid/cpp/src/tests/txtest.vcproj2
-rw-r--r--qpid/cpp/src/tests/unit_test.vcproj10
-rw-r--r--qpid/cpp/xml/cluster.xml15
115 files changed, 1998 insertions, 1569 deletions
diff --git a/qpid/cpp/INSTALL b/qpid/cpp/INSTALL
index 31349590c1..e7103d2707 100644
--- a/qpid/cpp/INSTALL
+++ b/qpid/cpp/INSTALL
@@ -104,6 +104,9 @@ On Fedora 10 or later
For SASL and SSL, include
# yum install cyrus-sasl-devel
+For the XML Exchange, include:
+
+ # yum install xqilla-devel xerces-c-devel
Follow the manual installation instruction below for any packages not
available through your distributions packaging tool.
diff --git a/qpid/cpp/rubygen/framing.0-10/Session.rb b/qpid/cpp/rubygen/framing.0-10/Session.rb
index 709491e42e..48a1608af0 100644
--- a/qpid/cpp/rubygen/framing.0-10/Session.rb
+++ b/qpid/cpp/rubygen/framing.0-10/Session.rb
@@ -77,9 +77,9 @@ end
class ContentField # For extra content parameters
def cppname() "content" end
- def signature() "const MethodContent& content" end
- def sig_default() signature+"="+"DefaultContent(std::string())" end
- def unpack() "p[arg::content|DefaultContent(std::string())]"; end
+ def signature() "const Message& content" end
+ def sig_default() signature+"="+"Message(std::string())" end
+ def unpack() "p[arg::content|Message(std::string())]"; end
def doc() "Message content"; end
end
@@ -160,6 +160,10 @@ class SessionNoKeywordGen < CppGen
cpp_file(@file) {
include @classname
include "qpid/framing/all_method_bodies.h"
+ include "qpid/client/SessionImpl.h"
+ include "qpid/client/MessageImpl.h"
+ include "qpid/client/PrivateImplPrivate.h"
+ include "qpid/client/CompletionImpl.h"
namespace(@namespace) {
genl "using namespace framing;"
session_methods(sync_default).each { |m|
@@ -171,8 +175,8 @@ class SessionNoKeywordGen < CppGen
genl "#{m.body_name} body(#{args});";
genl "body.setSync(sync);"
sendargs="body"
- sendargs << ", content" if m.content
- async_retval="#{m.return_type(true)}(impl->send(#{sendargs}), impl)"
+ sendargs << ", *privateImplGetPtr(content)" if m.content
+ async_retval="#{m.return_type(true)}(new CompletionImpl(impl->send(#{sendargs}), impl))"
if @async then
genl "return #{async_retval};"
else
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 3ac1992e2f..457463e72d 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -343,7 +343,6 @@ libqpidcommon_la_SOURCES = \
qpid/sys/AsynchIOHandler.cpp \
qpid/sys/Dispatcher.cpp \
qpid/sys/DispatchHandle.cpp \
- qpid/sys/LatencyMetric.cpp \
qpid/sys/Runnable.cpp \
qpid/sys/Shlib.cpp \
qpid/sys/Timer.cpp
@@ -437,21 +436,26 @@ libqpidclient_la_SOURCES = \
qpid/client/Bounds.cpp \
qpid/client/Connection.cpp \
qpid/client/ConnectionHandler.cpp \
- qpid/client/ConnectionImpl.cpp \
+ qpid/client/ConnectionImpl.cpp \
qpid/client/ConnectionSettings.cpp \
- qpid/client/Connector.cpp \
+ qpid/client/Connector.cpp \
qpid/client/Demux.cpp \
qpid/client/Dispatcher.cpp \
- qpid/client/FailoverManager.cpp \
+ qpid/client/FailoverManager.cpp \
qpid/client/FailoverListener.h \
qpid/client/FailoverListener.cpp \
qpid/client/Future.cpp \
qpid/client/FutureCompletion.cpp \
+ qpid/client/Completion.cpp \
+ qpid/client/CompletionImpl.h \
qpid/client/FutureResult.cpp \
qpid/client/HandlePrivate.h \
+ qpid/client/PrivateImplPrivate.h \
qpid/client/LoadPlugins.cpp \
qpid/client/LocalQueue.cpp \
qpid/client/Message.cpp \
+ qpid/client/MessageImpl.cpp \
+ qpid/client/MessageImpl.h \
qpid/client/MessageListener.cpp \
qpid/client/MessageReplayTracker.cpp \
qpid/client/QueueOptions.cpp \
@@ -597,6 +601,7 @@ nobase_include_HEADERS = \
qpid/client/FutureCompletion.h \
qpid/client/FutureResult.h \
qpid/client/Handle.h \
+ qpid/client/PrivateImpl.h \
qpid/client/LocalQueue.h \
qpid/client/QueueOptions.h \
qpid/client/Message.h \
@@ -697,7 +702,6 @@ nobase_include_HEADERS = \
qpid/sys/FileSysDir.h \
qpid/sys/IntegerTypes.h \
qpid/sys/IOHandle.h \
- qpid/sys/LatencyMetric.h \
qpid/sys/LockFile.h \
qpid/sys/LockPtr.h \
qpid/sys/Monitor.h \
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index e2054d75e9..fdac229646 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -56,6 +56,8 @@ cluster_la_SOURCES = \
qpid/cluster/Dispatchable.h \
qpid/cluster/UpdateClient.cpp \
qpid/cluster/UpdateClient.h \
+ qpid/cluster/ErrorCheck.cpp \
+ qpid/cluster/ErrorCheck.h \
qpid/cluster/Event.cpp \
qpid/cluster/Event.h \
qpid/cluster/EventFrame.h \
diff --git a/qpid/cpp/src/common.vcproj b/qpid/cpp/src/common.vcproj
index 96d67b9d54..a95e8483e2 100644
--- a/qpid/cpp/src/common.vcproj
+++ b/qpid/cpp/src/common.vcproj
@@ -456,6 +456,9 @@
RelativePath="gen\qpid\framing\ClusterConnectionTxStartBody.cpp">
</File>
<File
+ RelativePath="gen\qpid\framing\ClusterErrorCheckBody.cpp">
+ </File>
+ <File
RelativePath="gen\qpid\framing\ClusterMessageExpiredBody.cpp">
</File>
<File
@@ -972,9 +975,6 @@
RelativePath="qpid\sys\Dispatcher.cpp">
</File>
<File
- RelativePath="qpid\sys\LatencyMetric.cpp">
- </File>
- <File
RelativePath="qpid\sys\Runnable.cpp">
</File>
<File
@@ -1165,6 +1165,9 @@
RelativePath="gen\qpid\framing\ClusterConnectionTxStartBody.h">
</File>
<File
+ RelativePath="gen\qpid\framing\ClusterErrorCheckBody.h">
+ </File>
+ <File
RelativePath="gen\qpid\framing\ClusterMessageExpiredBody.h">
</File>
<File
@@ -1810,9 +1813,6 @@
RelativePath="qpid\sys\IOHandle.h">
</File>
<File
- RelativePath="qpid\sys\LatencyMetric.h">
- </File>
- <File
RelativePath="qpid\sys\LockFile.h">
</File>
<File
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index db957051d8..f927db09bb 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -88,6 +88,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
catch(const SessionException& e) {
QPID_LOG(error, "Execution exception: " << e.what());
+ executionException(e.code, e.what()); // Let subclass handle this first.
framing::AMQP_AllProxy::Execution execution(channel);
AMQMethodBody* m = f.getMethod();
SequenceNumber commandId;
@@ -98,6 +99,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
catch(const ChannelException& e){
QPID_LOG(error, "Channel exception: " << e.what());
+ channelException(e.code, e.what()); // Let subclass handle this first.
peer.detached(name, e.code);
}
catch(const ConnectionException& e) {
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
index 0b158ec2b4..0d9c72ff02 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
@@ -87,8 +87,9 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
QPID_COMMON_EXTERN virtual void invoke(const framing::AMQMethodBody& m);
virtual void setState(const std::string& sessionName, bool force) = 0;
- virtual void channelException(framing::session::DetachCode code, const std::string& msg) = 0;
virtual void connectionException(framing::connection::CloseCode code, const std::string& msg) = 0;
+ virtual void channelException(framing::session::DetachCode, const std::string& msg) = 0;
+ virtual void executionException(framing::execution::ErrorCode, const std::string& msg) = 0;
virtual void detaching() = 0;
// Notification of events
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index b06e06d353..365b3ccbeb 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -57,7 +57,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
mgmtObject(0),
links(broker_.getLinks()),
agent(0),
- timer(broker_.getTimer())
+ timer(broker_.getTimer()),
+ errorListener(0)
{
Manageable* parent = broker.GetVhostObject();
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index b659fe6468..e67cdce681 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -66,6 +66,17 @@ class Connection : public sys::ConnectionInputHandler,
public RefCounted
{
public:
+ /**
+ * Listener that can be registered with a Connection to be informed of errors.
+ */
+ class ErrorListener
+ {
+ public:
+ virtual ~ErrorListener() {}
+ virtual void sessionError(uint16_t channel, const std::string&) = 0;
+ virtual void connectionError(const std::string&) = 0;
+ };
+
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0);
~Connection ();
@@ -101,6 +112,9 @@ class Connection : public sys::ConnectionInputHandler,
const std::string& getMgmtId() const { return mgmtId; }
management::ManagementAgent* getAgent() const { return agent; }
void setFederationLink(bool b);
+ /** Connection does not delete the listener. 0 resets. */
+ void setErrorListener(ErrorListener* l) { errorListener=l; }
+ ErrorListener* getErrorListener() { return errorListener; }
void setHeartbeatInterval(uint16_t heartbeat);
void sendHeartbeat();
@@ -112,6 +126,7 @@ class Connection : public sys::ConnectionInputHandler,
void sendClose();
void setSecureConnection(SecureConnection* secured);
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -128,6 +143,8 @@ class Connection : public sys::ConnectionInputHandler,
management::ManagementAgent* agent;
Timer& timer;
boost::intrusive_ptr<TimerTask> heartbeatTimer;
+ ErrorListener* errorListener;
+
public:
qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; }
};
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 63212c7794..8b70836da0 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -64,13 +64,16 @@ void ConnectionHandler::heartbeat()
void ConnectionHandler::handle(framing::AMQFrame& frame)
{
AMQMethodBody* method=frame.getBody()->getMethod();
+ Connection::ErrorListener* errorListener = handler->connection.getErrorListener();
try{
if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) {
handler->connection.getChannel(frame.getChannel()).in(frame);
}
}catch(ConnectionException& e){
+ if (errorListener) errorListener->connectionError(e.what());
handler->proxy.close(e.code, e.what());
}catch(std::exception& e){
+ if (errorListener) errorListener->connectionError(e.what());
handler->proxy.close(541/*internal error*/, e.what());
}
}
diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
index 907f1e56e1..ffe0cc437b 100644
--- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
@@ -33,4 +33,6 @@ bool ExpiryPolicy::hasExpired(Message& m) {
return m.getExpiration() < sys::AbsTime::now();
}
+void ExpiryPolicy::forget(Message&) {}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
index cefe9b7552..eeb3ffda21 100644
--- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
@@ -39,6 +39,7 @@ class ExpiryPolicy : public RefCounted
QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
QPID_BROKER_EXTERN virtual void willExpire(Message&);
QPID_BROKER_EXTERN virtual bool hasExpired(Message&);
+ QPID_BROKER_EXTERN virtual void forget(Message&);
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 40b5515829..1e9eb9d386 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -53,6 +53,8 @@ Message::Message(const framing::SequenceNumber& id) :
Message::~Message()
{
+ if (expiryPolicy)
+ expiryPolicy->forget(*this);
}
void Message::forcePersistent()
@@ -334,7 +336,7 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
expiryPolicy = e;
- if (expiryPolicy)
+ if (expiryPolicy)
expiryPolicy->willExpire(*this);
}
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 442c3eb34b..ca1f875991 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -45,14 +45,20 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
} // namespace
-void SessionHandler::channelException(framing::session::DetachCode, const std::string&) {
- handleDetach();
-}
-
void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ // NOTE: must tell the error listener _before_ calling connection.close()
+ if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);
connection.close(code, msg);
}
+void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) {
+ if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+}
+
+void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) {
+ if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+}
+
ConnectionState& SessionHandler::getConnection() { return connection; }
const ConnectionState& SessionHandler::getConnection() const { return connection; }
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index ffc032f64c..ca6d6bb193 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -73,8 +73,9 @@ class SessionHandler : public amqp_0_10::SessionHandler {
virtual void setState(const std::string& sessionName, bool force);
virtual qpid::SessionState* getState();
virtual framing::FrameHandler* getInHandler();
- virtual void channelException(framing::session::DetachCode code, const std::string& msg);
virtual void connectionException(framing::connection::CloseCode code, const std::string& msg);
+ virtual void channelException(framing::session::DetachCode, const std::string& msg);
+ virtual void executionException(framing::execution::ErrorCode, const std::string& msg);
virtual void detaching();
virtual void readyToSend();
diff --git a/qpid/cpp/src/tests/BasicP2PTest.h b/qpid/cpp/src/qpid/client/Completion.cpp
index b2611f0301..e3676b2bde 100644
--- a/qpid/cpp/src/tests/BasicP2PTest.h
+++ b/qpid/cpp/src/qpid/client/Completion.cpp
@@ -1,5 +1,3 @@
-#ifndef _BasicP2PTest_
-#define _BasicP2PTest_
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,26 +19,20 @@
*
*/
-#include <memory>
-#include <sstream>
-
-#include "qpid/Exception.h"
-#include "qpid/client/Channel.h"
-#include "qpid/client/Message.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/MessageListener.h"
-#include "SimpleTestCaseBase.h"
-
+#include "Completion.h"
+#include "CompletionImpl.h"
+#include "HandlePrivate.h"
namespace qpid {
+namespace client {
-class BasicP2PTest : public SimpleTestCaseBase
-{
- class Receiver;
-public:
- void assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options);
-};
+Completion::Completion(CompletionImpl* i) : Handle<CompletionImpl>(i) {}
+Completion::~Completion() {}
+Completion::Completion(const Completion& c) : Handle<CompletionImpl>(c.impl) {}
+Completion& Completion::operator=(const Completion& c) { Handle<CompletionImpl>::operator=(c); return *this; }
-}
+void Completion::wait() { impl->wait(); }
+bool Completion::isComplete() { return impl->isComplete(); }
+std::string Completion::getResult() { return impl->getResult(); }
-#endif
+}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/Completion.h b/qpid/cpp/src/qpid/client/Completion.h
index c4979d7934..0b246b7765 100644
--- a/qpid/cpp/src/qpid/client/Completion.h
+++ b/qpid/cpp/src/qpid/client/Completion.h
@@ -22,13 +22,14 @@
#ifndef _Completion_
#define _Completion_
-#include <boost/shared_ptr.hpp>
-#include "Future.h"
-#include "SessionImpl.h"
+#include "Handle.h"
+#include <string>
namespace qpid {
namespace client {
+class CompletionImpl;
+
/**
* Asynchronous commands that do not return a result will return a
* Completion. You can use the completion to wait for that specific
@@ -38,32 +39,26 @@ namespace client {
*
*\ingroup clientapi
*/
-class Completion
+class Completion : public Handle<CompletionImpl>
{
-protected:
- Future future;
- shared_ptr<SessionImpl> session;
-
public:
///@internal
- Completion() {}
-
- ///@internal
- Completion(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {}
+ QPID_CLIENT_EXTERN Completion(CompletionImpl* =0);
+ QPID_CLIENT_EXTERN ~Completion();
+ QPID_CLIENT_EXTERN Completion(const Completion&);
+ QPID_CLIENT_EXTERN Completion& operator=(const Completion&);
/** Wait for the asynchronous command that returned this
*Completion to complete.
*
- *@exception If the command returns an error, get() throws an exception.
+ *@exception If the command returns an error.
*/
- void wait()
- {
- future.wait(*session);
- }
+ QPID_CLIENT_EXTERN void wait();
+
+ QPID_CLIENT_EXTERN bool isComplete();
- bool isComplete() {
- return future.isComplete(*session);
- }
+ protected:
+ QPID_CLIENT_EXTERN std::string getResult();
};
}}
diff --git a/qpid/cpp/src/tests/BasicPubSubTest.h b/qpid/cpp/src/qpid/client/CompletionImpl.h
index 242d2847d7..119abc093a 100644
--- a/qpid/cpp/src/tests/BasicPubSubTest.h
+++ b/qpid/cpp/src/qpid/client/CompletionImpl.h
@@ -1,5 +1,6 @@
-#ifndef _BasicPubSubTest_
-#define _BasicPubSubTest_
+#ifndef QPID_CLIENT_COMPLETIONIMPL_H
+#define QPID_CLIENT_COMPLETIONIMPL_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,31 +22,29 @@
*
*/
-#include <memory>
-#include <sstream>
-
-#include "qpid/Exception.h"
-#include "qpid/client/Channel.h"
-#include "qpid/client/Message.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/MessageListener.h"
-#include "SimpleTestCaseBase.h"
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <boost/format.hpp>
-
+#include "qpid/RefCounted.h"
+#include "Future.h"
namespace qpid {
+namespace client {
-using namespace qpid::client;
-
-class BasicPubSubTest : public SimpleTestCaseBase
+///@internal
+class CompletionImpl : public RefCounted
{
- class Receiver;
- class MultiReceiver;
public:
- void assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options);
+ CompletionImpl() {}
+ CompletionImpl(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {}
+
+ bool isComplete() { return future.isComplete(*session); }
+ void wait() { future.wait(*session); }
+ std::string getResult() { return future.getResult(*session); }
+
+protected:
+ Future future;
+ shared_ptr<SessionImpl> session;
};
-}
+}} // namespace qpid::client
+
-#endif
+#endif /*!QPID_CLIENT_COMPLETIONIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp
index cc62d724cb..e8415403ca 100644
--- a/qpid/cpp/src/qpid/client/Connection.cpp
+++ b/qpid/cpp/src/qpid/client/Connection.cpp
@@ -19,6 +19,7 @@
*
*/
#include "Connection.h"
+#include "ConnectionImpl.h"
#include "ConnectionSettings.h"
#include "Message.h"
#include "SessionImpl.h"
diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h
index 846ac33790..d898ea70d9 100644
--- a/qpid/cpp/src/qpid/client/Connection.h
+++ b/qpid/cpp/src/qpid/client/Connection.h
@@ -25,6 +25,7 @@
#include <string>
#include "qpid/client/Session.h"
#include "qpid/client/ClientImportExport.h"
+#include "qpid/client/ConnectionSettings.h"
namespace qpid {
@@ -32,7 +33,6 @@ struct Url;
namespace client {
-struct ConnectionSettings;
class ConnectionImpl;
/**
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index 745bdb63b5..b1e83025ab 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -111,9 +111,11 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame)
Mutex::ScopedLock l(lock);
s = sessions[frame.getChannel()].lock();
}
- if (!s)
- throw NotAttachedException(QPID_MSG("Invalid channel: " << frame.getChannel()));
- s->in(frame);
+ if (!s) {
+ QPID_LOG(info, "Dropping frame received on invalid channel: " << frame);
+ } else {
+ s->in(frame);
+ }
}
bool ConnectionImpl::isOpen() const
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp
index 8d8574520a..5156031748 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.cpp
+++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp
@@ -26,6 +26,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/BlockingQueue.h"
#include "Message.h"
+#include "MessageImpl.h"
#include <boost/state_saver.hpp>
@@ -49,6 +50,9 @@ Dispatcher::Dispatcher(const Session& s, const std::string& q)
session.getExecution().getDemux().get(q);
}
+Dispatcher::~Dispatcher() {}
+
+
void Dispatcher::start()
{
worker = Thread(this);
@@ -71,7 +75,7 @@ void Dispatcher::run()
Mutex::ScopedUnlock u(lock);
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
- Message msg(*content);
+ Message msg(new MessageImpl(*content));
boost::intrusive_ptr<SubscriptionImpl> listener = find(msg.getDestination());
if (!listener) {
QPID_LOG(error, "No listener found for destination " << msg.getDestination());
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.h b/qpid/cpp/src/qpid/client/Dispatcher.h
index e84f8f303d..4dbb75dcf2 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.h
+++ b/qpid/cpp/src/qpid/client/Dispatcher.h
@@ -30,7 +30,6 @@
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "MessageListener.h"
-#include "SubscriptionImpl.h"
namespace qpid {
namespace client {
@@ -61,6 +60,7 @@ class Dispatcher : public sys::Runnable
public:
Dispatcher(const Session& session, const std::string& queue = "");
+ ~Dispatcher();
void start();
void wait();
diff --git a/qpid/cpp/src/qpid/client/FailoverListener.cpp b/qpid/cpp/src/qpid/client/FailoverListener.cpp
index 16370f8912..ed9354d528 100644
--- a/qpid/cpp/src/qpid/client/FailoverListener.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverListener.cpp
@@ -20,6 +20,9 @@
*/
#include "FailoverListener.h"
#include "SessionBase_0_10Access.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/SubscriptionImpl.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
diff --git a/qpid/cpp/src/qpid/client/FailoverListener.h b/qpid/cpp/src/qpid/client/FailoverListener.h
index fe73a26611..7afee736ac 100644
--- a/qpid/cpp/src/qpid/client/FailoverListener.h
+++ b/qpid/cpp/src/qpid/client/FailoverListener.h
@@ -33,6 +33,7 @@ namespace qpid {
namespace client {
class SubscriptionManager;
+class ConnectionImpl;
/**
* @internal Listen for failover updates from the amq.failover exchange.
diff --git a/qpid/cpp/src/qpid/client/Future.cpp b/qpid/cpp/src/qpid/client/Future.cpp
index 6a0c78ae4b..fda40219ba 100644
--- a/qpid/cpp/src/qpid/client/Future.cpp
+++ b/qpid/cpp/src/qpid/client/Future.cpp
@@ -20,6 +20,7 @@
*/
#include "Future.h"
+#include "SessionImpl.h"
namespace qpid {
namespace client {
diff --git a/qpid/cpp/src/qpid/client/Future.h b/qpid/cpp/src/qpid/client/Future.h
index ea01522fe8..28c9a2bbbd 100644
--- a/qpid/cpp/src/qpid/client/Future.h
+++ b/qpid/cpp/src/qpid/client/Future.h
@@ -26,17 +26,15 @@
#include <boost/shared_ptr.hpp>
#include "qpid/Exception.h"
#include "qpid/framing/SequenceNumber.h"
-#include "qpid/framing/StructHelper.h"
#include "FutureCompletion.h"
#include "FutureResult.h"
-#include "SessionImpl.h"
#include "ClientImportExport.h"
namespace qpid {
namespace client {
/**@internal */
-class Future : private framing::StructHelper
+class Future
{
framing::SequenceNumber command;
boost::shared_ptr<FutureResult> result;
@@ -46,13 +44,9 @@ public:
Future() : complete(false) {}
Future(const framing::SequenceNumber& id) : command(id), complete(false) {}
- template <class T> void decodeResult(T& value, SessionImpl& session)
- {
- if (result) {
- decode(value, result->getResult(session));
- } else {
- throw Exception("Result not expected");
- }
+ std::string getResult(SessionImpl& session) {
+ if (result) return result->getResult(session);
+ else throw Exception("Result not expected");
}
QPID_CLIENT_EXTERN void wait(SessionImpl& session);
diff --git a/qpid/cpp/src/qpid/client/Handle.h b/qpid/cpp/src/qpid/client/Handle.h
index d8b822d0f9..12fb4cf3c1 100644
--- a/qpid/cpp/src/qpid/client/Handle.h
+++ b/qpid/cpp/src/qpid/client/Handle.h
@@ -30,9 +30,11 @@ namespace client {
template <class T> class HandlePrivate;
/**
- * A handle is like a pointer: it points to some underlying object.
+ * A handle is like a pointer: it points to some implementation object.
+ * Copying the handle does not copy the object.
+ *
* Handles can be null, like a 0 pointer. Use isValid(), isNull() or the
- * implicit conversion to bool to test for a null handle.
+ * conversion to bool to test for a null handle.
*/
template <class T> class Handle {
public:
@@ -46,8 +48,11 @@ template <class T> class Handle {
/**@return true if handle is null. It is an error to call any function on a null handle. */
QPID_CLIENT_EXTERN bool isNull() const { return !impl; }
+ /** Conversion to bool supports idiom if (handle) { handle->... } */
QPID_CLIENT_EXTERN operator bool() const { return impl; }
- QPID_CLIENT_EXTERN bool operator !() const { return impl; }
+
+ /** Operator ! supports idiom if (!handle) { do_if_handle_is_null(); } */
+ QPID_CLIENT_EXTERN bool operator !() const { return !impl; }
QPID_CLIENT_EXTERN void swap(Handle<T>&);
diff --git a/qpid/cpp/src/qpid/client/HandlePrivate.h b/qpid/cpp/src/qpid/client/HandlePrivate.h
index 488ce48075..46e4bff808 100644
--- a/qpid/cpp/src/qpid/client/HandlePrivate.h
+++ b/qpid/cpp/src/qpid/client/HandlePrivate.h
@@ -21,14 +21,16 @@
* under the License.
*
*/
+#include "Handle.h"
+#include "qpid/RefCounted.h"
#include <algorithm>
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
namespace client {
/** @file
- * Private implementation of handle, include in .cpp file of handle
- * subclasses _after_ including the declaration of class T.
+ * Implementation of handle, include in .cpp file of handle subclasses.
* T can be any class that can be used with boost::intrusive_ptr.
*/
@@ -52,9 +54,13 @@ void Handle<T>::swap(Handle<T>& h) { std::swap(impl, h.impl); }
template <class T>
class HandlePrivate {
public:
- static boost::intrusive_ptr<T> get(Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); }
+ static boost::intrusive_ptr<T> get(const Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); }
+ static void set(Handle<T>& h, const boost::intrusive_ptr<T>& p) { Handle<T>(p.get()).swap(h); }
};
+template<class T> boost::intrusive_ptr<T> handleGetPtr(Handle<T>& h) { return HandlePrivate<T>::get(h); }
+template<class T> boost::intrusive_ptr<const T> handleGetPtr(const Handle<T>& h) { return HandlePrivate<T>::get(h); }
+template<class T> void handleSetPtr(Handle<T>& h, const boost::intrusive_ptr<T>& p) { HandlePrivate<T>::set(h, p); }
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/LocalQueue.cpp b/qpid/cpp/src/qpid/client/LocalQueue.cpp
index e449c9f795..02fecf804f 100644
--- a/qpid/cpp/src/qpid/client/LocalQueue.cpp
+++ b/qpid/cpp/src/qpid/client/LocalQueue.cpp
@@ -19,11 +19,14 @@
*
*/
#include "LocalQueue.h"
+#include "MessageImpl.h"
#include "qpid/Exception.h"
#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "HandlePrivate.h"
#include "SubscriptionImpl.h"
+#include "CompletionImpl.h"
namespace qpid {
namespace client {
@@ -49,7 +52,7 @@ bool LocalQueue::get(Message& result, sys::Duration timeout) {
bool ok = queue->pop(content, timeout);
if (!ok) return false;
if (content->isA<MessageTransferBody>()) {
- result = Message(*content);
+ result = Message(new MessageImpl(*content));
boost::intrusive_ptr<SubscriptionImpl> si = HandlePrivate<SubscriptionImpl>::get(subscription);
assert(si);
if (si) si->received(result);
diff --git a/qpid/cpp/src/qpid/client/Message.cpp b/qpid/cpp/src/qpid/client/Message.cpp
index 13caaecefd..962ce26305 100644
--- a/qpid/cpp/src/qpid/client/Message.cpp
+++ b/qpid/cpp/src/qpid/client/Message.cpp
@@ -20,52 +20,37 @@
*/
#include "Message.h"
+#include "PrivateImplPrivate.h"
+#include "MessageImpl.h"
namespace qpid {
namespace client {
-Message::Message(const std::string& data, const std::string& routingKey) : TransferContent(data, routingKey) {}
+template class PrivateImpl<MessageImpl>;
-std::string Message::getDestination() const
-{
- return method.getDestination();
-}
+Message::Message(const std::string& data, const std::string& routingKey) : PrivateImpl<MessageImpl>(new MessageImpl(data, routingKey)) {}
+Message::Message(MessageImpl* i) : PrivateImpl<MessageImpl>(i) {}
+Message::~Message() {}
-bool Message::isRedelivered() const
-{
- return hasDeliveryProperties() && getDeliveryProperties().getRedelivered();
-}
+std::string Message::getDestination() const { return impl->getDestination(); }
+bool Message::isRedelivered() const { return impl->isRedelivered(); }
+void Message::setRedelivered(bool redelivered) { impl->setRedelivered(redelivered); }
+framing::FieldTable& Message::getHeaders() { return impl->getHeaders(); }
+const framing::FieldTable& Message::getHeaders() const { return impl->getHeaders(); }
+const framing::SequenceNumber& Message::getId() const { return impl->getId(); }
-void Message::setRedelivered(bool redelivered)
-{
- getDeliveryProperties().setRedelivered(redelivered);
-}
+void Message::setData(const std::string& s) { impl->setData(s); }
+const std::string& Message::getData() const { return impl->getData(); }
+std::string& Message::getData() { return impl->getData(); }
-framing::FieldTable& Message::getHeaders()
-{
- return getMessageProperties().getApplicationHeaders();
-}
+void Message::appendData(const std::string& s) { impl->appendData(s); }
-const framing::FieldTable& Message::getHeaders() const
-{
- return getMessageProperties().getApplicationHeaders();
-}
+bool Message::hasMessageProperties() const { return impl->hasMessageProperties(); }
+framing::MessageProperties& Message::getMessageProperties() { return impl->getMessageProperties(); }
+const framing::MessageProperties& Message::getMessageProperties() const { return impl->getMessageProperties(); }
-const framing::MessageTransferBody& Message::getMethod() const
-{
- return method;
-}
-
-const framing::SequenceNumber& Message::getId() const
-{
- return id;
-}
-
-/**@internal for incoming messages */
-Message::Message(const framing::FrameSet& frameset) :
- method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId())
-{
- populate(frameset);
-}
+bool Message::hasDeliveryProperties() const { return impl->hasDeliveryProperties(); }
+framing::DeliveryProperties& Message::getDeliveryProperties() { return impl->getDeliveryProperties(); }
+const framing::DeliveryProperties& Message::getDeliveryProperties() const { return impl->getDeliveryProperties(); }
}}
diff --git a/qpid/cpp/src/qpid/client/Message.h b/qpid/cpp/src/qpid/client/Message.h
index 235e20f97d..97238db647 100644
--- a/qpid/cpp/src/qpid/client/Message.h
+++ b/qpid/cpp/src/qpid/client/Message.h
@@ -1,5 +1,5 @@
-#ifndef _client_Message_h
-#define _client_Message_h
+#ifndef QPID_CLIENT_MESSAGE_H
+#define QPID_CLIENT_MESSAGE_H
/*
*
@@ -21,15 +21,24 @@
* under the License.
*
*/
-#include <string>
-#include "qpid/client/Session.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/TransferContent.h"
+
+#include "qpid/client/PrivateImpl.h"
#include "qpid/client/ClientImportExport.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include <string>
namespace qpid {
+
+namespace framing {
+class FieldTable;
+class SequenceNumber; // FIXME aconway 2009-04-17: remove with getID?
+}
+
namespace client {
+class MessageImpl;
+
/**
* A message sent to or received from the broker.
*
@@ -104,8 +113,7 @@ namespace client {
*
*
*/
-
-class Message : public framing::TransferContent
+class Message : public PrivateImpl<MessageImpl>
{
public:
/** Create a Message.
@@ -115,6 +123,23 @@ public:
QPID_CLIENT_EXTERN Message(const std::string& data=std::string(),
const std::string& routingKey=std::string());
+ QPID_CLIENT_EXTERN ~Message();
+
+ QPID_CLIENT_EXTERN void setData(const std::string&);
+ QPID_CLIENT_EXTERN const std::string& getData() const;
+ QPID_CLIENT_EXTERN std::string& getData();
+
+ QPID_CLIENT_EXTERN void appendData(const std::string&);
+
+ QPID_CLIENT_EXTERN bool hasMessageProperties() const;
+ QPID_CLIENT_EXTERN framing::MessageProperties& getMessageProperties();
+ QPID_CLIENT_EXTERN const framing::MessageProperties& getMessageProperties() const;
+
+ QPID_CLIENT_EXTERN bool hasDeliveryProperties() const;
+ QPID_CLIENT_EXTERN framing::DeliveryProperties& getDeliveryProperties();
+ QPID_CLIENT_EXTERN const framing::DeliveryProperties& getDeliveryProperties() const;
+
+
/** The destination of messages sent to the broker is the exchange
* name. The destination of messages received from the broker is
* the delivery tag identifyig the local subscription (often this
@@ -133,20 +158,14 @@ public:
/** Get a non-modifyable reference to the message headers. */
QPID_CLIENT_EXTERN const framing::FieldTable& getHeaders() const;
- ///@internal
- QPID_CLIENT_EXTERN const framing::MessageTransferBody& getMethod() const;
+ // FIXME aconway 2009-04-17: does this need to be in public API?
///@internal
QPID_CLIENT_EXTERN const framing::SequenceNumber& getId() const;
- /**@internal for incoming messages */
- QPID_CLIENT_EXTERN Message(const framing::FrameSet& frameset);
-
-private:
- //method and id are only set for received messages:
- framing::MessageTransferBody method;
- framing::SequenceNumber id;
+ ///@internal
+ Message(MessageImpl*);
};
}}
-#endif /*!_client_Message_h*/
+#endif /*!QPID_CLIENT_MESSAGE_H*/
diff --git a/qpid/cpp/src/qpid/client/MessageImpl.cpp b/qpid/cpp/src/qpid/client/MessageImpl.cpp
new file mode 100644
index 0000000000..3d06fd1d8d
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/MessageImpl.cpp
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageImpl.h"
+
+namespace qpid {
+namespace client {
+
+MessageImpl::MessageImpl(const std::string& data, const std::string& routingKey) : TransferContent(data, routingKey) {}
+
+std::string MessageImpl::getDestination() const
+{
+ return method.getDestination();
+}
+
+bool MessageImpl::isRedelivered() const
+{
+ return hasDeliveryProperties() && getDeliveryProperties().getRedelivered();
+}
+
+void MessageImpl::setRedelivered(bool redelivered)
+{
+ getDeliveryProperties().setRedelivered(redelivered);
+}
+
+framing::FieldTable& MessageImpl::getHeaders()
+{
+ return getMessageProperties().getApplicationHeaders();
+}
+
+const framing::FieldTable& MessageImpl::getHeaders() const
+{
+ return getMessageProperties().getApplicationHeaders();
+}
+
+const framing::MessageTransferBody& MessageImpl::getMethod() const
+{
+ return method;
+}
+
+const framing::SequenceNumber& MessageImpl::getId() const
+{
+ return id;
+}
+
+/**@internal for incoming messages */
+MessageImpl::MessageImpl(const framing::FrameSet& frameset) :
+ method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId())
+{
+ populate(frameset);
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/client/MessageImpl.h b/qpid/cpp/src/qpid/client/MessageImpl.h
new file mode 100644
index 0000000000..c06d9b5afc
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/MessageImpl.h
@@ -0,0 +1,76 @@
+#ifndef QPID_CLIENT_MESSAGEIMPL_H
+#define QPID_CLIENT_MESSAGEIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/client/Session.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/TransferContent.h"
+
+namespace qpid {
+namespace client {
+
+class MessageImpl : public framing::TransferContent
+{
+public:
+ /** Create a Message.
+ *@param data Data for the message body.
+ *@param routingKey Passed to the exchange that routes the message.
+ */
+ MessageImpl(const std::string& data=std::string(),
+ const std::string& routingKey=std::string());
+
+ /** The destination of messages sent to the broker is the exchange
+ * name. The destination of messages received from the broker is
+ * the delivery tag identifyig the local subscription (often this
+ * is the name of the subscribed queue.)
+ */
+ std::string getDestination() const;
+
+ /** Check the redelivered flag. */
+ bool isRedelivered() const;
+ /** Set the redelivered flag. */
+ void setRedelivered(bool redelivered);
+
+ /** Get a modifyable reference to the message headers. */
+ framing::FieldTable& getHeaders();
+
+ /** Get a non-modifyable reference to the message headers. */
+ const framing::FieldTable& getHeaders() const;
+
+ ///@internal
+ const framing::MessageTransferBody& getMethod() const;
+ ///@internal
+ const framing::SequenceNumber& getId() const;
+
+ /**@internal for incoming messages */
+ MessageImpl(const framing::FrameSet& frameset);
+
+private:
+ //method and id are only set for received messages:
+ framing::MessageTransferBody method;
+ framing::SequenceNumber id;
+};
+
+}}
+
+#endif /*!QPID_CLIENT_MESSAGEIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/PrivateImpl.h b/qpid/cpp/src/qpid/client/PrivateImpl.h
new file mode 100644
index 0000000000..6e5ea35ce0
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/PrivateImpl.h
@@ -0,0 +1,54 @@
+#ifndef QPID_CLIENT_PRIVATEIMPL_H
+#define QPID_CLIENT_PRIVATEIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ClientImportExport.h"
+
+namespace qpid {
+namespace client {
+
+template <class T> class PrivateImplPrivate;
+
+/**
+ * Base classes for objects with a private implementation.
+ *
+ * PrivateImpl objects have value semantics: copying the object also
+ * makes a copy of the implementation.
+ */
+template <class T> class PrivateImpl {
+ public:
+ QPID_CLIENT_EXTERN ~PrivateImpl();
+ QPID_CLIENT_EXTERN PrivateImpl(const PrivateImpl&);
+ QPID_CLIENT_EXTERN PrivateImpl& operator=(const PrivateImpl&);
+ QPID_CLIENT_EXTERN void swap(PrivateImpl<T>&);
+
+ protected:
+ QPID_CLIENT_EXTERN PrivateImpl(T*);
+ T* impl;
+
+ friend class PrivateImplPrivate<T>;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_PRIVATEIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/PrivateImplPrivate.h b/qpid/cpp/src/qpid/client/PrivateImplPrivate.h
new file mode 100644
index 0000000000..021456e085
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/PrivateImplPrivate.h
@@ -0,0 +1,66 @@
+#ifndef QPID_CLIENT_PRIVATEIMPLPRIVATE_H
+#define QPID_CLIENT_PRIVATEIMPLPRIVATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <algorithm>
+
+namespace qpid {
+namespace client {
+
+/** @file
+ * Implementation of PrivateImpl functions, to include in .cpp file of handle subclasses.
+ * T can be any class with value semantics.
+ */
+
+template <class T>
+PrivateImpl<T>::PrivateImpl(T* p) : impl(p) { assert(impl); }
+
+template <class T>
+PrivateImpl<T>::~PrivateImpl() { delete impl; }
+
+template <class T>
+PrivateImpl<T>::PrivateImpl(const PrivateImpl& h) : impl(new T(*h.impl)) {}
+
+template <class T>
+PrivateImpl<T>& PrivateImpl<T>::operator=(const PrivateImpl<T>& h) { PrivateImpl<T>(h).swap(*this); return *this; }
+
+template <class T>
+void PrivateImpl<T>::swap(PrivateImpl<T>& h) { std::swap(impl, h.impl); }
+
+
+/** Access to private impl of a PrivateImpl */
+template <class T>
+class PrivateImplPrivate {
+ public:
+ static T* get(const PrivateImpl<T>& h) { return h.impl; }
+ static void set(PrivateImpl<T>& h, const T& p) { PrivateImpl<T>(p).swap(h); }
+};
+
+template<class T> T* privateImplGetPtr(PrivateImpl<T>& h) { return PrivateImplPrivate<T>::get(h); }
+template<class T> T* privateImplGetPtr(const PrivateImpl<T>& h) { return PrivateImplPrivate<T>::get(h); }
+template<class T> void privateImplSetPtr(PrivateImpl<T>& h, const T*& p) { PrivateImplPrivate<T>::set(h, p); }
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_PRIVATEIMPLPRIVATE_H*/
+
diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
index e81b78ecd3..8a33c7393f 100644
--- a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
+++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
@@ -20,6 +20,8 @@
*/
#include "SessionBase_0_10.h"
#include "Connection.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/Future.h"
#include "qpid/framing/all_method_bodies.h"
namespace qpid {
diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.h b/qpid/cpp/src/qpid/client/SessionBase_0_10.h
index 3ae21936f6..d375b3ec2e 100644
--- a/qpid/cpp/src/qpid/client/SessionBase_0_10.h
+++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.h
@@ -23,14 +23,11 @@
*/
#include "qpid/SessionId.h"
+#include "qpid/client/SessionImpl.h"
#include "qpid/framing/amqp_structs.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/MethodContent.h"
-#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Message.h"
#include "qpid/client/Completion.h"
-#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Execution.h"
-#include "qpid/client/SessionImpl.h"
#include "qpid/client/TypedResult.h"
#include "qpid/shared_ptr.h"
#include "qpid/client/ClientImportExport.h"
@@ -44,7 +41,6 @@ class Connection;
using std::string;
using framing::Content;
using framing::FieldTable;
-using framing::MethodContent;
using framing::SequenceNumber;
using framing::SequenceSet;
using framing::SequenceNumberSet;
@@ -62,8 +58,6 @@ enum CreditUnit { MESSAGE_CREDIT=0, BYTE_CREDIT=1, UNLIMITED_CREDIT=0xFFFFFFFF }
*/
class SessionBase_0_10 {
public:
-
- typedef framing::TransferContent DefaultContent;
///@internal
QPID_CLIENT_EXTERN SessionBase_0_10();
diff --git a/qpid/cpp/src/qpid/client/Subscription.cpp b/qpid/cpp/src/qpid/client/Subscription.cpp
index 1f1b5ac6c6..37f5557d51 100644
--- a/qpid/cpp/src/qpid/client/Subscription.cpp
+++ b/qpid/cpp/src/qpid/client/Subscription.cpp
@@ -21,6 +21,7 @@
#include "Subscription.h"
#include "SubscriptionImpl.h"
+#include "CompletionImpl.h"
#include "HandlePrivate.h"
#include "qpid/framing/enum.h"
diff --git a/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
index e09a4c142e..82c920cf47 100644
--- a/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
@@ -20,8 +20,12 @@
*/
#include "SubscriptionImpl.h"
+#include "MessageImpl.h"
+#include "CompletionImpl.h"
#include "SubscriptionManager.h"
#include "SubscriptionSettings.h"
+#include "HandlePrivate.h"
+#include "PrivateImplPrivate.h"
namespace qpid {
namespace client {
@@ -114,9 +118,9 @@ void SubscriptionImpl::cancel() { manager.cancel(name); }
void SubscriptionImpl::received(Message& m) {
Mutex::ScopedLock l(lock);
- if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
+ if (privateImplGetPtr(m)->getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
unacquired.add(m.getId());
- else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
+ else if (privateImplGetPtr(m)->getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
unaccepted.add(m.getId());
if (listener) {
diff --git a/qpid/cpp/src/qpid/client/TypedResult.h b/qpid/cpp/src/qpid/client/TypedResult.h
index 5306997d74..2e54f9fdfc 100644
--- a/qpid/cpp/src/qpid/client/TypedResult.h
+++ b/qpid/cpp/src/qpid/client/TypedResult.h
@@ -23,6 +23,7 @@
#define _TypedResult_
#include "Completion.h"
+#include "qpid/framing/StructHelper.h"
namespace qpid {
namespace client {
@@ -39,7 +40,7 @@ template <class T> class TypedResult : public Completion
public:
///@internal
- TypedResult(Future f, shared_ptr<SessionImpl> s) : Completion(f, s), decoded(false) {}
+ TypedResult(CompletionImpl* c) : Completion(c), decoded(false) {}
/**
* Wait for the asynchronous command that returned this TypedResult to complete
@@ -49,13 +50,12 @@ public:
*@exception If the command returns an error, get() throws an exception.
*
*/
- T& get()
- {
+ T& get() {
if (!decoded) {
- future.decodeResult(result, *session);
+ framing::StructHelper helper;
+ helper.decode(result, getResult());
decoded = true;
}
-
return result;
}
};
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index f8e412f1e6..a17f54078c 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -36,6 +36,7 @@
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
@@ -46,7 +47,6 @@
#include "qpid/management/ManagementBroker.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
-#include "qpid/sys/LatencyMetric.h"
#include "qpid/sys/Thread.h"
#include <boost/bind.hpp>
@@ -63,6 +63,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
using namespace qpid::cluster;
+using namespace qpid::framing::cluster;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
@@ -77,9 +78,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void ready(const std::string& url) { cluster.ready(member, url, l); }
- void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
+ void configChange(const std::string& current) { cluster.configChange(member, current, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
+ void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -112,7 +114,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
discarding(true),
state(INIT),
lastSize(0),
- lastBroker(false)
+ lastBroker(false),
+ error(*this)
{
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
@@ -195,14 +198,19 @@ void Cluster::leave() {
leave(l);
}
+#define LEAVE_TRY(STMT) try { STMT; } \
+ catch (const std::exception& e) { \
+ QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
+ } do {} while(0)
+
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- try { broker.shutdown(); }
- catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
- }
+ // Finalize connections now now to avoid problems later in destructor.
+ LEAVE_TRY(localConnections.clear());
+ LEAVE_TRY(connections.clear());
+ LEAVE_TRY(broker.shutdown());
}
}
@@ -218,8 +226,6 @@ void Cluster::deliver(
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
- if (from == self) // Record self-deliveries for flow control.
- mcast.selfDeliver(e);
deliverEvent(e);
}
@@ -254,10 +260,22 @@ void Cluster::deliveredEvent(const Event& e) {
QPID_LOG(trace, *this << " DROP: " << e);
}
+void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
+ Mutex::ScopedLock l(lock);
+ error.error(connection, type, map.getFrameSeq(), map.getMembers());
+}
+
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& e) {
Mutex::ScopedLock l(lock);
+ // Process each frame through the error checker.
+ error.delivered(e);
+ while (error.canProcess()) // There is a frame ready to process.
+ processFrame(error.getNext(), l);
+}
+
+void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
QPID_LOG(trace, *this << " DLVR: " << e);
ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
@@ -265,7 +283,8 @@ void Cluster::deliveredFrame(const EventFrame& e) {
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) {
- QPID_LOG(trace, *this << " DLVR: " << e);
+ map.incrementFrameSeq();
+ QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
ConnectionPtr connection = getConnection(e.connectionId, l);
if (connection)
connection->deliveredFrame(e);
@@ -316,11 +335,11 @@ ostream& operator<<(ostream& o, const AddrList& a) {
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
- case CPG_REASON_JOIN: reasonString = " (joined) "; break;
- case CPG_REASON_LEAVE: reasonString = " (left) "; break;
- case CPG_REASON_NODEDOWN: reasonString = " (node-down) "; break;
- case CPG_REASON_NODEUP: reasonString = " (node-up) "; break;
- case CPG_REASON_PROCDOWN: reasonString = " (process-down) "; break;
+ case CPG_REASON_JOIN: reasonString = "(joined) "; break;
+ case CPG_REASON_LEAVE: reasonString = "(left) "; break;
+ case CPG_REASON_NODEDOWN: reasonString = "(node-down) "; break;
+ case CPG_REASON_NODEUP: reasonString = "(node-up) "; break;
+ case CPG_REASON_PROCDOWN: reasonString = "(process-down) "; break;
default: reasonString = " ";
}
qpid::cluster::MemberId member(*p);
@@ -342,8 +361,8 @@ void Cluster::configChange (
broker.setRecovery(nCurrent == 1);
initialized = true;
}
- QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
- << AddrList(left, nLeft, "( ", ")"));
+ QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
+ << AddrList(left, nLeft, "left: "));
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
@@ -357,8 +376,8 @@ void Cluster::setReady(Lock&) {
broker.getQueueEvents().enable();
}
-void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
- bool memberChange = map.configChange(addresses);
+void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
+ bool memberChange = map.configChange(current);
if (state == LEFT) return;
if (!map.isAlive(self)) { // Final config change.
@@ -589,19 +608,24 @@ void Cluster::memberUpdate(Lock& l) {
mgmtObject->set_memberIDs(idstr);
}
- // Erase connections belonging to members that have left the cluster.
+ // Close connections belonging to members that have left the cluster.
ConnectionMap::iterator i = connections.begin();
while (i != connections.end()) {
ConnectionMap::iterator j = i++;
MemberId m = j->second->getId().getMember();
if (m != self && !map.isMember(m))
- connections.erase(j);
+ j->second->deliverClose();
}
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
- static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
- return o << cluster.self << "(" << STATE[cluster.state] << ")";
+ static const char* STATE[] = {
+ "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+ };
+ assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
+ o << cluster.self << "(" << STATE[cluster.state];
+ if (cluster.error.isUnresolved()) o << "/error";
+ return o << ")";
}
MemberId Cluster::getId() const {
@@ -635,4 +659,13 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
+void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
+ // If we receive an errorCheck here, it's because we have processed past the point
+ // of the error so respond with ERROR_TYPE_NONE
+ assert(map.getFrameSeq() >= frameSeq);
+ if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE.
+ mcast.mcastControl(
+ ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+}
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index b716e2d781..8a94fc79dd 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -23,6 +23,7 @@
#include "ClusterSettings.h"
#include "Cpg.h"
#include "Decoder.h"
+#include "ErrorCheck.h"
#include "Event.h"
#include "EventFrame.h"
#include "ExpiryPolicy.h"
@@ -105,6 +106,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void deliverFrame(const EventFrame&);
+ // Called in deliverFrame thread to indicate an error from the broker.
+ void flagError(Connection&, ErrorCheck::ErrorType);
+ void connectionError();
+
// Called only during update by Connection::shadowReady
Decoder& getDecoder() { return decoder; }
@@ -132,13 +137,15 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// == Called in deliverFrameQueue thread
void deliveredFrame(const EventFrame&);
+ void processFrame(const EventFrame&, Lock&);
// Cluster controls implement XML methods from cluster.xml.
void updateRequest(const MemberId&, const std::string&, Lock&);
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
- void configChange(const MemberId&, const std::string& addresses, Lock& l);
+ void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
+ void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&);
void shutdown(const MemberId&, Lock&);
// Helper functions
@@ -216,11 +223,13 @@ class Cluster : private Cpg::Handler, public management::Manageable {
Decoder decoder;
bool discarding;
+
// Remaining members are protected by lock.
- // FIXME aconway 2009-03-06: Most of these members are also only used in
+
+ // TODO aconway 2009-03-06: Most of these members are also only used in
// deliverFrameQueue thread or during stall. Review and separate members
// that require a lock, drop lock when not needed.
- //
+
mutable sys::Monitor lock;
@@ -243,7 +252,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
bool lastBroker;
sys::Thread updateThread;
boost::optional<ClusterMap> updatedMap;
-
+ ErrorCheck error;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
index 9e7232180d..0395ff6382 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -33,6 +33,13 @@ using namespace framing;
namespace cluster {
+ClusterMap::Set ClusterMap::decode(const std::string& s) {
+ Set set;
+ for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8)
+ set.insert(MemberId(std::string(i, i+8)));
+ return set;
+}
+
namespace {
void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) {
@@ -54,9 +61,9 @@ void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
}
-ClusterMap::ClusterMap() {}
+ClusterMap::ClusterMap() : frameSeq(0) {}
-ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) {
+ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) {
alive.insert(id);
if (isMember)
members[id] = url;
@@ -64,7 +71,9 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) {
joiners[id] = url;
}
-ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) {
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_)
+ : frameSeq(frameSeq_)
+{
std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
}
@@ -78,22 +87,7 @@ void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const
}
b.getMembers().clear();
std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
-}
-
-bool ClusterMap::configChange(
- cpg_address *current, int nCurrent,
- cpg_address *left, int nLeft,
- cpg_address */*joined*/, int /*nJoined*/)
-{
- cpg_address* a;
- bool memberChange=false;
- for (a = left; a != left+nLeft; ++a) {
- memberChange = memberChange || members.erase(*a);
- joiners.erase(*a);
- }
- alive.clear();
- std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
- return memberChange;
+ b.setFrameSeq(frameSeq);
}
Url ClusterMap::getUrl(const Map& map, const MemberId& id) {
@@ -123,8 +117,13 @@ std::vector<Url> ClusterMap::memberUrls() const {
return urls;
}
-ClusterMap::Set ClusterMap::getAlive() const {
- return alive;
+ClusterMap::Set ClusterMap::getAlive() const { return alive; }
+
+ClusterMap::Set ClusterMap::getMembers() const {
+ Set s;
+ std::transform(members.begin(), members.end(), std::inserter(s, s.begin()),
+ boost::bind(&Map::value_type::first, _1));
+ return s;
}
std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) {
@@ -158,7 +157,7 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) {
bool ClusterMap::configChange(const std::string& addresses) {
bool memberChange = false;
- Set update;
+ Set update = decode(addresses);
for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8)
update.insert(MemberId(std::string(i, i+8)));
Set removed;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h
index 4548441442..3359c7c1f3 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h
@@ -38,26 +38,26 @@
namespace qpid {
namespace cluster {
+typedef std::set<MemberId> MemberSet;
+
/**
- * Map of established cluster members and joiners waiting for an update.
+ * Map of established cluster members and joiners waiting for an update,
+ * along with other cluster state that must be updated.
*/
class ClusterMap {
public:
typedef std::map<MemberId, Url> Map;
typedef std::set<MemberId> Set;
+ static Set decode(const std::string&);
+
ClusterMap();
ClusterMap(const MemberId& id, const Url& url, bool isReady);
- ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states);
+ ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq);
/** Update from config change.
*@return true if member set changed.
*/
- bool configChange(
- cpg_address *current, int nCurrent,
- cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined);
-
bool configChange(const std::string& addresses);
bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); }
@@ -78,6 +78,7 @@ class ClusterMap {
std::vector<std::string> memberIds() const;
std::vector<Url> memberUrls() const;
Set getAlive() const;
+ Set getMembers() const;
bool updateRequest(const MemberId& id, const std::string& url);
/** Return non-empty Url if accepted */
@@ -90,11 +91,16 @@ class ClusterMap {
* Utility method to return intersection of two member sets
*/
static Set intersection(const Set& a, const Set& b);
+
+ uint64_t getFrameSeq() { return frameSeq; }
+ uint64_t incrementFrameSeq() { return ++frameSeq; }
+
private:
Url getUrl(const Map& map, const MemberId& id);
Map joiners, members;
Set alive;
+ uint64_t frameSeq;
friend std::ostream& operator<<(std::ostream&, const Map&);
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index aa7d082720..97cafbabaa 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -39,8 +39,6 @@
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/LatencyMetric.h"
-#include "qpid/sys/AtomicValue.h"
#include <boost/current_function.hpp>
@@ -56,8 +54,16 @@ namespace qpid {
namespace cluster {
using namespace framing;
+using namespace framing::cluster;
+
+qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+
+Connection::NullFrameHandler Connection::nullFrameHandler;
+
+struct NullFrameHandler : public framing::FrameHandler {
+ void handle(framing::AMQFrame&) {}
+};
-NoOpConnectionOutputHandler Connection::discardHandler;
namespace {
sys::AtomicValue<uint64_t> idCounter;
@@ -89,6 +95,8 @@ void Connection::init() {
connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
connection.setClientThrottling(false); // Disable client throttling, done by active node.
}
+ if (!isCatchUp())
+ connection.setErrorListener(this);
}
void Connection::giveReadCredit(int credit) {
@@ -97,6 +105,7 @@ void Connection::giveReadCredit(int credit) {
}
Connection::~Connection() {
+ connection.setErrorListener(0);
QPID_LOG(debug, cluster << " deleted connection: " << *this);
}
@@ -126,7 +135,7 @@ void Connection::received(framing::AMQFrame& f) {
cluster.addShadowConnection(this);
AMQFrame ok((ConnectionCloseOkBody()));
connection.getOutput().send(ok);
- output.closeOutput(discardHandler);
+ output.closeOutput();
catchUp = false;
}
else
@@ -156,8 +165,8 @@ void Connection::deliveredFrame(const EventFrame& f) {
{
if (f.type == DATA) // incoming data frames to broker::Connection
connection.received(const_cast<AMQFrame&>(f.frame));
- else { // frame control, send frame via SessionState
- broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+ else { // frame control, send frame via SessionState
+ broker::SessionState* ss = connection.getChannel(currentChannel).getSession();
if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
}
}
@@ -180,7 +189,7 @@ void Connection::closed() {
// This was a local replicated connection. Multicast a deliver
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
- output.closeOutput(discardHandler);
+ output.closeOutput();
cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
}
}
@@ -275,13 +284,14 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str
QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
self = shadowId;
connection.setUserId(username);
- // OK to use decoder here because we are stalled for update.
+ // OK to use decoder here because cluster is stalled for update.
cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
+ connection.setErrorListener(this);
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members));
+ cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
self.second = 0; // Mark this as completed update connection.
}
@@ -305,7 +315,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
}
broker::QueuedMessage Connection::getUpdateMessage() {
- broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get();
+ shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
+ assert(!updateq->isDurable());
+ broker::QueuedMessage m = updateq->get();
if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
return m;
}
@@ -342,15 +354,15 @@ void Connection::deliveryRecord(const string& qname,
// If the message was unacked, the newbie broker must place
// it in its messageStore.
- if ( m.payload && m.payload->isPersistent() && !completed && !ended && !accepted && !cancelled )
+ if ( m.payload && m.payload->isPersistent() && acquired && !ended)
queue->enqueue ( 0, m.payload );
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
- shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
- if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
- q->setPosition(position);
-}
+ shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
+ if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
+ q->setPosition(position);
+ }
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
@@ -407,7 +419,14 @@ void Connection::queue(const std::string& encoded) {
QPID_LOG(debug, cluster << " decoded queue " << q->getName());
}
-qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+void Connection::sessionError(uint16_t , const std::string& ) {
+ cluster.flagError(*this, ERROR_TYPE_SESSION);
+
+}
+
+void Connection::connectionError(const std::string& ) {
+ cluster.flagError(*this, ERROR_TYPE_CONNECTION);
+}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 6434f763a8..414e5c935f 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -25,7 +25,6 @@
#include "types.h"
#include "WriteEstimate.h"
#include "OutputInterceptor.h"
-#include "NoOpConnectionOutputHandler.h"
#include "EventFrame.h"
#include "McastFrameHandler.h"
@@ -58,7 +57,8 @@ class Event;
class Connection :
public RefCounted,
public sys::ConnectionInputHandler,
- public framing::AMQP_AllOperations::ClusterConnectionHandler
+ public framing::AMQP_AllOperations::ClusterConnectionHandler,
+ private broker::Connection::ErrorListener
{
public:
@@ -120,7 +120,7 @@ class Connection :
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
- void membership(const framing::FieldTable&, const framing::FieldTable&);
+ void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
@@ -151,14 +151,22 @@ class Connection :
void giveReadCredit(int credit);
+ void deliverClose();
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
};
+
+ static NullFrameHandler nullFrameHandler;
+
+ // Error listener functions
+ void connectionError(const std::string&);
+ void sessionError(uint16_t channel, const std::string&);
+
void init();
bool checkUnsupported(const framing::AMQBody& body);
- void deliverClose();
void deliverDoOutput(uint32_t requested);
void sendDoOutput();
@@ -167,8 +175,6 @@ class Connection :
broker::SemanticState& semanticState();
broker::QueuedMessage getUpdateMessage();
- static NoOpConnectionOutputHandler discardHandler;
-
Cluster& cluster;
ConnectionId self;
bool catchUp;
@@ -181,7 +187,6 @@ class Connection :
boost::shared_ptr<broker::TxBuffer> txBuffer;
bool expectProtocolHeader;
McastFrameHandler mcastFrameHandler;
- NullFrameHandler nullFrameHandler;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp
index 915a578989..f746eacea4 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp
@@ -75,11 +75,14 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow
::memset(&callbacks, sizeof(callbacks), 0);
callbacks.cpg_deliver_fn = &globalDeliver;
callbacks.cpg_confchg_fn = &globalConfigChange;
+
+ QPID_LOG(info, "Initializing CPG");
cpg_error_t err = cpg_initialize(&handle, &callbacks);
- if (err == CPG_ERR_TRY_AGAIN) {
- QPID_LOG(notice, "Waiting for CPG initialization.");
- while (CPG_ERR_TRY_AGAIN == (err = cpg_initialize(&handle, &callbacks)))
- sys::sleep(5);
+ int retries = 6;
+ while (err == CPG_ERR_TRY_AGAIN && --retries) {
+ QPID_LOG(notice, "Re-trying CPG initialization.");
+ sys::sleep(5);
+ err = cpg_initialize(&handle, &callbacks);
}
check(err, "Failed to initialize CPG.");
check(cpg_context_set(handle, this), "Cannot set CPG context");
@@ -87,7 +90,6 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow
// windows then this needs to be refactored into
// qpid::sys::<platform>
IOHandle::impl->fd = getFd();
- QPID_LOG(debug, "Initialized CPG handle 0x" << std::hex << handle);
}
Cpg::~Cpg() {
diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
new file mode 100644
index 0000000000..6132d52126
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ErrorCheck.h"
+#include "EventFrame.h"
+#include "ClusterMap.h"
+#include "Cluster.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
+#include "qpid/framing/ClusterConfigChangeBody.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+
+using namespace std;
+using namespace framing;
+using namespace framing::cluster;
+
+ErrorCheck::ErrorCheck(Cluster& c)
+ : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
+{}
+
+ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) {
+ copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " "));
+ return o;
+}
+
+void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms)
+{
+ // Detected a local error, inform cluster and set error state.
+ assert(t != ERROR_TYPE_NONE); // Must be an error.
+ assert(type == ERROR_TYPE_NONE); // Can only be called while processing
+ type = t;
+ unresolved = ms;
+ frameSeq = seq;
+ connection = &c;
+ QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection")
+ << " error " << frameSeq << " unresolved: " << unresolved);
+ mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
+}
+
+void ErrorCheck::delivered(const EventFrame& e) {
+ if (isUnresolved()) {
+ const ClusterErrorCheckBody* errorCheck =
+ dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod());
+ const ClusterConfigChangeBody* configChange =
+ dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod());
+
+ if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+ if (errorCheck->getType() < type) { // my error is worse than his
+ QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId());
+ throw Exception("Aborted by local failure that did not occur on all replicas");
+ }
+ else { // his error is worse/same as mine.
+ QPID_LOG(debug, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId());
+ unresolved.erase(e.getMemberId());
+ checkResolved();
+ }
+ }
+ else {
+ frames.push_back(e); // Only drop matching errorCheck controls.
+ if (configChange) {
+ MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+ MemberSet result;
+ set_intersection(members.begin(), members.end(),
+ unresolved.begin(), unresolved.end(),
+ inserter(result, result.begin()));
+ unresolved.swap(result);
+ checkResolved();
+ }
+ }
+ }
+ else
+ frames.push_back(e);
+}
+
+void ErrorCheck::checkResolved() {
+ if (unresolved.empty()) { // No more potentially conflicted members, we're clear.
+ type = ERROR_TYPE_NONE;
+ QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved.");
+ }
+ else
+ QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved);
+}
+
+EventFrame ErrorCheck::getNext() {
+ assert(canProcess());
+ EventFrame e(frames.front());
+ frames.pop_front();
+ return e;
+}
+
+bool ErrorCheck::canProcess() const {
+ return type == ERROR_TYPE_NONE && !frames.empty();
+}
+
+bool ErrorCheck::isUnresolved() const {
+ return type != ERROR_TYPE_NONE;
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.h b/qpid/cpp/src/qpid/cluster/ErrorCheck.h
new file mode 100644
index 0000000000..97b5f2bffd
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.h
@@ -0,0 +1,80 @@
+#ifndef QPID_CLUSTER_ERRORCHECK_H
+#define QPID_CLUSTER_ERRORCHECK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Multicaster.h"
+#include "qpid/framing/enum.h"
+#include <boost/function.hpp>
+#include <deque>
+#include <set>
+
+namespace qpid {
+namespace cluster {
+
+class EventFrame;
+class ClusterMap;
+class Cluster;
+class Multicaster;
+class Connection;
+
+/**
+ * Error checking logic.
+ *
+ * When an error occurs stop processing frames and queue them until we
+ * can determine if all nodes experienced the error. If not, we shut down.
+ */
+class ErrorCheck
+{
+ public:
+ typedef std::set<MemberId> MemberSet;
+ typedef framing::cluster::ErrorType ErrorType;
+
+ ErrorCheck(Cluster&);
+
+ /** A local error has occured */
+ void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&);
+
+ /** Called when a frame is delivered */
+ void delivered(const EventFrame&);
+
+ EventFrame getNext();
+
+ bool canProcess() const;
+ bool isUnresolved() const;
+
+ private:
+ void checkResolved();
+
+ Cluster& cluster;
+ Multicaster& mcast;
+ std::deque<EventFrame> frames;
+ std::set<MemberId> unresolved;
+ uint64_t frameSeq;
+ ErrorType type;
+ Connection* connection;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_ERRORCHECK_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h
index e05ad60bcf..f0e445a08c 100644
--- a/qpid/cpp/src/qpid/cluster/Event.h
+++ b/qpid/cpp/src/qpid/cluster/Event.h
@@ -25,7 +25,6 @@
#include "types.h"
#include "qpid/RefCountedBuffer.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/sys/LatencyMetric.h"
#include <sys/uio.h> // For iovec
#include <iosfwd>
@@ -42,7 +41,7 @@ class Buffer;
namespace cluster {
/** Header data for a multicast event */
-class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {
+class EventHeader {
public:
EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0);
void decode(const MemberId& m, framing::Buffer&);
diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.cpp b/qpid/cpp/src/qpid/cluster/EventFrame.cpp
index 9350c801f5..a48d134f1b 100644
--- a/qpid/cpp/src/qpid/cluster/EventFrame.cpp
+++ b/qpid/cpp/src/qpid/cluster/EventFrame.cpp
@@ -28,9 +28,7 @@ EventFrame::EventFrame() {}
EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
: connectionId(e.getConnectionId()), frame(f), readCredit(rc), type(e.getType())
-{
- QPID_LATENCY_INIT(frame);
-}
+{}
std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
if (e.frame.getBody()) o << e.frame;
diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.h b/qpid/cpp/src/qpid/cluster/EventFrame.h
index d6ff58dd38..e275aac7aa 100644
--- a/qpid/cpp/src/qpid/cluster/EventFrame.h
+++ b/qpid/cpp/src/qpid/cluster/EventFrame.h
@@ -25,7 +25,6 @@
#include "types.h"
#include "Event.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/sys/LatencyMetric.h"
#include <boost/intrusive_ptr.hpp>
#include <iosfwd>
@@ -45,6 +44,7 @@ struct EventFrame
bool isCluster() const { return connectionId.getNumber() == 0; }
bool isConnection() const { return connectionId.getNumber() != 0; }
bool isLastInEvent() const { return readCredit; }
+ MemberId getMemberId() const { return connectionId.getMember(); }
ConnectionId connectionId;
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index 409180c499..348963f901 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -50,6 +50,13 @@ void ExpiryPolicy::willExpire(broker::Message& m) {
timer.add(new ExpiryTask(this, id, m.getExpiration()));
}
+void ExpiryPolicy::forget(broker::Message& m) {
+ MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+ assert(i != unexpiredByMessage.end());
+ unexpiredById.erase(i->second);
+ unexpiredByMessage.erase(i);
+}
+
bool ExpiryPolicy::hasExpired(broker::Message& m) {
return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
}
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
index 9f8b1a9236..c147e54796 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -49,8 +49,8 @@ class ExpiryPolicy : public broker::ExpiryPolicy
ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&);
void willExpire(broker::Message&);
-
bool hasExpired(broker::Message&);
+ void forget(broker::Message&);
// Send expiration notice to cluster.
void sendExpire(uint64_t);
diff --git a/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
index 8b2f6dae8e..4df742d6c2 100644
--- a/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
+++ b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
@@ -52,6 +52,8 @@ class LockedConnectionMap
return 0;
}
+ void clear() { sys::Mutex::ScopedLock l(lock); map.clear(); }
+
private:
typedef std::map<ConnectionId, ConnectionPtr> Map;
mutable sys::Mutex lock;
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
index f0738ab08f..3b9d3ac990 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
@@ -22,7 +22,6 @@
#include "Multicaster.h"
#include "Cpg.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/LatencyMetric.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/framing/AMQFrame.h"
@@ -64,7 +63,6 @@ void Multicaster::mcast(const Event& e) {
return;
}
}
- QPID_LATENCY_INIT(e);
queue.push(e);
}
@@ -73,7 +71,6 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
try {
PollableEventQueue::Queue::iterator i = values.begin();
while( i != values.end()) {
- QPID_LATENCY_RECORD("mcast send queue", *i);
iovec iov = i->toIovec();
if (!cpg.mcast(&iov, 1)) {
// cpg didn't send because of CPG flow control.
@@ -97,9 +94,4 @@ void Multicaster::release() {
holdingQueue.clear();
}
-void Multicaster::selfDeliver(const Event& e) {
- sys::Mutex::ScopedLock l(lock);
- QPID_LATENCY_RECORD("cpg self deliver", e);
-}
-
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.h b/qpid/cpp/src/qpid/cluster/Multicaster.h
index d1c3115977..baa5b87f38 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.h
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.h
@@ -55,8 +55,6 @@ class Multicaster
void mcast(const Event& e);
/** End holding mode, held events are mcast */
void release();
- /** Call when events are self-delivered to manage flow control. */
- void selfDeliver(const Event&);
private:
typedef sys::PollableQueue<Event> PollableEventQueue;
diff --git a/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
index 74a376a657..6a30bddf06 100644
--- a/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
+++ b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
@@ -30,8 +30,7 @@ namespace framing { class AMQFrame; }
namespace cluster {
/**
- * Output handler for frames sent to noop connections.
- * Simply discards frames.
+ * Output handler shadow connections, simply discards frames.
*/
class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler
{
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index cd42446016..6af114a662 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -32,8 +32,9 @@ namespace cluster {
using namespace framing;
-OutputInterceptor::OutputInterceptor(
- cluster::Connection& p, sys::ConnectionOutputHandler& h)
+NoOpConnectionOutputHandler OutputInterceptor::discardHandler;
+
+OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h)
: parent(p), closing(false), next(&h), sent(),
writeEstimate(p.getCluster().getWriteEstimate()),
moreOutput(), doingOutput()
@@ -47,7 +48,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) {
}
if (!parent.isCatchUp())
sent += f.encodedSize();
- QPID_LATENCY_RECORD("up to write queue", f);
}
void OutputInterceptor::activateOutput() {
@@ -98,7 +98,6 @@ void OutputInterceptor::deliverDoOutput(size_t requested) {
// Send a doOutput request if one is not already in flight.
void OutputInterceptor::sendDoOutput() {
if (!parent.isLocal()) return;
- QPID_LATENCY_INIT(*this);
doingOutput = true;
size_t request = writeEstimate.sending(getBuffered());
@@ -111,10 +110,10 @@ void OutputInterceptor::sendDoOutput() {
QPID_LOG(trace, parent << "Send doOutput request for " << request);
}
-void OutputInterceptor::closeOutput(sys::ConnectionOutputHandler& h) {
+void OutputInterceptor::closeOutput() {
sys::Mutex::ScopedLock l(lock);
closing = true;
- next = &h;
+ next = &discardHandler;
}
void OutputInterceptor::close() {
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
index c080a419e1..61e246bb89 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -23,9 +23,9 @@
*/
#include "WriteEstimate.h"
+#include "NoOpConnectionOutputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/broker/ConnectionFactory.h"
-#include "qpid/sys/LatencyMetric.h"
#include <boost/function.hpp>
namespace qpid {
@@ -37,7 +37,7 @@ class Connection;
/**
* Interceptor for connection OutputHandler, manages outgoing message replication.
*/
-class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetricTimestamp {
+class OutputInterceptor : public sys::ConnectionOutputHandler {
public:
OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h);
@@ -53,7 +53,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri
// Intercept doOutput requests on Connection.
bool doOutput();
- void closeOutput(sys::ConnectionOutputHandler& h);
+ void closeOutput();
cluster::Connection& parent;
@@ -70,6 +70,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri
WriteEstimate writeEstimate;
bool moreOutput;
bool doingOutput;
+ static NoOpConnectionOutputHandler discardHandler;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 97eae7efa3..bb4df8890a 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -26,6 +26,9 @@
#include "ExpiryPolicy.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Future.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
@@ -98,10 +101,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
expiry(expiry_), connections(cons), decoder(decoder_),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail), connectionSettings(cs)
-{
- connection.open(url, cs);
- session = connection.newSession(UPDATE);
-}
+{}
UpdateClient::~UpdateClient() {}
@@ -110,6 +110,8 @@ const std::string UpdateClient::UPDATE("qpid.cluster-update");
void UpdateClient::run() {
try {
+ connection.open(updateeUrl, connectionSettings);
+ session = connection.newSession(UPDATE);
update();
done();
} catch (const std::exception& e) {
@@ -126,15 +128,19 @@ void UpdateClient::update() {
// Update queue is used to transfer acquired messages that are no longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
- session.close();
std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
+ session.queueDelete(arg::queue=UPDATE);
+ session.close();
+
+
ClusterConnectionProxy(session).expiryId(expiry.getId());
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
+
connection.close();
QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl);
}
@@ -203,7 +209,6 @@ class MessageUpdater {
sb.get()->send(transfer, message.payload->getFrames());
if (message.payload->isContentReleased()){
uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
-
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index 23d061b7e4..96e2479955 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -24,6 +24,7 @@
#include "ClusterMap.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/AsyncSession.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/sys/Runnable.h"
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h
index 34319e7ed4..2f0a01d6ca 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.h
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.h
@@ -26,7 +26,6 @@
#include "AMQContentBody.h"
#include "AMQHeartbeatBody.h"
#include "ProtocolVersion.h"
-#include "qpid/sys/LatencyMetric.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/cast.hpp>
#include "qpid/CommonImportExport.h"
@@ -34,7 +33,7 @@
namespace qpid {
namespace framing {
-class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp
+class AMQFrame : public AMQDataBlock
{
public:
QPID_COMMON_EXTERN AMQFrame(const boost::intrusive_ptr<AMQBody>& b=0);
diff --git a/qpid/cpp/src/qpid/sys/LatencyMetric.cpp b/qpid/cpp/src/qpid/sys/LatencyMetric.cpp
deleted file mode 100644
index caa221def4..0000000000
--- a/qpid/cpp/src/qpid/sys/LatencyMetric.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifdef QPID_LATENCY_METRIC
-
-#include "LatencyMetric.h"
-#include "Time.h"
-#include <iostream>
-
-namespace qpid {
-namespace sys {
-
-void LatencyMetricTimestamp::initialize(const LatencyMetricTimestamp& ts) {
- const_cast<int64_t&>(ts.latency_metric_timestamp) = Duration(now());
-}
-
-void LatencyMetricTimestamp::clear(const LatencyMetricTimestamp& ts) {
- const_cast<int64_t&>(ts.latency_metric_timestamp) = 0;
-}
-
-LatencyMetric::LatencyMetric(const char* msg, int64_t skip_) :
- message(msg), count(0), total(0), skipped(0), skip(skip_)
-{}
-
-LatencyMetric::~LatencyMetric() { report(); }
-
-void LatencyMetric::record(const LatencyMetricTimestamp& start) {
- if (!start.latency_metric_timestamp) return; // Ignore 0 timestamps.
- if (skip) {
- if (++skipped < skip) return;
- else skipped = 0;
- }
- ++count;
- int64_t now_ = Duration(now());
- total += now_ - start.latency_metric_timestamp;
- // Set start time for next leg of the journey
- const_cast<int64_t&>(start.latency_metric_timestamp) = now_;
-}
-
-void LatencyMetric::report() {
- using namespace std;
- if (count) {
- cout << "LATENCY: " << message << ": "
- << total / (count * TIME_USEC) << " microseconds" << endl;
- }
- else {
- cout << "LATENCY: " << message << ": no data." << endl;
- }
- count = 0;
- total = 0;
-}
-
-
-}} // namespace qpid::sys
-
-#endif
diff --git a/qpid/cpp/src/qpid/sys/LatencyMetric.h b/qpid/cpp/src/qpid/sys/LatencyMetric.h
deleted file mode 100644
index 63b5020db4..0000000000
--- a/qpid/cpp/src/qpid/sys/LatencyMetric.h
+++ /dev/null
@@ -1,85 +0,0 @@
-#ifndef QPID_SYS_LATENCYMETRIC_H
-#define QPID_SYS_LATENCYMETRIC_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifdef QPID_LATENCY_METRIC
-
-#include "qpid/sys/IntegerTypes.h"
-
-namespace qpid {
-namespace sys {
-
-/** Use this base class to add a timestamp for latency to an object */
-struct LatencyMetricTimestamp {
- LatencyMetricTimestamp() : latency_metric_timestamp(0) {}
- static void initialize(const LatencyMetricTimestamp&);
- static void clear(const LatencyMetricTimestamp&);
- int64_t latency_metric_timestamp;
-};
-
-/**
- * Record average latencies, report on destruction.
- *
- * For debugging only, use via macros below so it can be compiled out
- * of production code.
- */
-class LatencyMetric {
- public:
- /** msg should be a string literal. */
- LatencyMetric(const char* msg, int64_t skip_=0);
- ~LatencyMetric();
-
- void record(const LatencyMetricTimestamp& start);
-
- private:
- void report();
- const char* message;
- int64_t count, total, skipped, skip;
-};
-
-}} // namespace qpid::sys
-
-#define QPID_LATENCY_INIT(x) ::qpid::sys::LatencyMetricTimestamp::initialize(x)
-#define QPID_LATENCY_CLEAR(x) ::qpid::sys::LatencyMetricTimestamp::clear(x)
-#define QPID_LATENCY_RECORD(msg, x) do { \
- static ::qpid::sys::LatencyMetric metric__(msg); metric__.record(x); \
- } while (false)
-#define QPID_LATENCY_RECORD_SKIP(msg, x, skip) do { \
- static ::qpid::sys::LatencyMetric metric__(msg, skip); metric__.record(x); \
- } while (false)
-
-
-#else /* defined QPID_LATENCY_METRIC */
-
-namespace qpid { namespace sys {
-class LatencyMetricTimestamp {};
-}}
-
-#define QPID_LATENCY_INIT(x) (void)x
-#define QPID_LATENCY_CLEAR(x) (void)x
-#define QPID_LATENCY_RECORD(msg, x) (void)x
-#define QPID_LATENCY_RECORD_SKIP(msg, x, skip) (void)x
-
-#endif /* defined QPID_LATENCY_METRIC */
-
-#endif /*!QPID_SYS_LATENCYMETRIC_H*/
diff --git a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
index 067c0fe6b7..cd89f39292 100755
--- a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
+++ b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
@@ -23,6 +23,7 @@
*/
#include "AsynchIoResult.h"
+#include "qpid/CommonImportExport.h"
#include <winsock2.h>
@@ -49,7 +50,7 @@ public:
AsynchIO::RequestCallback cbRequest;
};
-SOCKET toFd(const IOHandlePrivate* h);
+QPID_COMMON_EXTERN SOCKET toFd(const IOHandlePrivate* h);
}}
diff --git a/qpid/cpp/src/tests/AsyncCompletion.cpp b/qpid/cpp/src/tests/AsyncCompletion.cpp
index e33b2dc35d..41423d8245 100644
--- a/qpid/cpp/src/tests/AsyncCompletion.cpp
+++ b/qpid/cpp/src/tests/AsyncCompletion.cpp
@@ -24,6 +24,8 @@
#include "qpid/sys/BlockingQueue.h"
#include "qpid/client/AsyncSession.h"
#include "qpid/sys/Time.h"
+#include "qpid/framing/QueueQueryResult.h"
+#include "qpid/client/TypedResult.h"
using namespace std;
using namespace qpid;
@@ -97,4 +99,15 @@ QPID_AUTO_TEST_CASE(testWaitTillComplete) {
sync.wait(); // Should complete now, all messages are completed.
}
+QPID_AUTO_TEST_CASE(testGetResult) {
+ SessionFixture fix;
+ AsyncSession s = fix.session;
+
+ s.queueDeclare("q", arg::durable=true);
+ TypedResult<QueueQueryResult> tr = s.queueQuery("q");
+ QueueQueryResult qq = tr.get();
+ BOOST_CHECK_EQUAL(qq.getQueue(), "q");
+ BOOST_CHECK_EQUAL(qq.getMessageCount(), 0U);
+}
+
QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/BasicP2PTest.cpp b/qpid/cpp/src/tests/BasicP2PTest.cpp
deleted file mode 100644
index f4a4cce7f2..0000000000
--- a/qpid/cpp/src/tests/BasicP2PTest.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "BasicP2PTest.h"
-
-using namespace qpid;
-using namespace qpid::client;
-
-class BasicP2PTest::Receiver : public Worker, public MessageListener
-{
- const std::string queue;
- std::string tag;
-public:
- Receiver(ConnectionOptions& options, const std::string& _queue, const int _messages)
- : Worker(options, _messages), queue(_queue){}
- void init()
- {
- Queue q(queue, true);
- channel.declareQueue(q);
- framing::FieldTable args;
- channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, q, queue, args);
- channel.consume(q, tag, this);
- channel.start();
- }
-
- void start()
- {
- }
-
- void received(Message&)
- {
- count++;
- }
-};
-
-void BasicP2PTest::assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options)
-{
- std::string queue = params.getString("P2P_QUEUE_AND_KEY_NAME");
- int messages = params.getInt("P2P_NUM_MESSAGES");
- if (role == "SENDER") {
- worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_DIRECT_EXCHANGE, queue, messages));
- } else if(role == "RECEIVER"){
- worker = std::auto_ptr<Worker>(new Receiver(options, queue, messages));
- } else {
- throw Exception("unrecognised role");
- }
- worker->init();
-}
diff --git a/qpid/cpp/src/tests/BasicPubSubTest.cpp b/qpid/cpp/src/tests/BasicPubSubTest.cpp
deleted file mode 100644
index 1e9ff364f1..0000000000
--- a/qpid/cpp/src/tests/BasicPubSubTest.cpp
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "BasicPubSubTest.h"
-
-using namespace qpid;
-
-class BasicPubSubTest::Receiver : public Worker, public MessageListener
-{
- const Exchange& exchange;
- const std::string queue;
- const std::string key;
- std::string tag;
-public:
- Receiver(ConnectionOptions& options, const Exchange& _exchange, const std::string& _queue, const std::string& _key, const int _messages)
- : Worker(options, _messages), exchange(_exchange), queue(_queue), key(_key){}
-
- void init()
- {
- Queue q(queue, true);
- channel.declareQueue(q);
- framing::FieldTable args;
- channel.bind(exchange, q, key, args);
- channel.consume(q, tag, this);
- channel.start();
- }
-
- void start(){
- }
-
- void received(Message&)
- {
- count++;
- }
-};
-
-class BasicPubSubTest::MultiReceiver : public Worker, public MessageListener
-{
- typedef boost::ptr_vector<Receiver> ReceiverList;
- ReceiverList receivers;
-
-public:
- MultiReceiver(ConnectionOptions& options, const Exchange& exchange, const std::string& key, const int _messages, int receiverCount)
- : Worker(options, _messages)
- {
- for (int i = 0; i != receiverCount; i++) {
- std::string queue = (boost::format("%1%_%2%") % options.clientid % i).str();
- receivers.push_back(new Receiver(options, exchange, queue, key, _messages));
- }
- }
-
- void init()
- {
- for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
- receivers[i].init();
- }
- }
-
- void start()
- {
- for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
- receivers[i].start();
- }
- }
-
- void received(Message& msg)
- {
- for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
- receivers[i].received(msg);
- }
- }
-
- virtual int getCount()
- {
- count = 0;
- for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
- count += receivers[i].getCount();
- }
- return count;
- }
- virtual void stop()
- {
- for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
- receivers[i].stop();
- }
- }
-};
-
-void BasicPubSubTest::assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options)
-{
- std::string key = params.getString("PUBSUB_KEY");
- int messages = params.getInt("PUBSUB_NUM_MESSAGES");
- int receivers = params.getInt("PUBSUB_NUM_RECEIVERS");
- if (role == "SENDER") {
- worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_TOPIC_EXCHANGE, key, messages));
- } else if(role == "RECEIVER"){
- worker = std::auto_ptr<Worker>(new MultiReceiver(options, Exchange::STANDARD_TOPIC_EXCHANGE, key, messages, receivers));
- } else {
- throw Exception("unrecognised role");
- }
- worker->init();
-}
-
diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h
index f55560739d..b32b7f44ba 100644
--- a/qpid/cpp/src/tests/BrokerFixture.h
+++ b/qpid/cpp/src/tests/BrokerFixture.h
@@ -114,10 +114,12 @@ struct ClientT {
SessionType session;
qpid::client::SubscriptionManager subs;
qpid::client::LocalQueue lq;
- ClientT(uint16_t port, const std::string& name=std::string())
- : connection(port), session(connection.newSession(name)), subs(session) {}
- ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name=std::string())
- : connection(settings), session(connection.newSession(name)), subs(session) {}
+ std::string name;
+
+ ClientT(uint16_t port, const std::string& name_=std::string())
+ : connection(port), session(connection.newSession(name_)), subs(session), name(name_) {}
+ ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string())
+ : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {}
~ClientT() { connection.close(); }
};
diff --git a/qpid/cpp/src/tests/ClientMessageTest.cpp b/qpid/cpp/src/tests/ClientMessageTest.cpp
new file mode 100644
index 0000000000..bc0945674f
--- /dev/null
+++ b/qpid/cpp/src/tests/ClientMessageTest.cpp
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+/**@file Unit tests for the client::Message class. */
+
+#include "unit_test.h"
+#include "qpid/client/Message.h"
+
+using namespace qpid::client;
+
+QPID_AUTO_TEST_SUITE(ClientMessageTestSuite)
+
+QPID_AUTO_TEST_CASE(MessageCopyAssign) {
+ // Verify that message has normal copy semantics.
+ Message m("foo");
+ BOOST_CHECK_EQUAL("foo", m.getData());
+ Message c(m);
+ BOOST_CHECK_EQUAL("foo", c.getData());
+ Message a;
+ BOOST_CHECK_EQUAL("", a.getData());
+ a = m;
+ BOOST_CHECK_EQUAL("foo", a.getData());
+ a.setData("a");
+ BOOST_CHECK_EQUAL("a", a.getData());
+ c.setData("c");
+ BOOST_CHECK_EQUAL("c", c.getData());
+ BOOST_CHECK_EQUAL("foo", m.getData());
+}
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 589e1154e1..1c719d16dc 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -28,7 +28,7 @@
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Time.h"
#include "qpid/client/Session.h"
-#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Message.h"
#include "qpid/framing/reply_exceptions.h"
#include <boost/optional.hpp>
@@ -121,7 +121,7 @@ QPID_AUTO_TEST_CASE(testDispatcher)
fix.session =fix.connection.newSession();
size_t count = 100;
for (size_t i = 0; i < count; ++i)
- fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue"));
+ fix.session.messageTransfer(arg::content=Message(boost::lexical_cast<string>(i), "my-queue"));
DummyListener listener(fix.session, "my-queue", count);
listener.run();
BOOST_CHECK_EQUAL(count, listener.messages.size());
@@ -137,7 +137,7 @@ QPID_AUTO_TEST_CASE(testDispatcherThread)
DummyListener listener(fix.session, "my-queue", count);
sys::Thread t(listener);
for (size_t i = 0; i < count; ++i) {
- fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue"));
+ fix.session.messageTransfer(arg::content=Message(boost::lexical_cast<string>(i), "my-queue"));
}
t.join();
BOOST_CHECK_EQUAL(count, listener.messages.size());
@@ -173,7 +173,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1)
fix.session.suspend();
// Make sure we are still subscribed after resume.
fix.connection.resume(fix.session);
- fix.session.messageTransfer(arg::content=TransferContent("my-message", "my-queue"));
+ fix.session.messageTransfer(arg::content=Message("my-message", "my-queue"));
FrameSet::shared_ptr msg = fix.session.get();
BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
}
diff --git a/qpid/cpp/src/tests/ClusterFailover.cpp b/qpid/cpp/src/tests/ClusterFailover.cpp
new file mode 100644
index 0000000000..db2392b296
--- /dev/null
+++ b/qpid/cpp/src/tests/ClusterFailover.cpp
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@file Tests for partial failure in a cluster.
+ * Partial failure means some nodes experience a failure while others do not.
+ * In this case the failed nodes must shut down.
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "ClusterFixture.h"
+#include "qpid/client/FailoverManager.h"
+#include <boost/assign.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/bind.hpp>
+
+QPID_AUTO_TEST_SUITE(ClusterFailoverTestSuite)
+
+using namespace std;
+using namespace qpid;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::client;
+using namespace qpid::client::arg;
+using namespace boost::assign;
+using broker::Broker;
+using boost::shared_ptr;
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=sys::TIME_SEC/4;
+
+
+// Test re-connecting with same session name after a failure.
+QPID_AUTO_TEST_CASE(testReconnectSameSessionName) {
+ ClusterFixture cluster(2, -1);
+ Client c0(cluster[0], "foo");
+ cluster.kill(0, 9);
+ Client c1(cluster[1], "foo"); // Using same name, should be cleaned up.
+}
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/ClusterFixture.cpp b/qpid/cpp/src/tests/ClusterFixture.cpp
index 5658957b48..70d60b10b4 100644
--- a/qpid/cpp/src/tests/ClusterFixture.cpp
+++ b/qpid/cpp/src/tests/ClusterFixture.cpp
@@ -61,8 +61,14 @@ using boost::assign::list_of;
#include "ClusterFixture.h"
-ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_)
- : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_)
+ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_, const string& clusterLib_)
+ : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_), clusterLib(clusterLib_)
+{
+ add(n);
+}
+
+ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_, const string& clusterLib_)
+ : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_), clusterLib(clusterLib_)
{
add(n);
}
@@ -70,13 +76,14 @@ ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_)
const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS =
list_of<string>("--auth=no")("--no-data-dir");
-ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix) {
- Args args = list_of<string>("qpidd " __FILE__)
+ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) {
+ Args args = list_of<string>("qpidd ")
("--no-module-dir")
- ("--load-module=../.libs/cluster.so")
- ("--cluster-name")(name)
+ ("--load-module")(clusterLib)
+ ("--cluster-name")(name)
("--log-prefix")(prefix);
args.insert(args.end(), userArgs.begin(), userArgs.end());
+ if (updateArgs) updateArgs(args, index);
return args;
}
@@ -84,7 +91,7 @@ void ClusterFixture::add() {
if (size() != size_t(localIndex)) { // fork a broker process.
std::ostringstream os; os << "fork" << size();
std::string prefix = os.str();
- forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix))));
+ forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix, size()))));
push_back(forkedBrokers.back()->getPort());
}
else { // Run in this process
@@ -106,7 +113,7 @@ void ClusterFixture::addLocal() {
assert(int(size()) == localIndex);
ostringstream os; os << "local" << localIndex;
string prefix = os.str();
- Args args(makeArgs(prefix));
+ Args args(makeArgs(prefix, localIndex));
vector<const char*> argv(args.size());
transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1));
qpid::log::Logger::instance().setPrefix(prefix);
@@ -116,7 +123,7 @@ void ClusterFixture::addLocal() {
}
bool ClusterFixture::hasLocal() const { return localIndex >= 0 && size_t(localIndex) < size(); }
-
+
/** Kill a forked broker with sig, or shutdown localBroker if n==0. */
void ClusterFixture::kill(size_t n, int sig) {
if (n == size_t(localIndex))
@@ -131,3 +138,22 @@ void ClusterFixture::killWithSilencer(size_t n, client::Connection& c, int sig)
kill(n,sig);
try { c.close(); } catch(...) {}
}
+
+/**
+ * Get the known broker ports from a Connection.
+ *@param n if specified wait for the cluster size to be n, up to a timeout.
+ */
+std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n) {
+ std::vector<qpid::Url> urls = source.getKnownBrokers();
+ if (n >= 0 && unsigned(n) != urls.size()) {
+ // Retry up to 10 secs in .1 second intervals.
+ for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
+ qpid::sys::usleep(1000*100); // 0.1 secs
+ urls = source.getKnownBrokers();
+ }
+ }
+ std::set<int> s;
+ for (std::vector<qpid::Url>::const_iterator i = urls.begin(); i != urls.end(); ++i)
+ s.insert((*i)[0].get<qpid::TcpAddress>()->port);
+ return s;
+}
diff --git a/qpid/cpp/src/tests/ClusterFixture.h b/qpid/cpp/src/tests/ClusterFixture.h
index 84fb9f2202..353ec0c88d 100644
--- a/qpid/cpp/src/tests/ClusterFixture.h
+++ b/qpid/cpp/src/tests/ClusterFixture.h
@@ -38,6 +38,7 @@
#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
+#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <string>
@@ -59,43 +60,55 @@ using qpid::broker::Broker;
using boost::shared_ptr;
using qpid::cluster::Cluster;
-
+#define DEFAULT_CLUSTER_LIB "../.libs/cluster.so"
/** Cluster fixture is a vector of ports for the replicas.
- *
+ *
* At most one replica (by default replica 0) is in the current
* process, all others are forked as children.
*/
class ClusterFixture : public vector<uint16_t> {
public:
typedef std::vector<std::string> Args;
+ static const Args DEFAULT_ARGS;
+
/** @param localIndex can be -1 meaning don't automatically start a local broker.
* A local broker can be started with addLocal().
*/
- ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS);
+ ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS, const string& clusterLib = DEFAULT_CLUSTER_LIB);
+
+ /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */
+ ClusterFixture(size_t n, int localIndex, boost::function<void (Args&, size_t)> updateArgs, const string& clusterLib = DEFAULT_CLUSTER_LIB);
+
void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
void add(); // Add a broker.
void setup();
bool hasLocal() const;
-
- /** Kill a forked broker with sig, or shutdown localBroker if n==0. */
+
+ /** Kill a forked broker with sig, or shutdown localBroker. */
void kill(size_t n, int sig=SIGINT);
/** Kill a broker and suppressing errors from closing connection c. */
void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT);
private:
- static const Args DEFAULT_ARGS;
-
+
void addLocal(); // Add a local broker.
- Args makeArgs(const std::string& prefix);
+ Args makeArgs(const std::string& prefix, size_t index);
string name;
std::auto_ptr<BrokerFixture> localBroker;
int localIndex;
std::vector<shared_ptr<ForkedBroker> > forkedBrokers;
Args userArgs;
+ boost::function<void (Args&, size_t)> updateArgs;
+ string clusterLib;
};
+/**
+ * Get the known broker ports from a Connection.
+ *@param n if specified wait for the cluster size to be n, up to a timeout.
+ */
+std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n=-1);
#endif /*!CLUSTER_FIXTURE_H*/
diff --git a/qpid/cpp/src/tests/ForkedBroker.cpp b/qpid/cpp/src/tests/ForkedBroker.cpp
index f90f76aeb2..12175d3287 100644
--- a/qpid/cpp/src/tests/ForkedBroker.cpp
+++ b/qpid/cpp/src/tests/ForkedBroker.cpp
@@ -20,20 +20,39 @@
*/
#include "ForkedBroker.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
+#include <boost/algorithm/string.hpp>
#include <algorithm>
#include <stdlib.h>
#include <sys/types.h>
#include <signal.h>
-ForkedBroker::ForkedBroker(const Args& args) { init(args); }
+using namespace std;
+using qpid::ErrnoException;
-ForkedBroker::ForkedBroker(int argc, const char* const argv[]) { init(Args(argv, argc+argv)); }
+ForkedBroker::ForkedBroker(const Args& constArgs) {
+ Args args(constArgs);
+ Args::iterator i = find(args.begin(), args.end(), string("TMP_DATA_DIR"));
+ if (i != args.end()) {
+ args.erase(i);
+ char dd[] = "/tmp/ForkedBroker.XXXXXX";
+ if (!mkdtemp(dd))
+ throw qpid::ErrnoException("Can't create data dir");
+ dataDir = dd;
+ args.push_back("--data-dir");
+ args.push_back(dataDir);
+ }
+ init(args);
+}
ForkedBroker::~ForkedBroker() {
- try { kill(); } catch(const std::exception& e) {
+ try { kill(); }
+ catch (const std::exception& e) {
QPID_LOG(error, QPID_MSG("Killing forked broker: " << e.what()));
}
+ if (!dataDir.empty())
+ ::system(("rm -rf "+dataDir).c_str());
}
void ForkedBroker::kill(int sig) {
@@ -42,14 +61,25 @@ void ForkedBroker::kill(int sig) {
pid = 0; // Reset pid here in case of an exception.
using qpid::ErrnoException;
if (::kill(savePid, sig) < 0)
- throw ErrnoException("kill failed");
+ throw ErrnoException("kill failed");
int status;
if (::waitpid(savePid, &status, 0) < 0 && sig != 9)
throw ErrnoException("wait for forked process failed");
if (WEXITSTATUS(status) != 0 && sig != 9)
throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status)));
}
+
+namespace std {
+static ostream& operator<<(ostream& o, const ForkedBroker::Args& a) {
+ copy(a.begin(), a.end(), ostream_iterator<string>(o, " "));
+ return o;
+}
+bool isLogOption(const std::string& s) {
+ return boost::starts_with(s, "--log-enable") || boost::starts_with(s, "--trace");
+}
+
+}
void ForkedBroker::init(const Args& userArgs) {
using qpid::ErrnoException;
@@ -70,17 +100,20 @@ void ForkedBroker::init(const Args& userArgs) {
}
else { // child
::close(pipeFds[0]);
- // FIXME aconway 2009-02-12:
int fd = ::dup2(pipeFds[1], 1); // pipe stdout to the parent.
if (fd < 0) throw ErrnoException("dup2 failed");
- const char* prog = "../qpidd";
+ const char* prog = ::getenv("QPID_FORKED_BROKER");
+ if (!prog) prog = "../qpidd";
Args args(userArgs);
args.push_back("--port=0");
- if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE"))
- args.push_back("--log-enable=error+"); // Keep quiet except for errors.
+ // Keep quiet except for errors.
+ if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")
+ && find_if(userArgs.begin(), userArgs.end(), isLogOption) == userArgs.end())
+ args.push_back("--log-enable=error+");
std::vector<const char*> argv(args.size());
std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1));
argv.push_back(0);
+ QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args);
execv(prog, const_cast<char* const*>(&argv[0]));
throw ErrnoException("execv failed");
}
diff --git a/qpid/cpp/src/tests/ForkedBroker.h b/qpid/cpp/src/tests/ForkedBroker.h
index 6f97fbdc09..45b522068c 100644
--- a/qpid/cpp/src/tests/ForkedBroker.h
+++ b/qpid/cpp/src/tests/ForkedBroker.h
@@ -48,19 +48,26 @@ class ForkedBroker {
public:
typedef std::vector<std::string> Args;
+ // argv args are passed to broker.
+ //
+ // Special value "TMP_DATA_DIR" is substituted with a temporary
+ // data directory for the broker.
+ //
ForkedBroker(const Args& argv);
- ForkedBroker(int argc, const char* const argv[]);
~ForkedBroker();
void kill(int sig=SIGINT);
+ int wait(); // Wait for exit, return exit status.
uint16_t getPort() { return port; }
pid_t getPID() { return pid; }
private:
+
void init(const Args& args);
pid_t pid;
int port;
+ std::string dataDir;
};
#endif /*!TESTS_FORKEDBROKER_H*/
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 9a81ef18b3..161428fcad 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -96,7 +96,9 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
RetryList.cpp \
RateFlowcontrolTest.cpp \
FrameDecoder.cpp \
- ReplicationTest.cpp
+ ReplicationTest.cpp \
+ ClientMessageTest.cpp \
+ PollableCondition.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -110,11 +112,17 @@ endif
# amqp_0_10/Map.cpp \
# amqp_0_10/handlers.cpp
+TESTLIBFLAGS = -module -rpath $(abs_builddir)
check_LTLIBRARIES += libshlibtest.la
-libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir)
+libshlibtest_la_LDFLAGS = $(TESTLIBFLAGS)
libshlibtest_la_SOURCES = shlibtest.cpp
+check_LTLIBRARIES += test_store.la
+test_store_la_SOURCES = test_store.cpp
+test_store_la_LIBADD = $(lib_broker) # FIXME aconway 2009-04-03: required?
+test_store_la_LDFLAGS = $(TESTLIBFLAGS)
+
include cluster.mk
if SSL
include ssl.mk
@@ -236,24 +244,6 @@ libdlclose_noop_la_SOURCES = dlclose_noop.c
CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers)
-# FIXME aconway 2008-05-23: Disabled interop_runner because it uses
-# the obsolete Channel class. Convert to Session and re-enable.
-#
-# check_PROGRAMS += interop_runner
-
-# interop_runner_SOURCES = \
-# interop_runner.cpp \
-# SimpleTestCaseBase.cpp \
-# BasicP2PTest.cpp \
-# BasicPubSubTest.cpp \
-# SimpleTestCaseBase.h \
-# BasicP2PTest.h \
-# BasicPubSubTest.h \
-# TestCase.h \
-# TestOptions.h ConnectionOptions.h
-# interop_runner_LDADD = $(lib_client) $(lib_common) $(extra_libs)
-
-
# Longer running stability tests, not run by default check: target.
# Not run under valgrind, too slow
diff --git a/qpid/cpp/src/tests/PartialFailure.cpp b/qpid/cpp/src/tests/PartialFailure.cpp
new file mode 100644
index 0000000000..5137672e7d
--- /dev/null
+++ b/qpid/cpp/src/tests/PartialFailure.cpp
@@ -0,0 +1,219 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@file Tests for partial failure in a cluster.
+ * Partial failure means some nodes experience a failure while others do not.
+ * In this case the failed nodes must shut down.
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "ClusterFixture.h"
+#include <boost/assign.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/bind.hpp>
+
+QPID_AUTO_TEST_SUITE(PartialFailureTestSuite)
+
+ using namespace std;
+using namespace qpid;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::client;
+using namespace qpid::client::arg;
+using namespace boost::assign;
+using broker::Broker;
+using boost::shared_ptr;
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=sys::TIME_SEC/4;
+
+static bool isLogOption(const std::string& s) { return boost::starts_with(s, "--log-enable"); }
+
+void updateArgs(ClusterFixture::Args& args, size_t index) {
+ ostringstream os;
+ os << "--test-store-name=s" << index;
+ args.push_back(os.str());
+ args.push_back("--load-module=.libs/test_store.so");
+ args.push_back("--auth=no");
+ args.push_back("TMP_DATA_DIR");
+
+ // These tests generate errors deliberately, disable error logging unless a log env var is set.
+ if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) {
+ remove_if(args.begin(), args.end(), isLogOption);
+ args.push_back("--log-enable=critical+:DISABLED"); // hacky way to disable logs.
+ }
+}
+
+Message pMessage(string data, string q) {
+ Message msg(data, q);
+ msg.getDeliveryProperties().setDeliveryMode(PERSISTENT);
+ return msg;
+}
+
+void queueAndSub(Client& c) {
+ c.session.queueDeclare(c.name, durable=true);
+ c.subs.subscribe(c.lq, c.name);
+}
+
+// Verify normal cluster-wide errors.
+QPID_AUTO_TEST_CASE(testNormalErrors) {
+ // FIXME aconway 2009-04-10: Would like to put a scope just around
+ // the statements expected to fail (in BOOST_CHECK_THROW) but that
+ // sproadically lets out messages, possibly because they're in
+ // Connection thread.
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(3, -1, updateArgs);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ Client c2(cluster[2], "c2");
+
+ queueAndSub(c0);
+ c0.session.messageTransfer(content=Message("x", "c0"));
+ BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
+
+ // Session error.
+ BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException);
+ c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead.
+
+ // Connection error, kill c1 on all members.
+ queueAndSub(c1);
+ BOOST_CHECK_THROW(
+ c1.session.messageTransfer(
+ content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")),
+ ConnectionException);
+ c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead.
+
+ BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size());
+ BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay");
+ BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay");
+}
+
+
+// Test errors after a new member joins to verify frame-sequence-numbers are ok in update.
+QPID_AUTO_TEST_CASE(testErrorAfterJoin) {
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(1, -1, updateArgs);
+ Client c0(cluster[0]);
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+
+ // Kill the new guy
+ cluster.add();
+ Client c1(cluster[1]);
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q"));
+ BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+
+ // Kill the old guy
+ cluster.add();
+ Client c2(cluster[2]);
+ c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q"));
+ BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size());
+}
+
+// Test that if one member fails and others do not, the failure leaves the cluster.
+QPID_AUTO_TEST_CASE(testSinglePartialFailure) {
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(3, -1, updateArgs);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ Client c2(cluster[2], "c2");
+
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+ // Cause partial failure on c1
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q"));
+ BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+
+ c0.session.messageTransfer(content=pMessage("b", "q"));
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u);
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+
+ // Cause partial failure on c2
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q"));
+ BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+ c0.session.messageTransfer(content=pMessage("c", "q"));
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u);
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+}
+
+// Test multiple partial falures: 2 fail 2 pass
+QPID_AUTO_TEST_CASE(testMultiPartialFailure) {
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(4, -1, updateArgs);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ Client c2(cluster[2], "c2");
+ Client c3(cluster[3], "c3");
+
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+
+ // Cause partial failure on c1, c2
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q"));
+ BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+ BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+ c0.session.messageTransfer(content=pMessage("b", "q"));
+ c3.session.messageTransfer(content=pMessage("c", "q"));
+ BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u);
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+}
+
+/** FIXME aconway 2009-04-10:
+ * The current approach to shutting down a process in test_store
+ * sometimes leads to assertion failures and errors in the shut-down
+ * process. Need a cleaner solution
+ */
+#if 0
+QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) {
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(2, -1, updateArgs);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+
+ // Cause failure on member 0 and simultaneous crash on member 1.
+ BOOST_CHECK_THROW(
+ c0.session.messageTransfer(
+ content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")),
+ ConnectionException);
+ cluster.wait(1);
+
+ Client c00(cluster[0], "c00"); // Old connection is dead.
+ BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size());
+}
+#endif
+
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/PollableCondition.cpp b/qpid/cpp/src/tests/PollableCondition.cpp
new file mode 100644
index 0000000000..33664d43fc
--- /dev/null
+++ b/qpid/cpp/src/tests/PollableCondition.cpp
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/PollableCondition.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Thread.h"
+#include <boost/bind.hpp>
+
+
+QPID_AUTO_TEST_SUITE(PollableConditionTest)
+
+using namespace qpid::sys;
+
+const Duration SHORT = TIME_SEC/100;
+const Duration LONG = TIME_SEC/10;
+
+class Callback {
+ public:
+ enum Action { NONE, DISARM, CLEAR, DISARM_CLEAR };
+
+ Callback() : count(), action(NONE) {}
+
+ void call(PollableCondition& pc) {
+ Mutex::ScopedLock l(lock);
+ ++count;
+ switch(action) {
+ case NONE: break;
+ case DISARM: pc.disarm(); break;
+ case CLEAR: pc.clear(); break;
+ case DISARM_CLEAR: pc.disarm(); pc.clear(); break;
+ }
+ action = NONE;
+ lock.notify();
+ }
+
+ bool isCalling() { Mutex::ScopedLock l(lock); return wait(LONG); }
+
+ bool isNotCalling() { Mutex::ScopedLock l(lock); return !wait(SHORT); }
+
+ bool nextCall(Action a=NONE) {
+ Mutex::ScopedLock l(lock);
+ action = a;
+ return wait(LONG);
+ }
+
+ private:
+ bool wait(Duration timeout) {
+ int n = count;
+ AbsTime deadline(now(), timeout);
+ while (n == count && lock.wait(deadline))
+ ;
+ return n != count;
+ }
+
+ Monitor lock;
+ int count;
+ Action action;
+};
+
+QPID_AUTO_TEST_CASE(testPollableCondition) {
+ boost::shared_ptr<Poller> poller(new Poller());
+ Callback callback;
+ PollableCondition pc(boost::bind(&Callback::call, &callback, _1), poller);
+
+ Thread runner = Thread(*poller);
+
+ BOOST_CHECK(callback.isNotCalling()); // condition is not set or armed.
+
+ pc.rearm();
+ BOOST_CHECK(callback.isNotCalling()); // Armed but not set
+
+ pc.set();
+ BOOST_CHECK(callback.isCalling()); // Armed and set.
+ BOOST_CHECK(callback.isCalling()); // Still armed and set.
+
+ callback.nextCall(Callback::DISARM);
+ BOOST_CHECK(callback.isNotCalling()); // set but not armed
+
+ pc.rearm();
+ BOOST_CHECK(callback.isCalling()); // Armed and set.
+ callback.nextCall(Callback::CLEAR);
+ BOOST_CHECK(callback.isNotCalling()); // armed but not set
+
+ pc.set();
+ BOOST_CHECK(callback.isCalling()); // Armed and set.
+ callback.nextCall(Callback::DISARM_CLEAR);
+ BOOST_CHECK(callback.isNotCalling()); // not armed or set.
+
+ poller->shutdown();
+ runner.join();
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+
diff --git a/qpid/cpp/src/tests/SimpleTestCaseBase.cpp b/qpid/cpp/src/tests/SimpleTestCaseBase.cpp
deleted file mode 100644
index 2739734731..0000000000
--- a/qpid/cpp/src/tests/SimpleTestCaseBase.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "SimpleTestCaseBase.h"
-
-using namespace qpid;
-
-void SimpleTestCaseBase::start()
-{
- if (worker.get()) {
- worker->start();
- }
-}
-
-void SimpleTestCaseBase::stop()
-{
- if (worker.get()) {
- worker->stop();
- }
-}
-
-void SimpleTestCaseBase::report(client::Message& report)
-{
- if (worker.get()) {
- report.getHeaders().setInt("MESSAGE_COUNT", worker->getCount());
- //add number of messages sent or received
- std::stringstream reportstr;
- reportstr << worker->getCount();
- report.setData(reportstr.str());
- }
-}
-
-SimpleTestCaseBase::Sender::Sender(ConnectionOptions& options,
- const Exchange& _exchange,
- const std::string& _key,
- const int _messages)
- : Worker(options, _messages), exchange(_exchange), key(_key) {}
-
-void SimpleTestCaseBase::Sender::init()
-{
- channel.start();
-}
-
-void SimpleTestCaseBase::Sender::start(){
- Message msg;
- while (count < messages) {
- channel.publish(msg, exchange, key);
- count++;
- }
- stop();
-}
-
-SimpleTestCaseBase::Worker::Worker(ConnectionOptions& options, const int _messages) :
- messages(_messages), count(0)
-{
- connection.open(options.host, options.port);
- connection.openChannel(channel);
-}
-
-void SimpleTestCaseBase::Worker::stop()
-{
- channel.close();
- connection.close();
-}
-
-int SimpleTestCaseBase::Worker::getCount()
-{
- return count;
-}
-
diff --git a/qpid/cpp/src/tests/SimpleTestCaseBase.h b/qpid/cpp/src/tests/SimpleTestCaseBase.h
deleted file mode 100644
index 0c1052d0c2..0000000000
--- a/qpid/cpp/src/tests/SimpleTestCaseBase.h
+++ /dev/null
@@ -1,89 +0,0 @@
-#ifndef _SimpleTestCaseBase_
-#define _SimpleTestCaseBase_
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <memory>
-#include <sstream>
-
-#include "qpid/Exception.h"
-#include "qpid/client/Channel.h"
-#include "qpid/client/Message.h"
-#include "qpid/client/Connection.h"
-#include "ConnectionOptions.h"
-#include "qpid/client/MessageListener.h"
-#include "TestCase.h"
-
-
-namespace qpid {
-
-using namespace qpid::client;
-
-class SimpleTestCaseBase : public TestCase
-{
-protected:
- class Worker
- {
- protected:
- client::Connection connection;
- client::Channel channel;
- const int messages;
- int count;
-
- public:
-
- Worker(ConnectionOptions& options, const int messages);
- virtual ~Worker(){}
-
- virtual void stop();
- virtual int getCount();
- virtual void init() = 0;
- virtual void start() = 0;
- };
-
- class Sender : public Worker
- {
- const Exchange& exchange;
- const std::string key;
- public:
- Sender(ConnectionOptions& options,
- const Exchange& exchange,
- const std::string& key,
- const int messages);
- void init();
- void start();
- };
-
- std::auto_ptr<Worker> worker;
-
-public:
- virtual void assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options) = 0;
-
- virtual ~SimpleTestCaseBase() {}
-
- void start();
- void stop();
- void report(client::Message& report);
-};
-
-}
-
-#endif
diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h
index d2a93c902b..ccce3c8842 100644
--- a/qpid/cpp/src/tests/SocketProxy.h
+++ b/qpid/cpp/src/tests/SocketProxy.h
@@ -21,45 +21,58 @@
*
*/
+#include "qpid/sys/IOHandle.h"
+#ifdef _WIN32
+# include "qpid/sys/windows/IoHandlePrivate.h"
+ typedef SOCKET FdType;
+#else
+# include "qpid/sys/posix/PrivatePosix.h"
+ typedef int FdType;
+#endif
#include "qpid/sys/Socket.h"
-#include "qpid/sys/Poller.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/client/Connection.h"
#include "qpid/log/Statement.h"
-#include <algorithm>
-
/**
* A simple socket proxy that forwards to another socket.
* Used between client & local broker to simulate network failures.
*/
class SocketProxy : private qpid::sys::Runnable
{
+ // Need a Socket we can get the fd from
+ class LowSocket : public qpid::sys::Socket {
+ public:
+ FdType getFd() { return toFd(impl); }
+ };
+
public:
/** Connect to connectPort on host, start a forwarding thread.
* Listen for connection on getPort().
*/
SocketProxy(int connectPort, const std::string host="localhost")
- : closed(false), port(listener.listen()), dropClient(), dropServer()
+ : closed(false), joined(true),
+ port(listener.listen()), dropClient(), dropServer()
{
client.connect(host, connectPort);
+ joined = false;
thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
}
- ~SocketProxy() { close(); }
+ ~SocketProxy() { close(); if (!joined) thread.join(); }
/** Simulate a network disconnect. */
void close() {
{
qpid::sys::Mutex::ScopedLock l(lock);
- if (closed) return;
+ if (closed) { return; }
closed=true;
}
- poller.shutdown();
- if (thread.id() != qpid::sys::Thread::current().id())
- thread.join();
+ if (thread.id() != qpid::sys::Thread::current().id()) {
+ thread.join();
+ joined = true;
+ }
client.close();
}
@@ -85,56 +98,72 @@ class SocketProxy : private qpid::sys::Runnable
}
void run() {
- std::auto_ptr<qpid::sys::Socket> server;
+ std::auto_ptr<LowSocket> server;
try {
- qpid::sys::PollerHandle listenerHandle(listener);
- poller.addFd(listenerHandle, qpid::sys::Poller::INPUT);
- qpid::sys::Poller::Event event = poller.wait();
- throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()");
- throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed");
-
- poller.delFd(listenerHandle);
- server.reset(listener.accept());
-
- // Pump data between client & server sockets
- qpid::sys::PollerHandle clientHandle(client);
- qpid::sys::PollerHandle serverHandle(*server);
- poller.addFd(clientHandle, qpid::sys::Poller::INPUT);
- poller.addFd(serverHandle, qpid::sys::Poller::INPUT);
+ fd_set socks;
+ FdType maxFd = listener.getFd();
+ struct timeval tmo;
+ for (;;) {
+ FD_ZERO(&socks);
+ FD_SET(maxFd, &socks);
+ tmo.tv_sec = 0;
+ tmo.tv_usec = 500 * 1000;
+ if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ throwIf(closed, "SocketProxy: Closed by close()");
+ continue;
+ }
+ throwIf(!FD_ISSET(maxFd, &socks), "SocketProxy: Accept failed");
+ break; // Accept ready... go to next step
+ }
+ server.reset(reinterpret_cast<LowSocket *>(listener.accept()));
+ maxFd = server->getFd();
+ if (client.getFd() > maxFd)
+ maxFd = client.getFd();
char buffer[1024];
for (;;) {
- qpid::sys::Poller::Event event = poller.wait();
- throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()");
- throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "SocketProxy: client/server disconnected");
- if (event.handle == &serverHandle) {
+ FD_ZERO(&socks);
+ tmo.tv_sec = 0;
+ tmo.tv_usec = 500 * 1000;
+ FD_SET(client.getFd(), &socks);
+ FD_SET(server->getFd(), &socks);
+ if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ throwIf(closed, "SocketProxy: Closed by close()");
+ continue;
+ }
+ // Something is set; relay data as needed until something closes
+ if (FD_ISSET(server->getFd(), &socks)) {
ssize_t n = server->read(buffer, sizeof(buffer));
+ throwIf(n <= 0, "SocketProxy: server disconnected");
if (!dropServer) client.write(buffer, n);
- poller.rearmFd(serverHandle);
- } else if (event.handle == &clientHandle) {
+ }
+ if (FD_ISSET(client.getFd(), &socks)) {
ssize_t n = client.read(buffer, sizeof(buffer));
- if (!dropClient) server->write(buffer, n);
- poller.rearmFd(clientHandle);
- } else {
- throwIf(true, "SocketProxy: No handle ready");
+ throwIf(n <= 0, "SocketProxy: client disconnected");
+ if (!dropServer) server->write(buffer, n);
}
+ if (!FD_ISSET(client.getFd(), &socks) &&
+ !FD_ISSET(server->getFd(), &socks))
+ throwIf(true, "SocketProxy: No handle ready");
}
}
catch (const std::exception& e) {
QPID_LOG(debug, "SocketProxy::run exception: " << e.what());
}
try {
- if (server.get()) server->close();
- close();
- }
+ if (server.get()) server->close();
+ close();
+ }
catch (const std::exception& e) {
QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what());
}
}
mutable qpid::sys::Mutex lock;
- bool closed;
- qpid::sys::Poller poller;
- qpid::sys::Socket client, listener;
+ mutable bool closed;
+ bool joined;
+ LowSocket client, listener;
uint16_t port;
qpid::sys::Thread thread;
bool dropClient, dropServer;
diff --git a/qpid/cpp/src/tests/TestCase.h b/qpid/cpp/src/tests/TestCase.h
deleted file mode 100644
index ba3330c951..0000000000
--- a/qpid/cpp/src/tests/TestCase.h
+++ /dev/null
@@ -1,64 +0,0 @@
-#ifndef _TestCase_
-#define _TestCase_
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ConnectionOptions.h"
-#include "qpid/client/Message.h"
-
-
-namespace qpid {
-
-/**
- * Interface to be implemented by test cases for use with the test
- * runner.
- */
-class TestCase
-{
-public:
- /**
- * Directs the test case to act in a particular role. Some roles
- * may be 'activated' at this stage others may require an explicit
- * start request.
- */
- virtual void assign(const std::string& role, framing::FieldTable& params, client::ConnectionOptions& options) = 0;
- /**
- * Each test will be started on its own thread, which should block
- * until the test completes (this may or may not require an
- * explicit stop() request).
- */
- virtual void start() = 0;
- /**
- * Requests that the test be stopped if still running.
- */
- virtual void stop() = 0;
- /**
- * Allows the test to fill in details on the final report
- * message. Will be called only after start has returned.
- */
- virtual void report(client::Message& report) = 0;
-
- virtual ~TestCase() {}
-};
-
-}
-
-#endif
diff --git a/qpid/cpp/src/tests/XmlClientSessionTest.cpp b/qpid/cpp/src/tests/XmlClientSessionTest.cpp
index 98558f0a76..aeb13c292f 100644
--- a/qpid/cpp/src/tests/XmlClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/XmlClientSessionTest.cpp
@@ -26,7 +26,7 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
-#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Message.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/SubscriptionManager.h"
diff --git a/qpid/cpp/src/tests/client_test.cpp b/qpid/cpp/src/tests/client_test.cpp
index 204c2c4b71..05b42f620c 100644
--- a/qpid/cpp/src/tests/client_test.cpp
+++ b/qpid/cpp/src/tests/client_test.cpp
@@ -32,8 +32,8 @@
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
#include "qpid/client/Session.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/all_method_bodies.h"
+#include "qpid/client/SubscriptionManager.h"
+
using namespace qpid;
using namespace qpid::client;
@@ -113,35 +113,18 @@ int main(int argc, char** argv)
session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut, arg::acceptMode=1);
if (opts.verbose) print("Published message: ", msgOut);
- //subscribe to the queue, add sufficient credit and then get
- //incoming 'frameset', check that its a message transfer and
- //then convert it to a message (see Dispatcher and
- //SubscriptionManager utilties for common reusable patterns at
- //a higher level)
- session.messageSubscribe(arg::queue="MyQueue", arg::destination="MyId");
- session.messageFlow(arg::destination="MyId", arg::unit=0, arg::value=1); //credit for one message
- session.messageFlow(arg::destination="MyId", arg::unit=1, arg::value=0xFFFFFFFF); //credit for infinite bytes
- if (opts.verbose) std::cout << "Subscribed to queue." << std::endl;
- FrameSet::shared_ptr incoming = session.get();
- if (incoming->isA<MessageTransferBody>()) {
- Message msgIn(*incoming);
- if (msgIn.getData() == msgOut.getData()) {
- if (opts.verbose) std::cout << "Received the exepected message." << std::endl;
- session.messageAccept(SequenceSet(msgIn.getId()));
- session.markCompleted(msgIn.getId(), true, true);
- } else {
- print("Received an unexepected message: ", msgIn);
- }
- } else {
- throw Exception("Unexpected command received");
- }
-
+ // Using the SubscriptionManager, get the message from the queue.
+ SubscriptionManager subs(session);
+ Message msgIn = subs.get("MyQueue");
+ if (msgIn.getData() == msgOut.getData())
+ if (opts.verbose) std::cout << "Received the exepected message." << std::endl;
+
//close the session & connection
session.close();
if (opts.verbose) std::cout << "Closed session." << std::endl;
connection.close();
if (opts.verbose) std::cout << "Closed connection." << std::endl;
- return 0;
+ return 0;
} catch(const std::exception& e) {
std::cout << e.what() << std::endl;
}
diff --git a/qpid/cpp/src/tests/client_test.vcproj b/qpid/cpp/src/tests/client_test.vcproj
index 1623c2961e..32c90bce04 100644
--- a/qpid/cpp/src/tests/client_test.vcproj
+++ b/qpid/cpp/src/tests/client_test.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="client_test"
- ProjectGUID="{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="client_test"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index 5d115de5a5..e5e803003a 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -34,8 +34,10 @@ EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_
federated_cluster_test clustered_replication_test
check_PROGRAMS+=cluster_test
-cluster_test_SOURCES=unit_test.cpp cluster_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp
-cluster_test_LDADD=$(lib_client) ../cluster.la -lboost_unit_test_framework
+cluster_test_SOURCES=unit_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp \
+ cluster_test.cpp PartialFailure.cpp ClusterFailover.cpp
+
+cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework
unit_test_LDADD+=../cluster.la
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index eee2df58cc..d38d84025b 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -35,6 +35,7 @@
#include "qpid/framing/Uuid.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/enum.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Logger.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
@@ -73,7 +74,7 @@ const sys::Duration TIMEOUT=sys::TIME_SEC/4;
ostream& operator<<(ostream& o, const cpg_name* n) {
- return o << cluster::Cpg::str(*n);
+ return o << Cpg::str(*n);
}
ostream& operator<<(ostream& o, const cpg_address& a) {
@@ -89,29 +90,12 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-template <class C> set<uint16_t> makeSet(const C& c) {
- set<uint16_t> s;
+template <class C> set<int> makeSet(const C& c) {
+ set<int> s;
copy(c.begin(), c.end(), inserter(s, s.begin()));
return s;
}
-template <class T> set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
- vector<Url> urls = source.getKnownBrokers();
- if (n >= 0 && unsigned(n) != urls.size()) {
- BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
- // Retry up to 10 secs in .1 second intervals.
- for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
- sys::usleep(1000*100); // 0.1 secs
- urls = source.getKnownBrokers();
- }
- }
- BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls);
- set<uint16_t> s;
- for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i)
- s.insert((*i)[0].get<TcpAddress>()->port);
- return s;
-}
-
class Sender {
public:
Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
@@ -175,7 +159,6 @@ ConnectionSettings aclSettings(int port, const std::string& id) {
QPID_AUTO_TEST_CASE(testAcl) {
ofstream policyFile("cluster_test.acl");
- // FIXME aconway 2009-02-12: guest -> qpidd?
policyFile << "acl allow foo@QPID create queue name=foo" << endl
<< "acl allow foo@QPID create queue name=foo2" << endl
<< "acl deny foo@QPID create queue name=bar" << endl
@@ -446,13 +429,13 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
- set<uint16_t> kb0 = knownBrokerPorts(c0.connection);
+ set<int> kb0 = knownBrokerPorts(c0.connection);
BOOST_CHECK_EQUAL(kb0.size(), 1u);
BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
cluster.add();
Client c1(cluster[1], "c1");
- set<uint16_t> kb1 = knownBrokerPorts(c1.connection);
+ set<int> kb1 = knownBrokerPorts(c1.connection);
kb0 = knownBrokerPorts(c0.connection, 2);
BOOST_CHECK_EQUAL(kb1.size(), 2u);
BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
@@ -460,7 +443,7 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
cluster.add();
Client c2(cluster[2], "c2");
- set<uint16_t> kb2 = knownBrokerPorts(c2.connection);
+ set<int> kb2 = knownBrokerPorts(c2.connection);
kb1 = knownBrokerPorts(c1.connection, 3);
kb0 = knownBrokerPorts(c0.connection, 3);
BOOST_CHECK_EQUAL(kb2.size(), 3u);
diff --git a/qpid/cpp/src/tests/clustered_replication_test b/qpid/cpp/src/tests/clustered_replication_test
index 2a3e742632..7afda87733 100755
--- a/qpid/cpp/src/tests/clustered_replication_test
+++ b/qpid/cpp/src/tests/clustered_replication_test
@@ -23,6 +23,7 @@
# failures:
srcdir=`dirname $0`
PYTHON_DIR=$srcdir/../../../python
+export PYTHONPATH=$PYTHON_DIR
trap stop_brokers INT EXIT
diff --git a/qpid/cpp/src/tests/consume.vcproj b/qpid/cpp/src/tests/consume.vcproj
index 2e1f8e3e56..1fb72e2fb3 100644
--- a/qpid/cpp/src/tests/consume.vcproj
+++ b/qpid/cpp/src/tests/consume.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="consume"
- ProjectGUID="{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="consume"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/declare_queues.cpp b/qpid/cpp/src/tests/declare_queues.cpp
index 7f61bde12a..d17a72b940 100644
--- a/qpid/cpp/src/tests/declare_queues.cpp
+++ b/qpid/cpp/src/tests/declare_queues.cpp
@@ -33,14 +33,15 @@ using namespace std;
int main(int argc, char ** argv)
{
ConnectionSettings settings;
- if ( argc != 3 )
+ if ( argc != 4 )
{
- cerr << "Usage: declare_queues host port\n";
+ cerr << "Usage: declare_queues host port durability\n";
return 1;
}
settings.host = argv[1];
settings.port = atoi(argv[2]);
+ int durability = atoi(argv[3]);
FailoverManager connection(settings);
try {
@@ -48,7 +49,10 @@ int main(int argc, char ** argv)
while (!complete) {
Session session = connection.connect().newSession();
try {
- session.queueDeclare(arg::queue="message_queue");
+ if ( durability )
+ session.queueDeclare(arg::queue="message_queue", arg::durable=true);
+ else
+ session.queueDeclare(arg::queue="message_queue");
complete = true;
} catch (const qpid::TransportFailure&) {}
}
diff --git a/qpid/cpp/src/tests/echotest.vcproj b/qpid/cpp/src/tests/echotest.vcproj
index 2848894228..d41a0df4e6 100644
--- a/qpid/cpp/src/tests/echotest.vcproj
+++ b/qpid/cpp/src/tests/echotest.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="echotest"
- ProjectGUID="{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="echotest"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/failover_soak.cpp b/qpid/cpp/src/tests/failover_soak.cpp
index 4f16e469b8..c8f67aadd8 100644
--- a/qpid/cpp/src/tests/failover_soak.cpp
+++ b/qpid/cpp/src/tests/failover_soak.cpp
@@ -220,63 +220,13 @@ struct children : public vector<child *>
cout << "\n\n\n\n";
}
-
- /*
- Only call this if you already know there is at least
- one child still running. Supply a time in seconds.
- If it has been at least that long since a shild stopped
- running, we judge the system to have hung.
- */
- int
- hanging ( int hangTime )
- {
- struct timeval now,
- duration;
- gettimeofday ( &now, 0 );
-
- int how_many_hanging = 0;
-
- vector<child *>::iterator i;
- for ( i = begin(); i != end(); ++ i )
- {
- //Not in POSIX
- //timersub ( & now, &((*i)->startTime), & duration );
- duration.tv_sec = now.tv_sec - (*i)->startTime.tv_sec;
- duration.tv_usec = now.tv_usec - (*i)->startTime.tv_usec;
- if (duration.tv_usec < 0) {
- --duration.tv_sec;
- duration.tv_usec += 1000000;
- }
-
- if ( (COMPLETED != (*i)->status) // child isn't done running
- &&
- ( duration.tv_sec >= hangTime ) // it's been too long
- )
- {
- std::cerr << "Child of type "
- << (*i)->type
- << " hanging. "
- << "PID is "
- << (*i)->pid
- << endl;
- ++ how_many_hanging;
- }
- }
-
- return how_many_hanging;
- }
-
-
int verbosity;
};
-
children allMyChildren;
-
-
void
childExit ( int )
{
@@ -361,36 +311,45 @@ wait_for_newbie ( )
}
}
-
+bool endsWith(const char* str, const char* suffix) {
+ return (strlen(suffix) < strlen(str) && 0 == strcmp(str+strlen(str)-strlen(suffix), suffix));
+}
void
startNewBroker ( brokerVector & brokers,
- char const * srcRoot,
- char const * moduleDir,
+ char const * moduleOrDir,
string const clusterName,
- int verbosity )
+ int verbosity,
+ int durable )
{
static int brokerId = 0;
- stringstream path, prefix, module;
- module << moduleDir << "/cluster.so";
- path << srcRoot << "/qpidd";
+ stringstream path, prefix;
prefix << "soak-" << brokerId;
std::vector<std::string> argv = list_of<string>
("qpidd")
- ("--no-module-dir")
- ("--load-module=cluster.so")
- ("--cluster-name")
- (clusterName)
+ ("--cluster-name")(clusterName)
("--auth=no")
- ("--no-data-dir")
("--mgmt-enable=no")
- ("--log-prefix")
- (prefix.str())
- ("--log-to-file")
- (prefix.str()+".log");
+ ("--log-prefix")(prefix.str())
+ ("--log-to-file")(prefix.str()+".log")
+ ("--log-enable=error+")
+ ("TMP_DATA_DIR");
+
+ if (endsWith(moduleOrDir, "cluster.so")) {
+ // Module path specified, load only that module.
+ argv.push_back(string("--load-module=")+moduleOrDir);
+ argv.push_back("--no-module-dir");
+ if ( durable ) {
+ std::cerr << "failover_soak warning: durable arg hass no effect. Use \"dir\" option of \"moduleOrDir\".\n";
+ }
+ }
+ else {
+ // Module directory specified, load all modules in dir.
+ argv.push_back(string("--module-dir=")+moduleOrDir);
+ }
- newbie = new ForkedBroker ( argv );
+ newbie = new ForkedBroker (argv);
newbie_port = newbie->getPort();
ForkedBroker * broker = newbie;
@@ -473,7 +432,8 @@ pid_t
runDeclareQueuesClient ( brokerVector brokers,
char const * host,
char const * path,
- int verbosity
+ int verbosity,
+ int durable
)
{
string name("declareQueues");
@@ -492,6 +452,10 @@ runDeclareQueuesClient ( brokerVector brokers,
argv.push_back ( "declareQueues" );
argv.push_back ( host );
argv.push_back ( portSs.str().c_str() );
+ if ( durable )
+ argv.push_back ( "1" );
+ else
+ argv.push_back ( "0" );
argv.push_back ( 0 );
pid_t pid = fork();
@@ -562,7 +526,8 @@ startSendingClient ( brokerVector brokers,
char const * senderPath,
char const * nMessages,
char const * reportFrequency,
- int verbosity
+ int verbosity,
+ int durability
)
{
string name("sender");
@@ -586,6 +551,10 @@ startSendingClient ( brokerVector brokers,
argv.push_back ( nMessages );
argv.push_back ( reportFrequency );
argv.push_back ( verbosityStr );
+ if ( durability )
+ argv.push_back ( "1" );
+ else
+ argv.push_back ( "0" );
argv.push_back ( 0 );
pid_t pid = fork();
@@ -613,27 +582,33 @@ startSendingClient ( brokerVector brokers,
#define ERROR_KILLING_BROKER 8
+// If you want durability, use the "dir" option of "moduleOrDir" .
+
+
int
main ( int argc, char const ** argv )
{
- if ( argc < 9 ) {
- cerr << "Usage: failoverSoak srcRoot moduleDir host senderPath receiverPath nMessages verbosity\n";
- cerr << " ( argc was " << argc << " )\n";
+ if ( argc != 9 ) {
+ cerr << "Usage: "
+ << argv[0]
+ << "moduleOrDir declareQueuesPath senderPath receiverPath nMessages reportFrequency verbosity durable"
+ << endl;
+ cerr << "\tverbosity is an integer, durable is 0 or 1\n";
return BAD_ARGS;
}
-
signal ( SIGCHLD, childExit );
- char const * srcRoot = argv[1];
- char const * moduleDir = argv[2];
- char const * host = argv[3];
- char const * declareQueuesPath = argv[4];
- char const * senderPath = argv[5];
- char const * receiverPath = argv[6];
- char const * nMessages = argv[7];
- char const * reportFrequency = argv[8];
- int verbosity = atoi(argv[9]);
-
+ int i = 1;
+ char const * moduleOrDir = argv[i++];
+ char const * declareQueuesPath = argv[i++];
+ char const * senderPath = argv[i++];
+ char const * receiverPath = argv[i++];
+ char const * nMessages = argv[i++];
+ char const * reportFrequency = argv[i++];
+ int verbosity = atoi(argv[i++]);
+ int durable = atoi(argv[i++]);
+
+ char const * host = "127.0.0.1";
int maxBrokers = 50;
allMyChildren.verbosity = verbosity;
@@ -652,10 +627,10 @@ main ( int argc, char const ** argv )
int nBrokers = 3;
for ( int i = 0; i < nBrokers; ++ i ) {
startNewBroker ( brokers,
- srcRoot,
- moduleDir,
+ moduleOrDir,
clusterName,
- verbosity );
+ verbosity,
+ durable );
}
@@ -665,7 +640,7 @@ main ( int argc, char const ** argv )
// Run the declareQueues child.
int childStatus;
pid_t dqClientPid =
- runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity );
+ runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity, durable );
if ( -1 == dqClientPid ) {
cerr << "END_OF_TEST ERROR_START_DECLARE_1\n";
return CANT_FORK_DQ;
@@ -701,7 +676,8 @@ main ( int argc, char const ** argv )
senderPath,
nMessages,
reportFrequency,
- verbosity );
+ verbosity,
+ durable );
if ( -1 == sendingClientPid ) {
cerr << "END_OF_TEST ERROR_START_SENDER\n";
return CANT_FORK_SENDER;
@@ -745,10 +721,10 @@ main ( int argc, char const ** argv )
cout << "Starting new broker.\n\n";
startNewBroker ( brokers,
- srcRoot,
- moduleDir,
+ moduleOrDir,
clusterName,
- verbosity );
+ verbosity,
+ durable );
if ( verbosity > 1 )
printBrokers ( brokers );
@@ -787,16 +763,6 @@ main ( int argc, char const ** argv )
return ERROR_ON_CHILD;
}
- // If one is hanging, quit.
- if ( allMyChildren.hanging ( 120 ) )
- {
- /*
- * Don't kill any processes. Leave alive for questioning.
- * */
- std::cerr << "END_OF_TEST ERROR_HANGING\n";
- return HANGING;
- }
-
if ( verbosity > 1 ) {
std::cerr << "------- next kill-broker loop --------\n";
allMyChildren.print();
diff --git a/qpid/cpp/src/tests/header_test.vcproj b/qpid/cpp/src/tests/header_test.vcproj
index da761a6edb..cdfc984f46 100644
--- a/qpid/cpp/src/tests/header_test.vcproj
+++ b/qpid/cpp/src/tests/header_test.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="header_test"
- ProjectGUID="{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="header_test"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/interop_runner.cpp b/qpid/cpp/src/tests/interop_runner.cpp
deleted file mode 100644
index 8c6e0a6991..0000000000
--- a/qpid/cpp/src/tests/interop_runner.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/Options.h"
-#include "qpid/ptr_map.h"
-#include "qpid/Exception.h"
-#include "qpid/client/Channel.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/ConnectionOptions.h"
-#include "qpid/client/Exchange.h"
-#include "qpid/client/MessageListener.h"
-#include "qpid/client/Queue.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Time.h"
-#include <iostream>
-#include <memory>
-#include "BasicP2PTest.h"
-#include "BasicPubSubTest.h"
-#include "TestCase.h"
-#include <boost/ptr_container/ptr_map.hpp>
-
-/**
- * Framework for interop tests.
- *
- * [see http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification for details].
- */
-
-using namespace qpid::client;
-using namespace qpid::sys;
-using qpid::TestCase;
-using qpid::framing::FieldTable;
-using qpid::framing::ReplyTo;
-using namespace std;
-
-class DummyRun : public TestCase
-{
-public:
- DummyRun() {}
- void assign(const string&, FieldTable&, ConnectionOptions&) {}
- void start() {}
- void stop() {}
- void report(qpid::client::Message&) {}
-};
-
-string parse_next_word(const string& input, const string& delims, string::size_type& position);
-
-/**
- */
-class Listener : public MessageListener, private Runnable{
- typedef boost::ptr_map<string, TestCase> TestMap;
-
- Channel& channel;
- ConnectionOptions& options;
- TestMap tests;
- const string name;
- const string topic;
- TestCase* test;
- auto_ptr<Thread> runner;
- ReplyTo reportTo;
- string reportCorrelator;
-
- void shutdown();
- bool invite(const string& name);
- void run();
-
- void sendResponse(Message& response, ReplyTo replyTo);
- void sendResponse(Message& response, Message& request);
- void sendSimpleResponse(const string& type, Message& request);
- void sendReport();
-public:
- Listener(Channel& channel, ConnectionOptions& options);
- void received(Message& msg);
- void bindAndConsume();
- void registerTest(string name, TestCase* test);
-};
-
-struct TestSettings : ConnectionOptions
-{
- bool help;
-
- TestSettings() : help(false)
- {
- addOptions()
- ("help", qpid::optValue(help), "print this usage statement");
- }
-};
-
-int main(int argc, char** argv) {
- try {
- TestSettings options;
- options.parse(argc, argv);
- if (options.help) {
- cout << options;
- } else {
- Connection connection;
- connection.open(options.host, options.port, "guest", "guest", options.virtualhost);
-
- Channel channel;
- connection.openChannel(channel);
-
- Listener listener(channel, options);
- listener.registerTest("TC1_DummyRun", new DummyRun());
- listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest());
- listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest());
-
- listener.bindAndConsume();
-
- channel.run();
- connection.close();
- }
- } catch(const exception& error) {
- cout << error.what() << endl << "Type " << argv[0] << " --help for help" << endl;
- }
-}
-
-Listener::Listener(Channel& _channel, ConnectionOptions& _options) : channel(_channel), options(_options), name(options.clientid), topic("iop.control." + name)
-{}
-
-void Listener::registerTest(string name, TestCase* test)
-{
- tests.insert(name, test);
-}
-
-void Listener::bindAndConsume()
-{
- Queue control(name, true);
- channel.declareQueue(control);
- qpid::framing::FieldTable bindArgs;
- //replace these separate binds with a wildcard once that is supported on java broker
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "iop.control", bindArgs);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, topic, bindArgs);
-
- string tag;
- channel.consume(control, tag, this);
-}
-
-void Listener::sendSimpleResponse(const string& type, Message& request)
-{
- Message response;
- response.getHeaders().setString("CONTROL_TYPE", type);
- response.getHeaders().setString("CLIENT_NAME", name);
- response.getHeaders().setString("CLIENT_PRIVATE_CONTROL_KEY", topic);
- response.getMessageProperties().setCorrelationId(request.getMessageProperties().getCorrelationId());
- sendResponse(response, request);
-}
-
-void Listener::sendResponse(Message& response, Message& request)
-{
- sendResponse(response, request.getMessageProperties().getReplyTo());
-}
-
-void Listener::sendResponse(Message& response, ReplyTo replyTo)
-{
- string exchange = replyTo.getExchange();
- string routingKey = replyTo.getRoutingKey();
- channel.publish(response, exchange, routingKey);
-}
-
-void Listener::received(Message& message)
-{
- string type(message.getHeaders().getString("CONTROL_TYPE"));
-
- if (type == "INVITE") {
- string name(message.getHeaders().getString("TEST_NAME"));
- if (name.empty() || invite(name)) {
- sendSimpleResponse("ENLIST", message);
- } else {
- cout << "Can't take part in '" << name << "'" << endl;
- }
- } else if (type == "ASSIGN_ROLE") {
- test->assign(message.getHeaders().getString("ROLE"), message.getHeaders(), options);
- sendSimpleResponse("ACCEPT_ROLE", message);
- } else if (type == "START") {
- reportTo = message.getMessageProperties().getReplyTo();
- reportCorrelator = message.getMessageProperties().getCorrelationId();
- runner = auto_ptr<Thread>(new Thread(this));
- } else if (type == "STATUS_REQUEST") {
- reportTo = message.getMessageProperties().getReplyTo();
- reportCorrelator = message.getMessageProperties().getCorrelationId();
- test->stop();
- sendReport();
- } else if (type == "TERMINATE") {
- if (test) test->stop();
- shutdown();
- } else {
- cerr <<"ERROR!: Received unknown control message: " << type << endl;
- shutdown();
- }
-}
-
-void Listener::shutdown()
-{
- channel.close();
-}
-
-bool Listener::invite(const string& name)
-{
- TestMap::iterator i = tests.find(name);
- test = (i != tests.end()) ? qpid::ptr_map_ptr(i) : 0;
- return test;
-}
-
-void Listener::run()
-{
- //NB: this method will be called in its own thread
- //start test and when start returns...
- test->start();
- sendReport();
-}
-
-void Listener::sendReport()
-{
- Message report;
- report.getHeaders().setString("CONTROL_TYPE", "REPORT");
- test->report(report);
- report.getMessageProperties().setCorrelationId(reportCorrelator);
- sendResponse(report, reportTo);
-}
-
-string parse_next_word(const string& input, const string& delims, string::size_type& position)
-{
- string::size_type start = input.find_first_not_of(delims, position);
- if (start == string::npos) {
- return "";
- } else {
- string::size_type end = input.find_first_of(delims, start);
- if (end == string::npos) {
- end = input.length();
- }
- position = end;
- return input.substr(start, end - start);
- }
-}
diff --git a/qpid/cpp/src/tests/latencytest.vcproj b/qpid/cpp/src/tests/latencytest.vcproj
index 758302029a..da48e682bf 100644
--- a/qpid/cpp/src/tests/latencytest.vcproj
+++ b/qpid/cpp/src/tests/latencytest.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="latencytest"
- ProjectGUID="{4A809018-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{4A809018-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="latencytest"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/perftest.vcproj b/qpid/cpp/src/tests/perftest.vcproj
index 3a2397ac9b..83cd550df5 100644
--- a/qpid/cpp/src/tests/perftest.vcproj
+++ b/qpid/cpp/src/tests/perftest.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="perftest"
- ProjectGUID="{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="perftest"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/publish.vcproj b/qpid/cpp/src/tests/publish.vcproj
index 7422606a32..849b845025 100644
--- a/qpid/cpp/src/tests/publish.vcproj
+++ b/qpid/cpp/src/tests/publish.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="publish"
- ProjectGUID="{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="publish"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/receiver.vcproj b/qpid/cpp/src/tests/receiver.vcproj
index ce6fad366e..8744ac3234 100644
--- a/qpid/cpp/src/tests/receiver.vcproj
+++ b/qpid/cpp/src/tests/receiver.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="receiver"
- ProjectGUID="{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{7D314A98-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="receiver"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/replaying_sender.cpp b/qpid/cpp/src/tests/replaying_sender.cpp
index ea2a13bd54..3ee69eec14 100644
--- a/qpid/cpp/src/tests/replaying_sender.cpp
+++ b/qpid/cpp/src/tests/replaying_sender.cpp
@@ -40,9 +40,10 @@ class Sender : public FailoverManager::Command
public:
Sender(const std::string& queue, uint count, uint reportFreq);
void execute(AsyncSession& session, bool isRetry);
- uint getSent();
+ uint getSent();
- int verbosity;
+ void setVerbosity ( int v ) { verbosity = v; }
+ void setPersistence ( int p ) { persistence = p; }
private:
MessageReplayTracker sender;
@@ -51,9 +52,11 @@ class Sender : public FailoverManager::Command
const uint reportFrequency;
Message message;
+ int verbosity;
+ int persistence;
};
-Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : sender(10), count(count_), sent(0), reportFrequency(reportFreq)
+Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : sender(10), count(count_), sent(0), reportFrequency(reportFreq), verbosity(0), persistence(0)
{
message.getDeliveryProperties().setRoutingKey(queue);
}
@@ -69,6 +72,9 @@ void Sender::execute(AsyncSession& session, bool isRetry)
message_data << ++sent;
message.setData(message_data.str());
message.getHeaders().setInt("sn", sent);
+ if ( persistence )
+ message.getDeliveryProperties().setDeliveryMode(PERSISTENT);
+
sender.send(message);
if (count > reportFrequency && !(sent % reportFrequency)) {
if ( verbosity > 0 )
@@ -91,9 +97,9 @@ int main(int argc, char ** argv)
{
ConnectionSettings settings;
- if ( argc != 6 )
+ if ( argc != 7 )
{
- std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity\n";
+ std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity persistence\n";
return 1;
}
@@ -102,10 +108,12 @@ int main(int argc, char ** argv)
int n_messages = atoi(argv[3]);
int reportFrequency = atoi(argv[4]);
int verbosity = atoi(argv[5]);
+ int persistence = atoi(argv[6]);
FailoverManager connection(settings);
Sender sender("message_queue", n_messages, reportFrequency );
- sender.verbosity = verbosity;
+ sender.setVerbosity ( verbosity );
+ sender.setPersistence ( persistence );
try {
connection.execute ( sender );
if ( verbosity > 0 )
diff --git a/qpid/cpp/src/tests/run_failover_soak b/qpid/cpp/src/tests/run_failover_soak
index 36dfed79a6..3c9a5589c4 100755
--- a/qpid/cpp/src/tests/run_failover_soak
+++ b/qpid/cpp/src/tests/run_failover_soak
@@ -45,12 +45,12 @@ fi
host=127.0.0.1
-src_root=..
-module_dir=$src_root/.libs
-
+MODULES=${MODULES:-../.libs}
MESSAGES=${MESSAGES:-300000}
REPORT_FREQUENCY=${REPORT_FREQUENCY:-`expr $MESSAGES / 20`}
VERBOSITY=${VERBOSITY:-1}
+DURABILITY=${DURABILITY:-1}
-exec ./failover_soak $src_root $module_dir $host ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY
+rm -f soak-*.log
+exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY
diff --git a/qpid/cpp/src/tests/sender.vcproj b/qpid/cpp/src/tests/sender.vcproj
index 616b665406..fe564cb3c6 100644
--- a/qpid/cpp/src/tests/sender.vcproj
+++ b/qpid/cpp/src/tests/sender.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="sender"
- ProjectGUID="{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{09714CB8-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="sender"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/shlibtest.vcproj b/qpid/cpp/src/tests/shlibtest.vcproj
index 4a139a2cb5..2052230740 100644
--- a/qpid/cpp/src/tests/shlibtest.vcproj
+++ b/qpid/cpp/src/tests/shlibtest.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="shlibtest"
- ProjectGUID="{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="shlibtest"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/start_cluster b/qpid/cpp/src/tests/start_cluster
index 053b23da33..585ba082d5 100755
--- a/qpid/cpp/src/tests/start_cluster
+++ b/qpid/cpp/src/tests/start_cluster
@@ -28,15 +28,17 @@ with_ais_group() {
echo $* | newgrp ais
}
-rm -f cluster*.log
-SIZE=${1:-1}; shift
+rm -f cluster*.log cluster.ports qpidd.port
+
+SIZE=${1:-3}; shift
CLUSTER=`pwd` # Cluster name=pwd, avoid clashes.
-OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $@"
+OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --auth=no $@"
for (( i=0; i<SIZE; ++i )); do
- PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS` || exit 1
+ DDIR=`mktemp -d /tmp/start_cluster.XXXXXXXXXX`
+ PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS --data-dir=$DDIR` || exit 1
echo $PORT >> cluster.ports
done
-head cluster.ports > qpidd.port # First member's port for tests.
+head -n 1 cluster.ports > qpidd.port # First member's port for tests.
diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp
new file mode 100644
index 0000000000..e5c3522852
--- /dev/null
+++ b/qpid/cpp/src/tests/test_store.cpp
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**@file
+ * Plug-in message store for tests.
+ *
+ * Add functionality as required, build up a comprehensive set of
+ * features to support persistent behavior tests.
+ *
+ * Current features special "action" messages can:
+ * - raise exception from enqueue.
+ * - force host process to exit.
+ * - do async completion after a delay.
+ */
+
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include <boost/algorithm/string.hpp>
+#include <boost/cast.hpp>
+#include <boost/lexical_cast.hpp>
+
+using namespace qpid;
+using namespace broker;
+using namespace std;
+using namespace boost;
+using namespace qpid::sys;
+
+struct TestStoreOptions : public Options {
+
+ string name;
+
+ TestStoreOptions() : Options("Test Store Options") {
+ addOptions()
+ ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance.");
+ }
+};
+
+struct Completer : public Runnable {
+ intrusive_ptr<PersistableMessage> message;
+ int usecs;
+ Completer(intrusive_ptr<PersistableMessage> m, int u) : message(m), usecs(u) {}
+ void run() {
+ qpid::sys::usleep(usecs);
+ message->enqueueComplete();
+ delete this;
+ }
+};
+
+class TestStore : public NullMessageStore {
+ public:
+ TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {}
+
+ ~TestStore() {
+ for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1));
+ }
+
+ void enqueue(TransactionContext* ,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& )
+ {
+ string data = polymorphic_downcast<Message*>(msg.get())->getFrames().getContent();
+
+ // Check the message for special instructions.
+ size_t i = string::npos;
+ size_t j = string::npos;
+ if (starts_with(data, TEST_STORE_DO)
+ && (i = data.find(name+"[")) != string::npos
+ && (j = data.find("]", i)) != string::npos)
+ {
+ size_t start = i+name.size()+1;
+ string action = data.substr(start, j-start);
+
+ if (action == EXCEPTION) {
+ throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
+ }
+ else if (action == EXIT_PROCESS) {
+ // FIXME aconway 2009-04-10: this is a dubious way to
+ // close the process at best, it can cause assertions or seg faults
+ // rather than clean exit.
+ QPID_LOG(critical, "TestStore " << name << " forcing process exit for: " << data);
+ exit(0);
+ }
+ else if (starts_with(action, ASYNC)) {
+ std::string delayStr(action.substr(ASYNC.size()));
+ int delay = lexical_cast<int>(delayStr);
+ threads.push_back(Thread(*new Completer(msg, delay)));
+ }
+ else {
+ QPID_LOG(error, "TestStore " << name << " unknown action " << action);
+ msg->enqueueComplete();
+ }
+ }
+ else
+ msg->enqueueComplete();
+ }
+
+ private:
+ static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
+ string name;
+ Broker& broker;
+ vector<Thread> threads;
+};
+
+const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
+const string TestStore::EXCEPTION = "exception";
+const string TestStore::EXIT_PROCESS = "exit_process";
+const string TestStore::ASYNC="async ";
+
+struct TestStorePlugin : public Plugin {
+
+ TestStoreOptions options;
+
+ Options* getOptions() { return &options; }
+
+ void earlyInitialize (Plugin::Target& target)
+ {
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (!broker) return;
+ broker->setStore (new TestStore(options.name, *broker));
+ }
+
+ void initialize(qpid::Plugin::Target&) {}
+};
+
+static TestStorePlugin pluginInstance;
diff --git a/qpid/cpp/src/tests/tests.sln b/qpid/cpp/src/tests/tests.sln
index 273e90d1c8..9e24487820 100644
--- a/qpid/cpp/src/tests/tests.sln
+++ b/qpid/cpp/src/tests/tests.sln
@@ -8,37 +8,37 @@ Microsoft Visual Studio Solution File, Format Version 10.00
#
# MPC Command:
# C:\ace\MPC\mwc.pl -type vc9 -features boost=1 tests.mwc
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "client_test", "client_test.vcproj", "{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "client_test", "client_test.vcproj", "{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "consume", "consume.vcproj", "{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "consume", "consume.vcproj", "{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "echotest", "echotest.vcproj", "{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "echotest", "echotest.vcproj", "{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "header_test", "header_test.vcproj", "{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "header_test", "header_test.vcproj", "{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "latencytest", "latencytest.vcproj", "{4A809018-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "latencytest", "latencytest.vcproj", "{4A809018-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "perftest", "perftest.vcproj", "{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "perftest", "perftest.vcproj", "{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "publish", "publish.vcproj", "{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "publish", "publish.vcproj", "{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "receiver", "receiver.vcproj", "{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "receiver", "receiver.vcproj", "{7D314A98-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "sender", "sender.vcproj", "{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "sender", "sender.vcproj", "{09714CB8-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "shlibtest", "shlibtest.vcproj", "{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "shlibtest", "shlibtest.vcproj", "{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "topic_listener", "topic_listener.vcproj", "{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "topic_listener", "topic_listener.vcproj", "{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "topic_publisher", "topic_publisher.vcproj", "{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "topic_publisher", "topic_publisher.vcproj", "{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txjob", "txjob.vcproj", "{09034A53-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txjob", "txjob.vcproj", "{09034A53-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txshift", "txshift.vcproj", "{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txshift", "txshift.vcproj", "{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txtest", "txtest.vcproj", "{697786BE-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txtest", "txtest.vcproj", "{697786BE-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "unit_test", "unit_test.vcproj", "{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "unit_test", "unit_test.vcproj", "{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -48,134 +48,134 @@ Global
Release|x64 = Release|x64
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64
- {51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {4A809018-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {09034A53-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {697786BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64
+ {51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/qpid/cpp/src/tests/topic_listener.vcproj b/qpid/cpp/src/tests/topic_listener.vcproj
index 0be31b8348..3f338bfb62 100644
--- a/qpid/cpp/src/tests/topic_listener.vcproj
+++ b/qpid/cpp/src/tests/topic_listener.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="topic_listener"
- ProjectGUID="{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="topic_listener"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/topic_publisher.vcproj b/qpid/cpp/src/tests/topic_publisher.vcproj
index 016c6d4a38..5b0fd21ef4 100644
--- a/qpid/cpp/src/tests/topic_publisher.vcproj
+++ b/qpid/cpp/src/tests/topic_publisher.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="topic_publisher"
- ProjectGUID="{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="topic_publisher"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/txjob.vcproj b/qpid/cpp/src/tests/txjob.vcproj
index 19fe3fba12..e002d58ae9 100644
--- a/qpid/cpp/src/tests/txjob.vcproj
+++ b/qpid/cpp/src/tests/txjob.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="txjob"
- ProjectGUID="{09034A53-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{09034A53-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="txjob"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/txshift.vcproj b/qpid/cpp/src/tests/txshift.vcproj
index 3212881351..75ccfc6d4f 100644
--- a/qpid/cpp/src/tests/txshift.vcproj
+++ b/qpid/cpp/src/tests/txshift.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="txshift"
- ProjectGUID="{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="txshift"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/txtest.vcproj b/qpid/cpp/src/tests/txtest.vcproj
index 663291a9d5..3549ba97aa 100644
--- a/qpid/cpp/src/tests/txtest.vcproj
+++ b/qpid/cpp/src/tests/txtest.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="txtest"
- ProjectGUID="{697786BE-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{697786BE-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="txtest"
Keyword="Win32Proj"
SignManifests="true"
diff --git a/qpid/cpp/src/tests/unit_test.vcproj b/qpid/cpp/src/tests/unit_test.vcproj
index 8710b617f8..9653290013 100644
--- a/qpid/cpp/src/tests/unit_test.vcproj
+++ b/qpid/cpp/src/tests/unit_test.vcproj
@@ -3,7 +3,7 @@
ProjectType="Visual C++"
Version="9.00"
Name="unit_test"
- ProjectGUID="{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}"
+ ProjectGUID="{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}"
RootNamespace="unit_test"
Keyword="Win32Proj"
SignManifests="true"
@@ -76,7 +76,7 @@
/>
<Tool
Name="VCLinkerTool"
- AdditionalDependencies="qpidcommond.lib qpidclientd.lib qpidbrokerd.lib qmfconsoled.lib"
+ AdditionalDependencies="qpidcommond.lib qpidclientd.lib qpidbrokerd.lib qmfconsoled.lib ws2_32.lib"
OutputFile="$(OutDir)\unit_test.exe"
LinkIncremental="2"
SuppressStartupBanner="true"
@@ -155,7 +155,7 @@
/>
<Tool
Name="VCLinkerTool"
- AdditionalDependencies="qpidcommon.lib qpidclient.lib qpidbroker.lib qmfconsole.lib"
+ AdditionalDependencies="qpidcommon.lib qpidclient.lib qpidbroker.lib qmfconsole.lib ws2_32.lib"
OutputFile="$(OutDir)\unit_test.exe"
LinkIncremental="1"
SuppressStartupBanner="true"
@@ -240,7 +240,7 @@
<Tool
Name="VCLinkerTool"
AdditionalOptions="/machine:AMD64"
- AdditionalDependencies="qpidcommond.lib qpidclientd.lib qpidbrokerd.lib qmfconsoled.lib"
+ AdditionalDependencies="qpidcommond.lib qpidclientd.lib qpidbrokerd.lib qmfconsoled.lib ws2_32.lib"
OutputFile="$(OutDir)\unit_test.exe"
LinkIncremental="2"
SuppressStartupBanner="true"
@@ -320,7 +320,7 @@
<Tool
Name="VCLinkerTool"
AdditionalOptions="/machine:AMD64"
- AdditionalDependencies="qpidcommon.lib qpidclient.lib qpidbroker.lib qmfconsole.lib"
+ AdditionalDependencies="qpidcommon.lib qpidclient.lib qpidbroker.lib qmfconsole.lib ws2_32.lib"
OutputFile="$(OutDir)\unit_test.exe"
LinkIncremental="1"
SuppressStartupBanner="true"
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 8fdde0ada6..df90fc6f82 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -48,6 +48,19 @@
<field name="id" type="uint64"/>
</control>
+ <domain name="error-type" type="uint8" label="Types of error">
+ <enum>
+ <choice name="none" value="0"/>
+ <choice name="session" value="1"/>
+ <choice name="connection" value="2"/>
+ </enum>
+ </domain>
+
+ <control name="error-check" code="0x13">
+ <field name="type" type="error-type"/>
+ <field name="frame-seq" type="uint64"/>
+ </control>
+
<control name="shutdown" code="0x20" label="Shut down entire cluster"/>
</class>
@@ -132,6 +145,7 @@
<control name="membership" code="0x21" label="Cluster membership details.">
<field name="joiners" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
+ <field name="frame-seq" type="uint64"/> <!-- frame sequence number -->
</control>
<!-- Set the position of a replicated queue. -->
@@ -146,5 +160,6 @@
<!-- Set expiry-id for subsequent messages. -->
<control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control>
+
</class>
</amqp>