summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h17
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/ExpiryPolicy.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h3
-rw-r--r--qpid/cpp/src/qpid/client/Completion.cpp38
-rw-r--r--qpid/cpp/src/qpid/client/Completion.h35
-rw-r--r--qpid/cpp/src/qpid/client/CompletionImpl.h50
-rw-r--r--qpid/cpp/src/qpid/client/Connection.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/Dispatcher.cpp6
-rw-r--r--qpid/cpp/src/qpid/client/Dispatcher.h2
-rw-r--r--qpid/cpp/src/qpid/client/FailoverListener.cpp3
-rw-r--r--qpid/cpp/src/qpid/client/FailoverListener.h1
-rw-r--r--qpid/cpp/src/qpid/client/Future.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/Future.h14
-rw-r--r--qpid/cpp/src/qpid/client/Handle.h11
-rw-r--r--qpid/cpp/src/qpid/client/HandlePrivate.h12
-rw-r--r--qpid/cpp/src/qpid/client/LocalQueue.cpp5
-rw-r--r--qpid/cpp/src/qpid/client/Message.cpp59
-rw-r--r--qpid/cpp/src/qpid/client/Message.h55
-rw-r--r--qpid/cpp/src/qpid/client/MessageImpl.cpp71
-rw-r--r--qpid/cpp/src/qpid/client/MessageImpl.h76
-rw-r--r--qpid/cpp/src/qpid/client/PrivateImpl.h54
-rw-r--r--qpid/cpp/src/qpid/client/PrivateImplPrivate.h66
-rw-r--r--qpid/cpp/src/qpid/client/SessionBase_0_10.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/SessionBase_0_10.h10
-rw-r--r--qpid/cpp/src/qpid/client/Subscription.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/SubscriptionImpl.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/TypedResult.h10
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp79
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h17
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.cpp43
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.h20
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp53
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h19
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/ErrorCheck.cpp120
-rw-r--r--qpid/cpp/src/qpid/cluster/ErrorCheck.h80
-rw-r--r--qpid/cpp/src/qpid/cluster/Event.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/EventFrame.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/EventFrame.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp7
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/LockedConnectionMap.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/Multicaster.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/Multicaster.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp17
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h1
-rw-r--r--qpid/cpp/src/qpid/framing/AMQFrame.h3
-rw-r--r--qpid/cpp/src/qpid/sys/LatencyMetric.cpp74
-rw-r--r--qpid/cpp/src/qpid/sys/LatencyMetric.h85
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h3
62 files changed, 928 insertions, 402 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index db957051d8..f927db09bb 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -88,6 +88,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
catch(const SessionException& e) {
QPID_LOG(error, "Execution exception: " << e.what());
+ executionException(e.code, e.what()); // Let subclass handle this first.
framing::AMQP_AllProxy::Execution execution(channel);
AMQMethodBody* m = f.getMethod();
SequenceNumber commandId;
@@ -98,6 +99,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
catch(const ChannelException& e){
QPID_LOG(error, "Channel exception: " << e.what());
+ channelException(e.code, e.what()); // Let subclass handle this first.
peer.detached(name, e.code);
}
catch(const ConnectionException& e) {
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
index 0b158ec2b4..0d9c72ff02 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
@@ -87,8 +87,9 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
QPID_COMMON_EXTERN virtual void invoke(const framing::AMQMethodBody& m);
virtual void setState(const std::string& sessionName, bool force) = 0;
- virtual void channelException(framing::session::DetachCode code, const std::string& msg) = 0;
virtual void connectionException(framing::connection::CloseCode code, const std::string& msg) = 0;
+ virtual void channelException(framing::session::DetachCode, const std::string& msg) = 0;
+ virtual void executionException(framing::execution::ErrorCode, const std::string& msg) = 0;
virtual void detaching() = 0;
// Notification of events
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index b06e06d353..365b3ccbeb 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -57,7 +57,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
mgmtObject(0),
links(broker_.getLinks()),
agent(0),
- timer(broker_.getTimer())
+ timer(broker_.getTimer()),
+ errorListener(0)
{
Manageable* parent = broker.GetVhostObject();
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index b659fe6468..e67cdce681 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -66,6 +66,17 @@ class Connection : public sys::ConnectionInputHandler,
public RefCounted
{
public:
+ /**
+ * Listener that can be registered with a Connection to be informed of errors.
+ */
+ class ErrorListener
+ {
+ public:
+ virtual ~ErrorListener() {}
+ virtual void sessionError(uint16_t channel, const std::string&) = 0;
+ virtual void connectionError(const std::string&) = 0;
+ };
+
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0);
~Connection ();
@@ -101,6 +112,9 @@ class Connection : public sys::ConnectionInputHandler,
const std::string& getMgmtId() const { return mgmtId; }
management::ManagementAgent* getAgent() const { return agent; }
void setFederationLink(bool b);
+ /** Connection does not delete the listener. 0 resets. */
+ void setErrorListener(ErrorListener* l) { errorListener=l; }
+ ErrorListener* getErrorListener() { return errorListener; }
void setHeartbeatInterval(uint16_t heartbeat);
void sendHeartbeat();
@@ -112,6 +126,7 @@ class Connection : public sys::ConnectionInputHandler,
void sendClose();
void setSecureConnection(SecureConnection* secured);
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -128,6 +143,8 @@ class Connection : public sys::ConnectionInputHandler,
management::ManagementAgent* agent;
Timer& timer;
boost::intrusive_ptr<TimerTask> heartbeatTimer;
+ ErrorListener* errorListener;
+
public:
qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; }
};
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 63212c7794..8b70836da0 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -64,13 +64,16 @@ void ConnectionHandler::heartbeat()
void ConnectionHandler::handle(framing::AMQFrame& frame)
{
AMQMethodBody* method=frame.getBody()->getMethod();
+ Connection::ErrorListener* errorListener = handler->connection.getErrorListener();
try{
if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) {
handler->connection.getChannel(frame.getChannel()).in(frame);
}
}catch(ConnectionException& e){
+ if (errorListener) errorListener->connectionError(e.what());
handler->proxy.close(e.code, e.what());
}catch(std::exception& e){
+ if (errorListener) errorListener->connectionError(e.what());
handler->proxy.close(541/*internal error*/, e.what());
}
}
diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
index 907f1e56e1..ffe0cc437b 100644
--- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
@@ -33,4 +33,6 @@ bool ExpiryPolicy::hasExpired(Message& m) {
return m.getExpiration() < sys::AbsTime::now();
}
+void ExpiryPolicy::forget(Message&) {}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
index cefe9b7552..eeb3ffda21 100644
--- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
@@ -39,6 +39,7 @@ class ExpiryPolicy : public RefCounted
QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
QPID_BROKER_EXTERN virtual void willExpire(Message&);
QPID_BROKER_EXTERN virtual bool hasExpired(Message&);
+ QPID_BROKER_EXTERN virtual void forget(Message&);
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 40b5515829..1e9eb9d386 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -53,6 +53,8 @@ Message::Message(const framing::SequenceNumber& id) :
Message::~Message()
{
+ if (expiryPolicy)
+ expiryPolicy->forget(*this);
}
void Message::forcePersistent()
@@ -334,7 +336,7 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
expiryPolicy = e;
- if (expiryPolicy)
+ if (expiryPolicy)
expiryPolicy->willExpire(*this);
}
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 442c3eb34b..ca1f875991 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -45,14 +45,20 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
} // namespace
-void SessionHandler::channelException(framing::session::DetachCode, const std::string&) {
- handleDetach();
-}
-
void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ // NOTE: must tell the error listener _before_ calling connection.close()
+ if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);
connection.close(code, msg);
}
+void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) {
+ if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+}
+
+void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) {
+ if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+}
+
ConnectionState& SessionHandler::getConnection() { return connection; }
const ConnectionState& SessionHandler::getConnection() const { return connection; }
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index ffc032f64c..ca6d6bb193 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -73,8 +73,9 @@ class SessionHandler : public amqp_0_10::SessionHandler {
virtual void setState(const std::string& sessionName, bool force);
virtual qpid::SessionState* getState();
virtual framing::FrameHandler* getInHandler();
- virtual void channelException(framing::session::DetachCode code, const std::string& msg);
virtual void connectionException(framing::connection::CloseCode code, const std::string& msg);
+ virtual void channelException(framing::session::DetachCode, const std::string& msg);
+ virtual void executionException(framing::execution::ErrorCode, const std::string& msg);
virtual void detaching();
virtual void readyToSend();
diff --git a/qpid/cpp/src/qpid/client/Completion.cpp b/qpid/cpp/src/qpid/client/Completion.cpp
new file mode 100644
index 0000000000..e3676b2bde
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/Completion.cpp
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Completion.h"
+#include "CompletionImpl.h"
+#include "HandlePrivate.h"
+
+namespace qpid {
+namespace client {
+
+Completion::Completion(CompletionImpl* i) : Handle<CompletionImpl>(i) {}
+Completion::~Completion() {}
+Completion::Completion(const Completion& c) : Handle<CompletionImpl>(c.impl) {}
+Completion& Completion::operator=(const Completion& c) { Handle<CompletionImpl>::operator=(c); return *this; }
+
+void Completion::wait() { impl->wait(); }
+bool Completion::isComplete() { return impl->isComplete(); }
+std::string Completion::getResult() { return impl->getResult(); }
+
+}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/Completion.h b/qpid/cpp/src/qpid/client/Completion.h
index c4979d7934..0b246b7765 100644
--- a/qpid/cpp/src/qpid/client/Completion.h
+++ b/qpid/cpp/src/qpid/client/Completion.h
@@ -22,13 +22,14 @@
#ifndef _Completion_
#define _Completion_
-#include <boost/shared_ptr.hpp>
-#include "Future.h"
-#include "SessionImpl.h"
+#include "Handle.h"
+#include <string>
namespace qpid {
namespace client {
+class CompletionImpl;
+
/**
* Asynchronous commands that do not return a result will return a
* Completion. You can use the completion to wait for that specific
@@ -38,32 +39,26 @@ namespace client {
*
*\ingroup clientapi
*/
-class Completion
+class Completion : public Handle<CompletionImpl>
{
-protected:
- Future future;
- shared_ptr<SessionImpl> session;
-
public:
///@internal
- Completion() {}
-
- ///@internal
- Completion(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {}
+ QPID_CLIENT_EXTERN Completion(CompletionImpl* =0);
+ QPID_CLIENT_EXTERN ~Completion();
+ QPID_CLIENT_EXTERN Completion(const Completion&);
+ QPID_CLIENT_EXTERN Completion& operator=(const Completion&);
/** Wait for the asynchronous command that returned this
*Completion to complete.
*
- *@exception If the command returns an error, get() throws an exception.
+ *@exception If the command returns an error.
*/
- void wait()
- {
- future.wait(*session);
- }
+ QPID_CLIENT_EXTERN void wait();
+
+ QPID_CLIENT_EXTERN bool isComplete();
- bool isComplete() {
- return future.isComplete(*session);
- }
+ protected:
+ QPID_CLIENT_EXTERN std::string getResult();
};
}}
diff --git a/qpid/cpp/src/qpid/client/CompletionImpl.h b/qpid/cpp/src/qpid/client/CompletionImpl.h
new file mode 100644
index 0000000000..119abc093a
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/CompletionImpl.h
@@ -0,0 +1,50 @@
+#ifndef QPID_CLIENT_COMPLETIONIMPL_H
+#define QPID_CLIENT_COMPLETIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "Future.h"
+
+namespace qpid {
+namespace client {
+
+///@internal
+class CompletionImpl : public RefCounted
+{
+public:
+ CompletionImpl() {}
+ CompletionImpl(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {}
+
+ bool isComplete() { return future.isComplete(*session); }
+ void wait() { future.wait(*session); }
+ std::string getResult() { return future.getResult(*session); }
+
+protected:
+ Future future;
+ shared_ptr<SessionImpl> session;
+};
+
+}} // namespace qpid::client
+
+
+#endif /*!QPID_CLIENT_COMPLETIONIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp
index cc62d724cb..e8415403ca 100644
--- a/qpid/cpp/src/qpid/client/Connection.cpp
+++ b/qpid/cpp/src/qpid/client/Connection.cpp
@@ -19,6 +19,7 @@
*
*/
#include "Connection.h"
+#include "ConnectionImpl.h"
#include "ConnectionSettings.h"
#include "Message.h"
#include "SessionImpl.h"
diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h
index 846ac33790..d898ea70d9 100644
--- a/qpid/cpp/src/qpid/client/Connection.h
+++ b/qpid/cpp/src/qpid/client/Connection.h
@@ -25,6 +25,7 @@
#include <string>
#include "qpid/client/Session.h"
#include "qpid/client/ClientImportExport.h"
+#include "qpid/client/ConnectionSettings.h"
namespace qpid {
@@ -32,7 +33,6 @@ struct Url;
namespace client {
-struct ConnectionSettings;
class ConnectionImpl;
/**
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index 745bdb63b5..b1e83025ab 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -111,9 +111,11 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame)
Mutex::ScopedLock l(lock);
s = sessions[frame.getChannel()].lock();
}
- if (!s)
- throw NotAttachedException(QPID_MSG("Invalid channel: " << frame.getChannel()));
- s->in(frame);
+ if (!s) {
+ QPID_LOG(info, "Dropping frame received on invalid channel: " << frame);
+ } else {
+ s->in(frame);
+ }
}
bool ConnectionImpl::isOpen() const
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp
index 8d8574520a..5156031748 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.cpp
+++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp
@@ -26,6 +26,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/BlockingQueue.h"
#include "Message.h"
+#include "MessageImpl.h"
#include <boost/state_saver.hpp>
@@ -49,6 +50,9 @@ Dispatcher::Dispatcher(const Session& s, const std::string& q)
session.getExecution().getDemux().get(q);
}
+Dispatcher::~Dispatcher() {}
+
+
void Dispatcher::start()
{
worker = Thread(this);
@@ -71,7 +75,7 @@ void Dispatcher::run()
Mutex::ScopedUnlock u(lock);
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
- Message msg(*content);
+ Message msg(new MessageImpl(*content));
boost::intrusive_ptr<SubscriptionImpl> listener = find(msg.getDestination());
if (!listener) {
QPID_LOG(error, "No listener found for destination " << msg.getDestination());
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.h b/qpid/cpp/src/qpid/client/Dispatcher.h
index e84f8f303d..4dbb75dcf2 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.h
+++ b/qpid/cpp/src/qpid/client/Dispatcher.h
@@ -30,7 +30,6 @@
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "MessageListener.h"
-#include "SubscriptionImpl.h"
namespace qpid {
namespace client {
@@ -61,6 +60,7 @@ class Dispatcher : public sys::Runnable
public:
Dispatcher(const Session& session, const std::string& queue = "");
+ ~Dispatcher();
void start();
void wait();
diff --git a/qpid/cpp/src/qpid/client/FailoverListener.cpp b/qpid/cpp/src/qpid/client/FailoverListener.cpp
index 16370f8912..ed9354d528 100644
--- a/qpid/cpp/src/qpid/client/FailoverListener.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverListener.cpp
@@ -20,6 +20,9 @@
*/
#include "FailoverListener.h"
#include "SessionBase_0_10Access.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/SubscriptionImpl.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
diff --git a/qpid/cpp/src/qpid/client/FailoverListener.h b/qpid/cpp/src/qpid/client/FailoverListener.h
index fe73a26611..7afee736ac 100644
--- a/qpid/cpp/src/qpid/client/FailoverListener.h
+++ b/qpid/cpp/src/qpid/client/FailoverListener.h
@@ -33,6 +33,7 @@ namespace qpid {
namespace client {
class SubscriptionManager;
+class ConnectionImpl;
/**
* @internal Listen for failover updates from the amq.failover exchange.
diff --git a/qpid/cpp/src/qpid/client/Future.cpp b/qpid/cpp/src/qpid/client/Future.cpp
index 6a0c78ae4b..fda40219ba 100644
--- a/qpid/cpp/src/qpid/client/Future.cpp
+++ b/qpid/cpp/src/qpid/client/Future.cpp
@@ -20,6 +20,7 @@
*/
#include "Future.h"
+#include "SessionImpl.h"
namespace qpid {
namespace client {
diff --git a/qpid/cpp/src/qpid/client/Future.h b/qpid/cpp/src/qpid/client/Future.h
index ea01522fe8..28c9a2bbbd 100644
--- a/qpid/cpp/src/qpid/client/Future.h
+++ b/qpid/cpp/src/qpid/client/Future.h
@@ -26,17 +26,15 @@
#include <boost/shared_ptr.hpp>
#include "qpid/Exception.h"
#include "qpid/framing/SequenceNumber.h"
-#include "qpid/framing/StructHelper.h"
#include "FutureCompletion.h"
#include "FutureResult.h"
-#include "SessionImpl.h"
#include "ClientImportExport.h"
namespace qpid {
namespace client {
/**@internal */
-class Future : private framing::StructHelper
+class Future
{
framing::SequenceNumber command;
boost::shared_ptr<FutureResult> result;
@@ -46,13 +44,9 @@ public:
Future() : complete(false) {}
Future(const framing::SequenceNumber& id) : command(id), complete(false) {}
- template <class T> void decodeResult(T& value, SessionImpl& session)
- {
- if (result) {
- decode(value, result->getResult(session));
- } else {
- throw Exception("Result not expected");
- }
+ std::string getResult(SessionImpl& session) {
+ if (result) return result->getResult(session);
+ else throw Exception("Result not expected");
}
QPID_CLIENT_EXTERN void wait(SessionImpl& session);
diff --git a/qpid/cpp/src/qpid/client/Handle.h b/qpid/cpp/src/qpid/client/Handle.h
index d8b822d0f9..12fb4cf3c1 100644
--- a/qpid/cpp/src/qpid/client/Handle.h
+++ b/qpid/cpp/src/qpid/client/Handle.h
@@ -30,9 +30,11 @@ namespace client {
template <class T> class HandlePrivate;
/**
- * A handle is like a pointer: it points to some underlying object.
+ * A handle is like a pointer: it points to some implementation object.
+ * Copying the handle does not copy the object.
+ *
* Handles can be null, like a 0 pointer. Use isValid(), isNull() or the
- * implicit conversion to bool to test for a null handle.
+ * conversion to bool to test for a null handle.
*/
template <class T> class Handle {
public:
@@ -46,8 +48,11 @@ template <class T> class Handle {
/**@return true if handle is null. It is an error to call any function on a null handle. */
QPID_CLIENT_EXTERN bool isNull() const { return !impl; }
+ /** Conversion to bool supports idiom if (handle) { handle->... } */
QPID_CLIENT_EXTERN operator bool() const { return impl; }
- QPID_CLIENT_EXTERN bool operator !() const { return impl; }
+
+ /** Operator ! supports idiom if (!handle) { do_if_handle_is_null(); } */
+ QPID_CLIENT_EXTERN bool operator !() const { return !impl; }
QPID_CLIENT_EXTERN void swap(Handle<T>&);
diff --git a/qpid/cpp/src/qpid/client/HandlePrivate.h b/qpid/cpp/src/qpid/client/HandlePrivate.h
index 488ce48075..46e4bff808 100644
--- a/qpid/cpp/src/qpid/client/HandlePrivate.h
+++ b/qpid/cpp/src/qpid/client/HandlePrivate.h
@@ -21,14 +21,16 @@
* under the License.
*
*/
+#include "Handle.h"
+#include "qpid/RefCounted.h"
#include <algorithm>
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
namespace client {
/** @file
- * Private implementation of handle, include in .cpp file of handle
- * subclasses _after_ including the declaration of class T.
+ * Implementation of handle, include in .cpp file of handle subclasses.
* T can be any class that can be used with boost::intrusive_ptr.
*/
@@ -52,9 +54,13 @@ void Handle<T>::swap(Handle<T>& h) { std::swap(impl, h.impl); }
template <class T>
class HandlePrivate {
public:
- static boost::intrusive_ptr<T> get(Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); }
+ static boost::intrusive_ptr<T> get(const Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); }
+ static void set(Handle<T>& h, const boost::intrusive_ptr<T>& p) { Handle<T>(p.get()).swap(h); }
};
+template<class T> boost::intrusive_ptr<T> handleGetPtr(Handle<T>& h) { return HandlePrivate<T>::get(h); }
+template<class T> boost::intrusive_ptr<const T> handleGetPtr(const Handle<T>& h) { return HandlePrivate<T>::get(h); }
+template<class T> void handleSetPtr(Handle<T>& h, const boost::intrusive_ptr<T>& p) { HandlePrivate<T>::set(h, p); }
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/LocalQueue.cpp b/qpid/cpp/src/qpid/client/LocalQueue.cpp
index e449c9f795..02fecf804f 100644
--- a/qpid/cpp/src/qpid/client/LocalQueue.cpp
+++ b/qpid/cpp/src/qpid/client/LocalQueue.cpp
@@ -19,11 +19,14 @@
*
*/
#include "LocalQueue.h"
+#include "MessageImpl.h"
#include "qpid/Exception.h"
#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "HandlePrivate.h"
#include "SubscriptionImpl.h"
+#include "CompletionImpl.h"
namespace qpid {
namespace client {
@@ -49,7 +52,7 @@ bool LocalQueue::get(Message& result, sys::Duration timeout) {
bool ok = queue->pop(content, timeout);
if (!ok) return false;
if (content->isA<MessageTransferBody>()) {
- result = Message(*content);
+ result = Message(new MessageImpl(*content));
boost::intrusive_ptr<SubscriptionImpl> si = HandlePrivate<SubscriptionImpl>::get(subscription);
assert(si);
if (si) si->received(result);
diff --git a/qpid/cpp/src/qpid/client/Message.cpp b/qpid/cpp/src/qpid/client/Message.cpp
index 13caaecefd..962ce26305 100644
--- a/qpid/cpp/src/qpid/client/Message.cpp
+++ b/qpid/cpp/src/qpid/client/Message.cpp
@@ -20,52 +20,37 @@
*/
#include "Message.h"
+#include "PrivateImplPrivate.h"
+#include "MessageImpl.h"
namespace qpid {
namespace client {
-Message::Message(const std::string& data, const std::string& routingKey) : TransferContent(data, routingKey) {}
+template class PrivateImpl<MessageImpl>;
-std::string Message::getDestination() const
-{
- return method.getDestination();
-}
+Message::Message(const std::string& data, const std::string& routingKey) : PrivateImpl<MessageImpl>(new MessageImpl(data, routingKey)) {}
+Message::Message(MessageImpl* i) : PrivateImpl<MessageImpl>(i) {}
+Message::~Message() {}
-bool Message::isRedelivered() const
-{
- return hasDeliveryProperties() && getDeliveryProperties().getRedelivered();
-}
+std::string Message::getDestination() const { return impl->getDestination(); }
+bool Message::isRedelivered() const { return impl->isRedelivered(); }
+void Message::setRedelivered(bool redelivered) { impl->setRedelivered(redelivered); }
+framing::FieldTable& Message::getHeaders() { return impl->getHeaders(); }
+const framing::FieldTable& Message::getHeaders() const { return impl->getHeaders(); }
+const framing::SequenceNumber& Message::getId() const { return impl->getId(); }
-void Message::setRedelivered(bool redelivered)
-{
- getDeliveryProperties().setRedelivered(redelivered);
-}
+void Message::setData(const std::string& s) { impl->setData(s); }
+const std::string& Message::getData() const { return impl->getData(); }
+std::string& Message::getData() { return impl->getData(); }
-framing::FieldTable& Message::getHeaders()
-{
- return getMessageProperties().getApplicationHeaders();
-}
+void Message::appendData(const std::string& s) { impl->appendData(s); }
-const framing::FieldTable& Message::getHeaders() const
-{
- return getMessageProperties().getApplicationHeaders();
-}
+bool Message::hasMessageProperties() const { return impl->hasMessageProperties(); }
+framing::MessageProperties& Message::getMessageProperties() { return impl->getMessageProperties(); }
+const framing::MessageProperties& Message::getMessageProperties() const { return impl->getMessageProperties(); }
-const framing::MessageTransferBody& Message::getMethod() const
-{
- return method;
-}
-
-const framing::SequenceNumber& Message::getId() const
-{
- return id;
-}
-
-/**@internal for incoming messages */
-Message::Message(const framing::FrameSet& frameset) :
- method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId())
-{
- populate(frameset);
-}
+bool Message::hasDeliveryProperties() const { return impl->hasDeliveryProperties(); }
+framing::DeliveryProperties& Message::getDeliveryProperties() { return impl->getDeliveryProperties(); }
+const framing::DeliveryProperties& Message::getDeliveryProperties() const { return impl->getDeliveryProperties(); }
}}
diff --git a/qpid/cpp/src/qpid/client/Message.h b/qpid/cpp/src/qpid/client/Message.h
index 235e20f97d..97238db647 100644
--- a/qpid/cpp/src/qpid/client/Message.h
+++ b/qpid/cpp/src/qpid/client/Message.h
@@ -1,5 +1,5 @@
-#ifndef _client_Message_h
-#define _client_Message_h
+#ifndef QPID_CLIENT_MESSAGE_H
+#define QPID_CLIENT_MESSAGE_H
/*
*
@@ -21,15 +21,24 @@
* under the License.
*
*/
-#include <string>
-#include "qpid/client/Session.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/TransferContent.h"
+
+#include "qpid/client/PrivateImpl.h"
#include "qpid/client/ClientImportExport.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include <string>
namespace qpid {
+
+namespace framing {
+class FieldTable;
+class SequenceNumber; // FIXME aconway 2009-04-17: remove with getID?
+}
+
namespace client {
+class MessageImpl;
+
/**
* A message sent to or received from the broker.
*
@@ -104,8 +113,7 @@ namespace client {
*
*
*/
-
-class Message : public framing::TransferContent
+class Message : public PrivateImpl<MessageImpl>
{
public:
/** Create a Message.
@@ -115,6 +123,23 @@ public:
QPID_CLIENT_EXTERN Message(const std::string& data=std::string(),
const std::string& routingKey=std::string());
+ QPID_CLIENT_EXTERN ~Message();
+
+ QPID_CLIENT_EXTERN void setData(const std::string&);
+ QPID_CLIENT_EXTERN const std::string& getData() const;
+ QPID_CLIENT_EXTERN std::string& getData();
+
+ QPID_CLIENT_EXTERN void appendData(const std::string&);
+
+ QPID_CLIENT_EXTERN bool hasMessageProperties() const;
+ QPID_CLIENT_EXTERN framing::MessageProperties& getMessageProperties();
+ QPID_CLIENT_EXTERN const framing::MessageProperties& getMessageProperties() const;
+
+ QPID_CLIENT_EXTERN bool hasDeliveryProperties() const;
+ QPID_CLIENT_EXTERN framing::DeliveryProperties& getDeliveryProperties();
+ QPID_CLIENT_EXTERN const framing::DeliveryProperties& getDeliveryProperties() const;
+
+
/** The destination of messages sent to the broker is the exchange
* name. The destination of messages received from the broker is
* the delivery tag identifyig the local subscription (often this
@@ -133,20 +158,14 @@ public:
/** Get a non-modifyable reference to the message headers. */
QPID_CLIENT_EXTERN const framing::FieldTable& getHeaders() const;
- ///@internal
- QPID_CLIENT_EXTERN const framing::MessageTransferBody& getMethod() const;
+ // FIXME aconway 2009-04-17: does this need to be in public API?
///@internal
QPID_CLIENT_EXTERN const framing::SequenceNumber& getId() const;
- /**@internal for incoming messages */
- QPID_CLIENT_EXTERN Message(const framing::FrameSet& frameset);
-
-private:
- //method and id are only set for received messages:
- framing::MessageTransferBody method;
- framing::SequenceNumber id;
+ ///@internal
+ Message(MessageImpl*);
};
}}
-#endif /*!_client_Message_h*/
+#endif /*!QPID_CLIENT_MESSAGE_H*/
diff --git a/qpid/cpp/src/qpid/client/MessageImpl.cpp b/qpid/cpp/src/qpid/client/MessageImpl.cpp
new file mode 100644
index 0000000000..3d06fd1d8d
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/MessageImpl.cpp
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageImpl.h"
+
+namespace qpid {
+namespace client {
+
+MessageImpl::MessageImpl(const std::string& data, const std::string& routingKey) : TransferContent(data, routingKey) {}
+
+std::string MessageImpl::getDestination() const
+{
+ return method.getDestination();
+}
+
+bool MessageImpl::isRedelivered() const
+{
+ return hasDeliveryProperties() && getDeliveryProperties().getRedelivered();
+}
+
+void MessageImpl::setRedelivered(bool redelivered)
+{
+ getDeliveryProperties().setRedelivered(redelivered);
+}
+
+framing::FieldTable& MessageImpl::getHeaders()
+{
+ return getMessageProperties().getApplicationHeaders();
+}
+
+const framing::FieldTable& MessageImpl::getHeaders() const
+{
+ return getMessageProperties().getApplicationHeaders();
+}
+
+const framing::MessageTransferBody& MessageImpl::getMethod() const
+{
+ return method;
+}
+
+const framing::SequenceNumber& MessageImpl::getId() const
+{
+ return id;
+}
+
+/**@internal for incoming messages */
+MessageImpl::MessageImpl(const framing::FrameSet& frameset) :
+ method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId())
+{
+ populate(frameset);
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/client/MessageImpl.h b/qpid/cpp/src/qpid/client/MessageImpl.h
new file mode 100644
index 0000000000..c06d9b5afc
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/MessageImpl.h
@@ -0,0 +1,76 @@
+#ifndef QPID_CLIENT_MESSAGEIMPL_H
+#define QPID_CLIENT_MESSAGEIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/client/Session.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/TransferContent.h"
+
+namespace qpid {
+namespace client {
+
+class MessageImpl : public framing::TransferContent
+{
+public:
+ /** Create a Message.
+ *@param data Data for the message body.
+ *@param routingKey Passed to the exchange that routes the message.
+ */
+ MessageImpl(const std::string& data=std::string(),
+ const std::string& routingKey=std::string());
+
+ /** The destination of messages sent to the broker is the exchange
+ * name. The destination of messages received from the broker is
+ * the delivery tag identifyig the local subscription (often this
+ * is the name of the subscribed queue.)
+ */
+ std::string getDestination() const;
+
+ /** Check the redelivered flag. */
+ bool isRedelivered() const;
+ /** Set the redelivered flag. */
+ void setRedelivered(bool redelivered);
+
+ /** Get a modifyable reference to the message headers. */
+ framing::FieldTable& getHeaders();
+
+ /** Get a non-modifyable reference to the message headers. */
+ const framing::FieldTable& getHeaders() const;
+
+ ///@internal
+ const framing::MessageTransferBody& getMethod() const;
+ ///@internal
+ const framing::SequenceNumber& getId() const;
+
+ /**@internal for incoming messages */
+ MessageImpl(const framing::FrameSet& frameset);
+
+private:
+ //method and id are only set for received messages:
+ framing::MessageTransferBody method;
+ framing::SequenceNumber id;
+};
+
+}}
+
+#endif /*!QPID_CLIENT_MESSAGEIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/PrivateImpl.h b/qpid/cpp/src/qpid/client/PrivateImpl.h
new file mode 100644
index 0000000000..6e5ea35ce0
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/PrivateImpl.h
@@ -0,0 +1,54 @@
+#ifndef QPID_CLIENT_PRIVATEIMPL_H
+#define QPID_CLIENT_PRIVATEIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ClientImportExport.h"
+
+namespace qpid {
+namespace client {
+
+template <class T> class PrivateImplPrivate;
+
+/**
+ * Base classes for objects with a private implementation.
+ *
+ * PrivateImpl objects have value semantics: copying the object also
+ * makes a copy of the implementation.
+ */
+template <class T> class PrivateImpl {
+ public:
+ QPID_CLIENT_EXTERN ~PrivateImpl();
+ QPID_CLIENT_EXTERN PrivateImpl(const PrivateImpl&);
+ QPID_CLIENT_EXTERN PrivateImpl& operator=(const PrivateImpl&);
+ QPID_CLIENT_EXTERN void swap(PrivateImpl<T>&);
+
+ protected:
+ QPID_CLIENT_EXTERN PrivateImpl(T*);
+ T* impl;
+
+ friend class PrivateImplPrivate<T>;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_PRIVATEIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/PrivateImplPrivate.h b/qpid/cpp/src/qpid/client/PrivateImplPrivate.h
new file mode 100644
index 0000000000..021456e085
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/PrivateImplPrivate.h
@@ -0,0 +1,66 @@
+#ifndef QPID_CLIENT_PRIVATEIMPLPRIVATE_H
+#define QPID_CLIENT_PRIVATEIMPLPRIVATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <algorithm>
+
+namespace qpid {
+namespace client {
+
+/** @file
+ * Implementation of PrivateImpl functions, to include in .cpp file of handle subclasses.
+ * T can be any class with value semantics.
+ */
+
+template <class T>
+PrivateImpl<T>::PrivateImpl(T* p) : impl(p) { assert(impl); }
+
+template <class T>
+PrivateImpl<T>::~PrivateImpl() { delete impl; }
+
+template <class T>
+PrivateImpl<T>::PrivateImpl(const PrivateImpl& h) : impl(new T(*h.impl)) {}
+
+template <class T>
+PrivateImpl<T>& PrivateImpl<T>::operator=(const PrivateImpl<T>& h) { PrivateImpl<T>(h).swap(*this); return *this; }
+
+template <class T>
+void PrivateImpl<T>::swap(PrivateImpl<T>& h) { std::swap(impl, h.impl); }
+
+
+/** Access to private impl of a PrivateImpl */
+template <class T>
+class PrivateImplPrivate {
+ public:
+ static T* get(const PrivateImpl<T>& h) { return h.impl; }
+ static void set(PrivateImpl<T>& h, const T& p) { PrivateImpl<T>(p).swap(h); }
+};
+
+template<class T> T* privateImplGetPtr(PrivateImpl<T>& h) { return PrivateImplPrivate<T>::get(h); }
+template<class T> T* privateImplGetPtr(const PrivateImpl<T>& h) { return PrivateImplPrivate<T>::get(h); }
+template<class T> void privateImplSetPtr(PrivateImpl<T>& h, const T*& p) { PrivateImplPrivate<T>::set(h, p); }
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_PRIVATEIMPLPRIVATE_H*/
+
diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
index e81b78ecd3..8a33c7393f 100644
--- a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
+++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
@@ -20,6 +20,8 @@
*/
#include "SessionBase_0_10.h"
#include "Connection.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/Future.h"
#include "qpid/framing/all_method_bodies.h"
namespace qpid {
diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.h b/qpid/cpp/src/qpid/client/SessionBase_0_10.h
index 3ae21936f6..d375b3ec2e 100644
--- a/qpid/cpp/src/qpid/client/SessionBase_0_10.h
+++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.h
@@ -23,14 +23,11 @@
*/
#include "qpid/SessionId.h"
+#include "qpid/client/SessionImpl.h"
#include "qpid/framing/amqp_structs.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/MethodContent.h"
-#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Message.h"
#include "qpid/client/Completion.h"
-#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Execution.h"
-#include "qpid/client/SessionImpl.h"
#include "qpid/client/TypedResult.h"
#include "qpid/shared_ptr.h"
#include "qpid/client/ClientImportExport.h"
@@ -44,7 +41,6 @@ class Connection;
using std::string;
using framing::Content;
using framing::FieldTable;
-using framing::MethodContent;
using framing::SequenceNumber;
using framing::SequenceSet;
using framing::SequenceNumberSet;
@@ -62,8 +58,6 @@ enum CreditUnit { MESSAGE_CREDIT=0, BYTE_CREDIT=1, UNLIMITED_CREDIT=0xFFFFFFFF }
*/
class SessionBase_0_10 {
public:
-
- typedef framing::TransferContent DefaultContent;
///@internal
QPID_CLIENT_EXTERN SessionBase_0_10();
diff --git a/qpid/cpp/src/qpid/client/Subscription.cpp b/qpid/cpp/src/qpid/client/Subscription.cpp
index 1f1b5ac6c6..37f5557d51 100644
--- a/qpid/cpp/src/qpid/client/Subscription.cpp
+++ b/qpid/cpp/src/qpid/client/Subscription.cpp
@@ -21,6 +21,7 @@
#include "Subscription.h"
#include "SubscriptionImpl.h"
+#include "CompletionImpl.h"
#include "HandlePrivate.h"
#include "qpid/framing/enum.h"
diff --git a/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
index e09a4c142e..82c920cf47 100644
--- a/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
@@ -20,8 +20,12 @@
*/
#include "SubscriptionImpl.h"
+#include "MessageImpl.h"
+#include "CompletionImpl.h"
#include "SubscriptionManager.h"
#include "SubscriptionSettings.h"
+#include "HandlePrivate.h"
+#include "PrivateImplPrivate.h"
namespace qpid {
namespace client {
@@ -114,9 +118,9 @@ void SubscriptionImpl::cancel() { manager.cancel(name); }
void SubscriptionImpl::received(Message& m) {
Mutex::ScopedLock l(lock);
- if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
+ if (privateImplGetPtr(m)->getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
unacquired.add(m.getId());
- else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
+ else if (privateImplGetPtr(m)->getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
unaccepted.add(m.getId());
if (listener) {
diff --git a/qpid/cpp/src/qpid/client/TypedResult.h b/qpid/cpp/src/qpid/client/TypedResult.h
index 5306997d74..2e54f9fdfc 100644
--- a/qpid/cpp/src/qpid/client/TypedResult.h
+++ b/qpid/cpp/src/qpid/client/TypedResult.h
@@ -23,6 +23,7 @@
#define _TypedResult_
#include "Completion.h"
+#include "qpid/framing/StructHelper.h"
namespace qpid {
namespace client {
@@ -39,7 +40,7 @@ template <class T> class TypedResult : public Completion
public:
///@internal
- TypedResult(Future f, shared_ptr<SessionImpl> s) : Completion(f, s), decoded(false) {}
+ TypedResult(CompletionImpl* c) : Completion(c), decoded(false) {}
/**
* Wait for the asynchronous command that returned this TypedResult to complete
@@ -49,13 +50,12 @@ public:
*@exception If the command returns an error, get() throws an exception.
*
*/
- T& get()
- {
+ T& get() {
if (!decoded) {
- future.decodeResult(result, *session);
+ framing::StructHelper helper;
+ helper.decode(result, getResult());
decoded = true;
}
-
return result;
}
};
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index f8e412f1e6..a17f54078c 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -36,6 +36,7 @@
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
@@ -46,7 +47,6 @@
#include "qpid/management/ManagementBroker.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
-#include "qpid/sys/LatencyMetric.h"
#include "qpid/sys/Thread.h"
#include <boost/bind.hpp>
@@ -63,6 +63,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
using namespace qpid::cluster;
+using namespace qpid::framing::cluster;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
@@ -77,9 +78,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void ready(const std::string& url) { cluster.ready(member, url, l); }
- void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
+ void configChange(const std::string& current) { cluster.configChange(member, current, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
+ void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -112,7 +114,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
discarding(true),
state(INIT),
lastSize(0),
- lastBroker(false)
+ lastBroker(false),
+ error(*this)
{
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
@@ -195,14 +198,19 @@ void Cluster::leave() {
leave(l);
}
+#define LEAVE_TRY(STMT) try { STMT; } \
+ catch (const std::exception& e) { \
+ QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
+ } do {} while(0)
+
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- try { broker.shutdown(); }
- catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
- }
+ // Finalize connections now now to avoid problems later in destructor.
+ LEAVE_TRY(localConnections.clear());
+ LEAVE_TRY(connections.clear());
+ LEAVE_TRY(broker.shutdown());
}
}
@@ -218,8 +226,6 @@ void Cluster::deliver(
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
- if (from == self) // Record self-deliveries for flow control.
- mcast.selfDeliver(e);
deliverEvent(e);
}
@@ -254,10 +260,22 @@ void Cluster::deliveredEvent(const Event& e) {
QPID_LOG(trace, *this << " DROP: " << e);
}
+void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
+ Mutex::ScopedLock l(lock);
+ error.error(connection, type, map.getFrameSeq(), map.getMembers());
+}
+
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& e) {
Mutex::ScopedLock l(lock);
+ // Process each frame through the error checker.
+ error.delivered(e);
+ while (error.canProcess()) // There is a frame ready to process.
+ processFrame(error.getNext(), l);
+}
+
+void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
QPID_LOG(trace, *this << " DLVR: " << e);
ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
@@ -265,7 +283,8 @@ void Cluster::deliveredFrame(const EventFrame& e) {
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) {
- QPID_LOG(trace, *this << " DLVR: " << e);
+ map.incrementFrameSeq();
+ QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
ConnectionPtr connection = getConnection(e.connectionId, l);
if (connection)
connection->deliveredFrame(e);
@@ -316,11 +335,11 @@ ostream& operator<<(ostream& o, const AddrList& a) {
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
- case CPG_REASON_JOIN: reasonString = " (joined) "; break;
- case CPG_REASON_LEAVE: reasonString = " (left) "; break;
- case CPG_REASON_NODEDOWN: reasonString = " (node-down) "; break;
- case CPG_REASON_NODEUP: reasonString = " (node-up) "; break;
- case CPG_REASON_PROCDOWN: reasonString = " (process-down) "; break;
+ case CPG_REASON_JOIN: reasonString = "(joined) "; break;
+ case CPG_REASON_LEAVE: reasonString = "(left) "; break;
+ case CPG_REASON_NODEDOWN: reasonString = "(node-down) "; break;
+ case CPG_REASON_NODEUP: reasonString = "(node-up) "; break;
+ case CPG_REASON_PROCDOWN: reasonString = "(process-down) "; break;
default: reasonString = " ";
}
qpid::cluster::MemberId member(*p);
@@ -342,8 +361,8 @@ void Cluster::configChange (
broker.setRecovery(nCurrent == 1);
initialized = true;
}
- QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
- << AddrList(left, nLeft, "( ", ")"));
+ QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
+ << AddrList(left, nLeft, "left: "));
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
@@ -357,8 +376,8 @@ void Cluster::setReady(Lock&) {
broker.getQueueEvents().enable();
}
-void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
- bool memberChange = map.configChange(addresses);
+void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
+ bool memberChange = map.configChange(current);
if (state == LEFT) return;
if (!map.isAlive(self)) { // Final config change.
@@ -589,19 +608,24 @@ void Cluster::memberUpdate(Lock& l) {
mgmtObject->set_memberIDs(idstr);
}
- // Erase connections belonging to members that have left the cluster.
+ // Close connections belonging to members that have left the cluster.
ConnectionMap::iterator i = connections.begin();
while (i != connections.end()) {
ConnectionMap::iterator j = i++;
MemberId m = j->second->getId().getMember();
if (m != self && !map.isMember(m))
- connections.erase(j);
+ j->second->deliverClose();
}
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
- static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
- return o << cluster.self << "(" << STATE[cluster.state] << ")";
+ static const char* STATE[] = {
+ "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+ };
+ assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
+ o << cluster.self << "(" << STATE[cluster.state];
+ if (cluster.error.isUnresolved()) o << "/error";
+ return o << ")";
}
MemberId Cluster::getId() const {
@@ -635,4 +659,13 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
+void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
+ // If we receive an errorCheck here, it's because we have processed past the point
+ // of the error so respond with ERROR_TYPE_NONE
+ assert(map.getFrameSeq() >= frameSeq);
+ if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE.
+ mcast.mcastControl(
+ ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+}
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index b716e2d781..8a94fc79dd 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -23,6 +23,7 @@
#include "ClusterSettings.h"
#include "Cpg.h"
#include "Decoder.h"
+#include "ErrorCheck.h"
#include "Event.h"
#include "EventFrame.h"
#include "ExpiryPolicy.h"
@@ -105,6 +106,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void deliverFrame(const EventFrame&);
+ // Called in deliverFrame thread to indicate an error from the broker.
+ void flagError(Connection&, ErrorCheck::ErrorType);
+ void connectionError();
+
// Called only during update by Connection::shadowReady
Decoder& getDecoder() { return decoder; }
@@ -132,13 +137,15 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// == Called in deliverFrameQueue thread
void deliveredFrame(const EventFrame&);
+ void processFrame(const EventFrame&, Lock&);
// Cluster controls implement XML methods from cluster.xml.
void updateRequest(const MemberId&, const std::string&, Lock&);
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
- void configChange(const MemberId&, const std::string& addresses, Lock& l);
+ void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
+ void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&);
void shutdown(const MemberId&, Lock&);
// Helper functions
@@ -216,11 +223,13 @@ class Cluster : private Cpg::Handler, public management::Manageable {
Decoder decoder;
bool discarding;
+
// Remaining members are protected by lock.
- // FIXME aconway 2009-03-06: Most of these members are also only used in
+
+ // TODO aconway 2009-03-06: Most of these members are also only used in
// deliverFrameQueue thread or during stall. Review and separate members
// that require a lock, drop lock when not needed.
- //
+
mutable sys::Monitor lock;
@@ -243,7 +252,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
bool lastBroker;
sys::Thread updateThread;
boost::optional<ClusterMap> updatedMap;
-
+ ErrorCheck error;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
index 9e7232180d..0395ff6382 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -33,6 +33,13 @@ using namespace framing;
namespace cluster {
+ClusterMap::Set ClusterMap::decode(const std::string& s) {
+ Set set;
+ for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8)
+ set.insert(MemberId(std::string(i, i+8)));
+ return set;
+}
+
namespace {
void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) {
@@ -54,9 +61,9 @@ void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
}
-ClusterMap::ClusterMap() {}
+ClusterMap::ClusterMap() : frameSeq(0) {}
-ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) {
+ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) {
alive.insert(id);
if (isMember)
members[id] = url;
@@ -64,7 +71,9 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) {
joiners[id] = url;
}
-ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) {
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_)
+ : frameSeq(frameSeq_)
+{
std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
}
@@ -78,22 +87,7 @@ void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const
}
b.getMembers().clear();
std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
-}
-
-bool ClusterMap::configChange(
- cpg_address *current, int nCurrent,
- cpg_address *left, int nLeft,
- cpg_address */*joined*/, int /*nJoined*/)
-{
- cpg_address* a;
- bool memberChange=false;
- for (a = left; a != left+nLeft; ++a) {
- memberChange = memberChange || members.erase(*a);
- joiners.erase(*a);
- }
- alive.clear();
- std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
- return memberChange;
+ b.setFrameSeq(frameSeq);
}
Url ClusterMap::getUrl(const Map& map, const MemberId& id) {
@@ -123,8 +117,13 @@ std::vector<Url> ClusterMap::memberUrls() const {
return urls;
}
-ClusterMap::Set ClusterMap::getAlive() const {
- return alive;
+ClusterMap::Set ClusterMap::getAlive() const { return alive; }
+
+ClusterMap::Set ClusterMap::getMembers() const {
+ Set s;
+ std::transform(members.begin(), members.end(), std::inserter(s, s.begin()),
+ boost::bind(&Map::value_type::first, _1));
+ return s;
}
std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) {
@@ -158,7 +157,7 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) {
bool ClusterMap::configChange(const std::string& addresses) {
bool memberChange = false;
- Set update;
+ Set update = decode(addresses);
for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8)
update.insert(MemberId(std::string(i, i+8)));
Set removed;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h
index 4548441442..3359c7c1f3 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h
@@ -38,26 +38,26 @@
namespace qpid {
namespace cluster {
+typedef std::set<MemberId> MemberSet;
+
/**
- * Map of established cluster members and joiners waiting for an update.
+ * Map of established cluster members and joiners waiting for an update,
+ * along with other cluster state that must be updated.
*/
class ClusterMap {
public:
typedef std::map<MemberId, Url> Map;
typedef std::set<MemberId> Set;
+ static Set decode(const std::string&);
+
ClusterMap();
ClusterMap(const MemberId& id, const Url& url, bool isReady);
- ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states);
+ ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq);
/** Update from config change.
*@return true if member set changed.
*/
- bool configChange(
- cpg_address *current, int nCurrent,
- cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined);
-
bool configChange(const std::string& addresses);
bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); }
@@ -78,6 +78,7 @@ class ClusterMap {
std::vector<std::string> memberIds() const;
std::vector<Url> memberUrls() const;
Set getAlive() const;
+ Set getMembers() const;
bool updateRequest(const MemberId& id, const std::string& url);
/** Return non-empty Url if accepted */
@@ -90,11 +91,16 @@ class ClusterMap {
* Utility method to return intersection of two member sets
*/
static Set intersection(const Set& a, const Set& b);
+
+ uint64_t getFrameSeq() { return frameSeq; }
+ uint64_t incrementFrameSeq() { return ++frameSeq; }
+
private:
Url getUrl(const Map& map, const MemberId& id);
Map joiners, members;
Set alive;
+ uint64_t frameSeq;
friend std::ostream& operator<<(std::ostream&, const Map&);
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index aa7d082720..97cafbabaa 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -39,8 +39,6 @@
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/LatencyMetric.h"
-#include "qpid/sys/AtomicValue.h"
#include <boost/current_function.hpp>
@@ -56,8 +54,16 @@ namespace qpid {
namespace cluster {
using namespace framing;
+using namespace framing::cluster;
+
+qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+
+Connection::NullFrameHandler Connection::nullFrameHandler;
+
+struct NullFrameHandler : public framing::FrameHandler {
+ void handle(framing::AMQFrame&) {}
+};
-NoOpConnectionOutputHandler Connection::discardHandler;
namespace {
sys::AtomicValue<uint64_t> idCounter;
@@ -89,6 +95,8 @@ void Connection::init() {
connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
connection.setClientThrottling(false); // Disable client throttling, done by active node.
}
+ if (!isCatchUp())
+ connection.setErrorListener(this);
}
void Connection::giveReadCredit(int credit) {
@@ -97,6 +105,7 @@ void Connection::giveReadCredit(int credit) {
}
Connection::~Connection() {
+ connection.setErrorListener(0);
QPID_LOG(debug, cluster << " deleted connection: " << *this);
}
@@ -126,7 +135,7 @@ void Connection::received(framing::AMQFrame& f) {
cluster.addShadowConnection(this);
AMQFrame ok((ConnectionCloseOkBody()));
connection.getOutput().send(ok);
- output.closeOutput(discardHandler);
+ output.closeOutput();
catchUp = false;
}
else
@@ -156,8 +165,8 @@ void Connection::deliveredFrame(const EventFrame& f) {
{
if (f.type == DATA) // incoming data frames to broker::Connection
connection.received(const_cast<AMQFrame&>(f.frame));
- else { // frame control, send frame via SessionState
- broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+ else { // frame control, send frame via SessionState
+ broker::SessionState* ss = connection.getChannel(currentChannel).getSession();
if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
}
}
@@ -180,7 +189,7 @@ void Connection::closed() {
// This was a local replicated connection. Multicast a deliver
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
- output.closeOutput(discardHandler);
+ output.closeOutput();
cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
}
}
@@ -275,13 +284,14 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str
QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
self = shadowId;
connection.setUserId(username);
- // OK to use decoder here because we are stalled for update.
+ // OK to use decoder here because cluster is stalled for update.
cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
+ connection.setErrorListener(this);
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members));
+ cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
self.second = 0; // Mark this as completed update connection.
}
@@ -305,7 +315,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
}
broker::QueuedMessage Connection::getUpdateMessage() {
- broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get();
+ shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
+ assert(!updateq->isDurable());
+ broker::QueuedMessage m = updateq->get();
if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
return m;
}
@@ -342,15 +354,15 @@ void Connection::deliveryRecord(const string& qname,
// If the message was unacked, the newbie broker must place
// it in its messageStore.
- if ( m.payload && m.payload->isPersistent() && !completed && !ended && !accepted && !cancelled )
+ if ( m.payload && m.payload->isPersistent() && acquired && !ended)
queue->enqueue ( 0, m.payload );
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
- shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
- if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
- q->setPosition(position);
-}
+ shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
+ if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
+ q->setPosition(position);
+ }
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
@@ -407,7 +419,14 @@ void Connection::queue(const std::string& encoded) {
QPID_LOG(debug, cluster << " decoded queue " << q->getName());
}
-qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+void Connection::sessionError(uint16_t , const std::string& ) {
+ cluster.flagError(*this, ERROR_TYPE_SESSION);
+
+}
+
+void Connection::connectionError(const std::string& ) {
+ cluster.flagError(*this, ERROR_TYPE_CONNECTION);
+}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 6434f763a8..414e5c935f 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -25,7 +25,6 @@
#include "types.h"
#include "WriteEstimate.h"
#include "OutputInterceptor.h"
-#include "NoOpConnectionOutputHandler.h"
#include "EventFrame.h"
#include "McastFrameHandler.h"
@@ -58,7 +57,8 @@ class Event;
class Connection :
public RefCounted,
public sys::ConnectionInputHandler,
- public framing::AMQP_AllOperations::ClusterConnectionHandler
+ public framing::AMQP_AllOperations::ClusterConnectionHandler,
+ private broker::Connection::ErrorListener
{
public:
@@ -120,7 +120,7 @@ class Connection :
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
- void membership(const framing::FieldTable&, const framing::FieldTable&);
+ void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
@@ -151,14 +151,22 @@ class Connection :
void giveReadCredit(int credit);
+ void deliverClose();
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
};
+
+ static NullFrameHandler nullFrameHandler;
+
+ // Error listener functions
+ void connectionError(const std::string&);
+ void sessionError(uint16_t channel, const std::string&);
+
void init();
bool checkUnsupported(const framing::AMQBody& body);
- void deliverClose();
void deliverDoOutput(uint32_t requested);
void sendDoOutput();
@@ -167,8 +175,6 @@ class Connection :
broker::SemanticState& semanticState();
broker::QueuedMessage getUpdateMessage();
- static NoOpConnectionOutputHandler discardHandler;
-
Cluster& cluster;
ConnectionId self;
bool catchUp;
@@ -181,7 +187,6 @@ class Connection :
boost::shared_ptr<broker::TxBuffer> txBuffer;
bool expectProtocolHeader;
McastFrameHandler mcastFrameHandler;
- NullFrameHandler nullFrameHandler;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp
index 915a578989..f746eacea4 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp
@@ -75,11 +75,14 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow
::memset(&callbacks, sizeof(callbacks), 0);
callbacks.cpg_deliver_fn = &globalDeliver;
callbacks.cpg_confchg_fn = &globalConfigChange;
+
+ QPID_LOG(info, "Initializing CPG");
cpg_error_t err = cpg_initialize(&handle, &callbacks);
- if (err == CPG_ERR_TRY_AGAIN) {
- QPID_LOG(notice, "Waiting for CPG initialization.");
- while (CPG_ERR_TRY_AGAIN == (err = cpg_initialize(&handle, &callbacks)))
- sys::sleep(5);
+ int retries = 6;
+ while (err == CPG_ERR_TRY_AGAIN && --retries) {
+ QPID_LOG(notice, "Re-trying CPG initialization.");
+ sys::sleep(5);
+ err = cpg_initialize(&handle, &callbacks);
}
check(err, "Failed to initialize CPG.");
check(cpg_context_set(handle, this), "Cannot set CPG context");
@@ -87,7 +90,6 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow
// windows then this needs to be refactored into
// qpid::sys::<platform>
IOHandle::impl->fd = getFd();
- QPID_LOG(debug, "Initialized CPG handle 0x" << std::hex << handle);
}
Cpg::~Cpg() {
diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
new file mode 100644
index 0000000000..6132d52126
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ErrorCheck.h"
+#include "EventFrame.h"
+#include "ClusterMap.h"
+#include "Cluster.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
+#include "qpid/framing/ClusterConfigChangeBody.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+
+using namespace std;
+using namespace framing;
+using namespace framing::cluster;
+
+ErrorCheck::ErrorCheck(Cluster& c)
+ : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
+{}
+
+ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) {
+ copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " "));
+ return o;
+}
+
+void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms)
+{
+ // Detected a local error, inform cluster and set error state.
+ assert(t != ERROR_TYPE_NONE); // Must be an error.
+ assert(type == ERROR_TYPE_NONE); // Can only be called while processing
+ type = t;
+ unresolved = ms;
+ frameSeq = seq;
+ connection = &c;
+ QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection")
+ << " error " << frameSeq << " unresolved: " << unresolved);
+ mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
+}
+
+void ErrorCheck::delivered(const EventFrame& e) {
+ if (isUnresolved()) {
+ const ClusterErrorCheckBody* errorCheck =
+ dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod());
+ const ClusterConfigChangeBody* configChange =
+ dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod());
+
+ if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+ if (errorCheck->getType() < type) { // my error is worse than his
+ QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId());
+ throw Exception("Aborted by local failure that did not occur on all replicas");
+ }
+ else { // his error is worse/same as mine.
+ QPID_LOG(debug, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId());
+ unresolved.erase(e.getMemberId());
+ checkResolved();
+ }
+ }
+ else {
+ frames.push_back(e); // Only drop matching errorCheck controls.
+ if (configChange) {
+ MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+ MemberSet result;
+ set_intersection(members.begin(), members.end(),
+ unresolved.begin(), unresolved.end(),
+ inserter(result, result.begin()));
+ unresolved.swap(result);
+ checkResolved();
+ }
+ }
+ }
+ else
+ frames.push_back(e);
+}
+
+void ErrorCheck::checkResolved() {
+ if (unresolved.empty()) { // No more potentially conflicted members, we're clear.
+ type = ERROR_TYPE_NONE;
+ QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved.");
+ }
+ else
+ QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved);
+}
+
+EventFrame ErrorCheck::getNext() {
+ assert(canProcess());
+ EventFrame e(frames.front());
+ frames.pop_front();
+ return e;
+}
+
+bool ErrorCheck::canProcess() const {
+ return type == ERROR_TYPE_NONE && !frames.empty();
+}
+
+bool ErrorCheck::isUnresolved() const {
+ return type != ERROR_TYPE_NONE;
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.h b/qpid/cpp/src/qpid/cluster/ErrorCheck.h
new file mode 100644
index 0000000000..97b5f2bffd
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.h
@@ -0,0 +1,80 @@
+#ifndef QPID_CLUSTER_ERRORCHECK_H
+#define QPID_CLUSTER_ERRORCHECK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Multicaster.h"
+#include "qpid/framing/enum.h"
+#include <boost/function.hpp>
+#include <deque>
+#include <set>
+
+namespace qpid {
+namespace cluster {
+
+class EventFrame;
+class ClusterMap;
+class Cluster;
+class Multicaster;
+class Connection;
+
+/**
+ * Error checking logic.
+ *
+ * When an error occurs stop processing frames and queue them until we
+ * can determine if all nodes experienced the error. If not, we shut down.
+ */
+class ErrorCheck
+{
+ public:
+ typedef std::set<MemberId> MemberSet;
+ typedef framing::cluster::ErrorType ErrorType;
+
+ ErrorCheck(Cluster&);
+
+ /** A local error has occured */
+ void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&);
+
+ /** Called when a frame is delivered */
+ void delivered(const EventFrame&);
+
+ EventFrame getNext();
+
+ bool canProcess() const;
+ bool isUnresolved() const;
+
+ private:
+ void checkResolved();
+
+ Cluster& cluster;
+ Multicaster& mcast;
+ std::deque<EventFrame> frames;
+ std::set<MemberId> unresolved;
+ uint64_t frameSeq;
+ ErrorType type;
+ Connection* connection;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_ERRORCHECK_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h
index e05ad60bcf..f0e445a08c 100644
--- a/qpid/cpp/src/qpid/cluster/Event.h
+++ b/qpid/cpp/src/qpid/cluster/Event.h
@@ -25,7 +25,6 @@
#include "types.h"
#include "qpid/RefCountedBuffer.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/sys/LatencyMetric.h"
#include <sys/uio.h> // For iovec
#include <iosfwd>
@@ -42,7 +41,7 @@ class Buffer;
namespace cluster {
/** Header data for a multicast event */
-class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {
+class EventHeader {
public:
EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0);
void decode(const MemberId& m, framing::Buffer&);
diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.cpp b/qpid/cpp/src/qpid/cluster/EventFrame.cpp
index 9350c801f5..a48d134f1b 100644
--- a/qpid/cpp/src/qpid/cluster/EventFrame.cpp
+++ b/qpid/cpp/src/qpid/cluster/EventFrame.cpp
@@ -28,9 +28,7 @@ EventFrame::EventFrame() {}
EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
: connectionId(e.getConnectionId()), frame(f), readCredit(rc), type(e.getType())
-{
- QPID_LATENCY_INIT(frame);
-}
+{}
std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
if (e.frame.getBody()) o << e.frame;
diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.h b/qpid/cpp/src/qpid/cluster/EventFrame.h
index d6ff58dd38..e275aac7aa 100644
--- a/qpid/cpp/src/qpid/cluster/EventFrame.h
+++ b/qpid/cpp/src/qpid/cluster/EventFrame.h
@@ -25,7 +25,6 @@
#include "types.h"
#include "Event.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/sys/LatencyMetric.h"
#include <boost/intrusive_ptr.hpp>
#include <iosfwd>
@@ -45,6 +44,7 @@ struct EventFrame
bool isCluster() const { return connectionId.getNumber() == 0; }
bool isConnection() const { return connectionId.getNumber() != 0; }
bool isLastInEvent() const { return readCredit; }
+ MemberId getMemberId() const { return connectionId.getMember(); }
ConnectionId connectionId;
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index 409180c499..348963f901 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -50,6 +50,13 @@ void ExpiryPolicy::willExpire(broker::Message& m) {
timer.add(new ExpiryTask(this, id, m.getExpiration()));
}
+void ExpiryPolicy::forget(broker::Message& m) {
+ MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+ assert(i != unexpiredByMessage.end());
+ unexpiredById.erase(i->second);
+ unexpiredByMessage.erase(i);
+}
+
bool ExpiryPolicy::hasExpired(broker::Message& m) {
return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
}
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
index 9f8b1a9236..c147e54796 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -49,8 +49,8 @@ class ExpiryPolicy : public broker::ExpiryPolicy
ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&);
void willExpire(broker::Message&);
-
bool hasExpired(broker::Message&);
+ void forget(broker::Message&);
// Send expiration notice to cluster.
void sendExpire(uint64_t);
diff --git a/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
index 8b2f6dae8e..4df742d6c2 100644
--- a/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
+++ b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
@@ -52,6 +52,8 @@ class LockedConnectionMap
return 0;
}
+ void clear() { sys::Mutex::ScopedLock l(lock); map.clear(); }
+
private:
typedef std::map<ConnectionId, ConnectionPtr> Map;
mutable sys::Mutex lock;
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
index f0738ab08f..3b9d3ac990 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
@@ -22,7 +22,6 @@
#include "Multicaster.h"
#include "Cpg.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/LatencyMetric.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/framing/AMQFrame.h"
@@ -64,7 +63,6 @@ void Multicaster::mcast(const Event& e) {
return;
}
}
- QPID_LATENCY_INIT(e);
queue.push(e);
}
@@ -73,7 +71,6 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
try {
PollableEventQueue::Queue::iterator i = values.begin();
while( i != values.end()) {
- QPID_LATENCY_RECORD("mcast send queue", *i);
iovec iov = i->toIovec();
if (!cpg.mcast(&iov, 1)) {
// cpg didn't send because of CPG flow control.
@@ -97,9 +94,4 @@ void Multicaster::release() {
holdingQueue.clear();
}
-void Multicaster::selfDeliver(const Event& e) {
- sys::Mutex::ScopedLock l(lock);
- QPID_LATENCY_RECORD("cpg self deliver", e);
-}
-
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.h b/qpid/cpp/src/qpid/cluster/Multicaster.h
index d1c3115977..baa5b87f38 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.h
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.h
@@ -55,8 +55,6 @@ class Multicaster
void mcast(const Event& e);
/** End holding mode, held events are mcast */
void release();
- /** Call when events are self-delivered to manage flow control. */
- void selfDeliver(const Event&);
private:
typedef sys::PollableQueue<Event> PollableEventQueue;
diff --git a/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
index 74a376a657..6a30bddf06 100644
--- a/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
+++ b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
@@ -30,8 +30,7 @@ namespace framing { class AMQFrame; }
namespace cluster {
/**
- * Output handler for frames sent to noop connections.
- * Simply discards frames.
+ * Output handler shadow connections, simply discards frames.
*/
class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler
{
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index cd42446016..6af114a662 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -32,8 +32,9 @@ namespace cluster {
using namespace framing;
-OutputInterceptor::OutputInterceptor(
- cluster::Connection& p, sys::ConnectionOutputHandler& h)
+NoOpConnectionOutputHandler OutputInterceptor::discardHandler;
+
+OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h)
: parent(p), closing(false), next(&h), sent(),
writeEstimate(p.getCluster().getWriteEstimate()),
moreOutput(), doingOutput()
@@ -47,7 +48,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) {
}
if (!parent.isCatchUp())
sent += f.encodedSize();
- QPID_LATENCY_RECORD("up to write queue", f);
}
void OutputInterceptor::activateOutput() {
@@ -98,7 +98,6 @@ void OutputInterceptor::deliverDoOutput(size_t requested) {
// Send a doOutput request if one is not already in flight.
void OutputInterceptor::sendDoOutput() {
if (!parent.isLocal()) return;
- QPID_LATENCY_INIT(*this);
doingOutput = true;
size_t request = writeEstimate.sending(getBuffered());
@@ -111,10 +110,10 @@ void OutputInterceptor::sendDoOutput() {
QPID_LOG(trace, parent << "Send doOutput request for " << request);
}
-void OutputInterceptor::closeOutput(sys::ConnectionOutputHandler& h) {
+void OutputInterceptor::closeOutput() {
sys::Mutex::ScopedLock l(lock);
closing = true;
- next = &h;
+ next = &discardHandler;
}
void OutputInterceptor::close() {
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
index c080a419e1..61e246bb89 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -23,9 +23,9 @@
*/
#include "WriteEstimate.h"
+#include "NoOpConnectionOutputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/broker/ConnectionFactory.h"
-#include "qpid/sys/LatencyMetric.h"
#include <boost/function.hpp>
namespace qpid {
@@ -37,7 +37,7 @@ class Connection;
/**
* Interceptor for connection OutputHandler, manages outgoing message replication.
*/
-class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetricTimestamp {
+class OutputInterceptor : public sys::ConnectionOutputHandler {
public:
OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h);
@@ -53,7 +53,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri
// Intercept doOutput requests on Connection.
bool doOutput();
- void closeOutput(sys::ConnectionOutputHandler& h);
+ void closeOutput();
cluster::Connection& parent;
@@ -70,6 +70,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri
WriteEstimate writeEstimate;
bool moreOutput;
bool doingOutput;
+ static NoOpConnectionOutputHandler discardHandler;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 97eae7efa3..bb4df8890a 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -26,6 +26,9 @@
#include "ExpiryPolicy.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Future.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
@@ -98,10 +101,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
expiry(expiry_), connections(cons), decoder(decoder_),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail), connectionSettings(cs)
-{
- connection.open(url, cs);
- session = connection.newSession(UPDATE);
-}
+{}
UpdateClient::~UpdateClient() {}
@@ -110,6 +110,8 @@ const std::string UpdateClient::UPDATE("qpid.cluster-update");
void UpdateClient::run() {
try {
+ connection.open(updateeUrl, connectionSettings);
+ session = connection.newSession(UPDATE);
update();
done();
} catch (const std::exception& e) {
@@ -126,15 +128,19 @@ void UpdateClient::update() {
// Update queue is used to transfer acquired messages that are no longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
- session.close();
std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
+ session.queueDelete(arg::queue=UPDATE);
+ session.close();
+
+
ClusterConnectionProxy(session).expiryId(expiry.getId());
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
+
connection.close();
QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl);
}
@@ -203,7 +209,6 @@ class MessageUpdater {
sb.get()->send(transfer, message.payload->getFrames());
if (message.payload->isContentReleased()){
uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
-
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index 23d061b7e4..96e2479955 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -24,6 +24,7 @@
#include "ClusterMap.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/AsyncSession.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/sys/Runnable.h"
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h
index 34319e7ed4..2f0a01d6ca 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.h
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.h
@@ -26,7 +26,6 @@
#include "AMQContentBody.h"
#include "AMQHeartbeatBody.h"
#include "ProtocolVersion.h"
-#include "qpid/sys/LatencyMetric.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/cast.hpp>
#include "qpid/CommonImportExport.h"
@@ -34,7 +33,7 @@
namespace qpid {
namespace framing {
-class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp
+class AMQFrame : public AMQDataBlock
{
public:
QPID_COMMON_EXTERN AMQFrame(const boost::intrusive_ptr<AMQBody>& b=0);
diff --git a/qpid/cpp/src/qpid/sys/LatencyMetric.cpp b/qpid/cpp/src/qpid/sys/LatencyMetric.cpp
deleted file mode 100644
index caa221def4..0000000000
--- a/qpid/cpp/src/qpid/sys/LatencyMetric.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifdef QPID_LATENCY_METRIC
-
-#include "LatencyMetric.h"
-#include "Time.h"
-#include <iostream>
-
-namespace qpid {
-namespace sys {
-
-void LatencyMetricTimestamp::initialize(const LatencyMetricTimestamp& ts) {
- const_cast<int64_t&>(ts.latency_metric_timestamp) = Duration(now());
-}
-
-void LatencyMetricTimestamp::clear(const LatencyMetricTimestamp& ts) {
- const_cast<int64_t&>(ts.latency_metric_timestamp) = 0;
-}
-
-LatencyMetric::LatencyMetric(const char* msg, int64_t skip_) :
- message(msg), count(0), total(0), skipped(0), skip(skip_)
-{}
-
-LatencyMetric::~LatencyMetric() { report(); }
-
-void LatencyMetric::record(const LatencyMetricTimestamp& start) {
- if (!start.latency_metric_timestamp) return; // Ignore 0 timestamps.
- if (skip) {
- if (++skipped < skip) return;
- else skipped = 0;
- }
- ++count;
- int64_t now_ = Duration(now());
- total += now_ - start.latency_metric_timestamp;
- // Set start time for next leg of the journey
- const_cast<int64_t&>(start.latency_metric_timestamp) = now_;
-}
-
-void LatencyMetric::report() {
- using namespace std;
- if (count) {
- cout << "LATENCY: " << message << ": "
- << total / (count * TIME_USEC) << " microseconds" << endl;
- }
- else {
- cout << "LATENCY: " << message << ": no data." << endl;
- }
- count = 0;
- total = 0;
-}
-
-
-}} // namespace qpid::sys
-
-#endif
diff --git a/qpid/cpp/src/qpid/sys/LatencyMetric.h b/qpid/cpp/src/qpid/sys/LatencyMetric.h
deleted file mode 100644
index 63b5020db4..0000000000
--- a/qpid/cpp/src/qpid/sys/LatencyMetric.h
+++ /dev/null
@@ -1,85 +0,0 @@
-#ifndef QPID_SYS_LATENCYMETRIC_H
-#define QPID_SYS_LATENCYMETRIC_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifdef QPID_LATENCY_METRIC
-
-#include "qpid/sys/IntegerTypes.h"
-
-namespace qpid {
-namespace sys {
-
-/** Use this base class to add a timestamp for latency to an object */
-struct LatencyMetricTimestamp {
- LatencyMetricTimestamp() : latency_metric_timestamp(0) {}
- static void initialize(const LatencyMetricTimestamp&);
- static void clear(const LatencyMetricTimestamp&);
- int64_t latency_metric_timestamp;
-};
-
-/**
- * Record average latencies, report on destruction.
- *
- * For debugging only, use via macros below so it can be compiled out
- * of production code.
- */
-class LatencyMetric {
- public:
- /** msg should be a string literal. */
- LatencyMetric(const char* msg, int64_t skip_=0);
- ~LatencyMetric();
-
- void record(const LatencyMetricTimestamp& start);
-
- private:
- void report();
- const char* message;
- int64_t count, total, skipped, skip;
-};
-
-}} // namespace qpid::sys
-
-#define QPID_LATENCY_INIT(x) ::qpid::sys::LatencyMetricTimestamp::initialize(x)
-#define QPID_LATENCY_CLEAR(x) ::qpid::sys::LatencyMetricTimestamp::clear(x)
-#define QPID_LATENCY_RECORD(msg, x) do { \
- static ::qpid::sys::LatencyMetric metric__(msg); metric__.record(x); \
- } while (false)
-#define QPID_LATENCY_RECORD_SKIP(msg, x, skip) do { \
- static ::qpid::sys::LatencyMetric metric__(msg, skip); metric__.record(x); \
- } while (false)
-
-
-#else /* defined QPID_LATENCY_METRIC */
-
-namespace qpid { namespace sys {
-class LatencyMetricTimestamp {};
-}}
-
-#define QPID_LATENCY_INIT(x) (void)x
-#define QPID_LATENCY_CLEAR(x) (void)x
-#define QPID_LATENCY_RECORD(msg, x) (void)x
-#define QPID_LATENCY_RECORD_SKIP(msg, x, skip) (void)x
-
-#endif /* defined QPID_LATENCY_METRIC */
-
-#endif /*!QPID_SYS_LATENCYMETRIC_H*/
diff --git a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
index 067c0fe6b7..cd89f39292 100755
--- a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
+++ b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
@@ -23,6 +23,7 @@
*/
#include "AsynchIoResult.h"
+#include "qpid/CommonImportExport.h"
#include <winsock2.h>
@@ -49,7 +50,7 @@ public:
AsynchIO::RequestCallback cbRequest;
};
-SOCKET toFd(const IOHandlePrivate* h);
+QPID_COMMON_EXTERN SOCKET toFd(const IOHandlePrivate* h);
}}