diff options
Diffstat (limited to 'qpid/cpp/src/qpid')
62 files changed, 928 insertions, 402 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index db957051d8..f927db09bb 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -88,6 +88,7 @@ void SessionHandler::handleIn(AMQFrame& f) { } catch(const SessionException& e) { QPID_LOG(error, "Execution exception: " << e.what()); + executionException(e.code, e.what()); // Let subclass handle this first. framing::AMQP_AllProxy::Execution execution(channel); AMQMethodBody* m = f.getMethod(); SequenceNumber commandId; @@ -98,6 +99,7 @@ void SessionHandler::handleIn(AMQFrame& f) { } catch(const ChannelException& e){ QPID_LOG(error, "Channel exception: " << e.what()); + channelException(e.code, e.what()); // Let subclass handle this first. peer.detached(name, e.code); } catch(const ConnectionException& e) { diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h index 0b158ec2b4..0d9c72ff02 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -87,8 +87,9 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, QPID_COMMON_EXTERN virtual void invoke(const framing::AMQMethodBody& m); virtual void setState(const std::string& sessionName, bool force) = 0; - virtual void channelException(framing::session::DetachCode code, const std::string& msg) = 0; virtual void connectionException(framing::connection::CloseCode code, const std::string& msg) = 0; + virtual void channelException(framing::session::DetachCode, const std::string& msg) = 0; + virtual void executionException(framing::execution::ErrorCode, const std::string& msg) = 0; virtual void detaching() = 0; // Notification of events diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index b06e06d353..365b3ccbeb 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -57,7 +57,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtObject(0), links(broker_.getLinks()), agent(0), - timer(broker_.getTimer()) + timer(broker_.getTimer()), + errorListener(0) { Manageable* parent = broker.GetVhostObject(); diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index b659fe6468..e67cdce681 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -66,6 +66,17 @@ class Connection : public sys::ConnectionInputHandler, public RefCounted { public: + /** + * Listener that can be registered with a Connection to be informed of errors. + */ + class ErrorListener + { + public: + virtual ~ErrorListener() {} + virtual void sessionError(uint16_t channel, const std::string&) = 0; + virtual void connectionError(const std::string&) = 0; + }; + Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0); ~Connection (); @@ -101,6 +112,9 @@ class Connection : public sys::ConnectionInputHandler, const std::string& getMgmtId() const { return mgmtId; } management::ManagementAgent* getAgent() const { return agent; } void setFederationLink(bool b); + /** Connection does not delete the listener. 0 resets. */ + void setErrorListener(ErrorListener* l) { errorListener=l; } + ErrorListener* getErrorListener() { return errorListener; } void setHeartbeatInterval(uint16_t heartbeat); void sendHeartbeat(); @@ -112,6 +126,7 @@ class Connection : public sys::ConnectionInputHandler, void sendClose(); void setSecureConnection(SecureConnection* secured); + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; @@ -128,6 +143,8 @@ class Connection : public sys::ConnectionInputHandler, management::ManagementAgent* agent; Timer& timer; boost::intrusive_ptr<TimerTask> heartbeatTimer; + ErrorListener* errorListener; + public: qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; } }; diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 63212c7794..8b70836da0 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -64,13 +64,16 @@ void ConnectionHandler::heartbeat() void ConnectionHandler::handle(framing::AMQFrame& frame) { AMQMethodBody* method=frame.getBody()->getMethod(); + Connection::ErrorListener* errorListener = handler->connection.getErrorListener(); try{ if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) { handler->connection.getChannel(frame.getChannel()).in(frame); } }catch(ConnectionException& e){ + if (errorListener) errorListener->connectionError(e.what()); handler->proxy.close(e.code, e.what()); }catch(std::exception& e){ + if (errorListener) errorListener->connectionError(e.what()); handler->proxy.close(541/*internal error*/, e.what()); } } diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp index 907f1e56e1..ffe0cc437b 100644 --- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp +++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp @@ -33,4 +33,6 @@ bool ExpiryPolicy::hasExpired(Message& m) { return m.getExpiration() < sys::AbsTime::now(); } +void ExpiryPolicy::forget(Message&) {} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h index cefe9b7552..eeb3ffda21 100644 --- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h +++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h @@ -39,6 +39,7 @@ class ExpiryPolicy : public RefCounted QPID_BROKER_EXTERN virtual ~ExpiryPolicy(); QPID_BROKER_EXTERN virtual void willExpire(Message&); QPID_BROKER_EXTERN virtual bool hasExpired(Message&); + QPID_BROKER_EXTERN virtual void forget(Message&); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 40b5515829..1e9eb9d386 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -53,6 +53,8 @@ Message::Message(const framing::SequenceNumber& id) : Message::~Message() { + if (expiryPolicy) + expiryPolicy->forget(*this); } void Message::forcePersistent() @@ -334,7 +336,7 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; - if (expiryPolicy) + if (expiryPolicy) expiryPolicy->willExpire(*this); } diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 442c3eb34b..ca1f875991 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -45,14 +45,20 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } } // namespace -void SessionHandler::channelException(framing::session::DetachCode, const std::string&) { - handleDetach(); -} - void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) { + // NOTE: must tell the error listener _before_ calling connection.close() + if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg); connection.close(code, msg); } +void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) { + if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg); +} + +void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) { + if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg); +} + ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index ffc032f64c..ca6d6bb193 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -73,8 +73,9 @@ class SessionHandler : public amqp_0_10::SessionHandler { virtual void setState(const std::string& sessionName, bool force); virtual qpid::SessionState* getState(); virtual framing::FrameHandler* getInHandler(); - virtual void channelException(framing::session::DetachCode code, const std::string& msg); virtual void connectionException(framing::connection::CloseCode code, const std::string& msg); + virtual void channelException(framing::session::DetachCode, const std::string& msg); + virtual void executionException(framing::execution::ErrorCode, const std::string& msg); virtual void detaching(); virtual void readyToSend(); diff --git a/qpid/cpp/src/qpid/client/Completion.cpp b/qpid/cpp/src/qpid/client/Completion.cpp new file mode 100644 index 0000000000..e3676b2bde --- /dev/null +++ b/qpid/cpp/src/qpid/client/Completion.cpp @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Completion.h" +#include "CompletionImpl.h" +#include "HandlePrivate.h" + +namespace qpid { +namespace client { + +Completion::Completion(CompletionImpl* i) : Handle<CompletionImpl>(i) {} +Completion::~Completion() {} +Completion::Completion(const Completion& c) : Handle<CompletionImpl>(c.impl) {} +Completion& Completion::operator=(const Completion& c) { Handle<CompletionImpl>::operator=(c); return *this; } + +void Completion::wait() { impl->wait(); } +bool Completion::isComplete() { return impl->isComplete(); } +std::string Completion::getResult() { return impl->getResult(); } + +}} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/Completion.h b/qpid/cpp/src/qpid/client/Completion.h index c4979d7934..0b246b7765 100644 --- a/qpid/cpp/src/qpid/client/Completion.h +++ b/qpid/cpp/src/qpid/client/Completion.h @@ -22,13 +22,14 @@ #ifndef _Completion_ #define _Completion_ -#include <boost/shared_ptr.hpp> -#include "Future.h" -#include "SessionImpl.h" +#include "Handle.h" +#include <string> namespace qpid { namespace client { +class CompletionImpl; + /** * Asynchronous commands that do not return a result will return a * Completion. You can use the completion to wait for that specific @@ -38,32 +39,26 @@ namespace client { * *\ingroup clientapi */ -class Completion +class Completion : public Handle<CompletionImpl> { -protected: - Future future; - shared_ptr<SessionImpl> session; - public: ///@internal - Completion() {} - - ///@internal - Completion(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {} + QPID_CLIENT_EXTERN Completion(CompletionImpl* =0); + QPID_CLIENT_EXTERN ~Completion(); + QPID_CLIENT_EXTERN Completion(const Completion&); + QPID_CLIENT_EXTERN Completion& operator=(const Completion&); /** Wait for the asynchronous command that returned this *Completion to complete. * - *@exception If the command returns an error, get() throws an exception. + *@exception If the command returns an error. */ - void wait() - { - future.wait(*session); - } + QPID_CLIENT_EXTERN void wait(); + + QPID_CLIENT_EXTERN bool isComplete(); - bool isComplete() { - return future.isComplete(*session); - } + protected: + QPID_CLIENT_EXTERN std::string getResult(); }; }} diff --git a/qpid/cpp/src/qpid/client/CompletionImpl.h b/qpid/cpp/src/qpid/client/CompletionImpl.h new file mode 100644 index 0000000000..119abc093a --- /dev/null +++ b/qpid/cpp/src/qpid/client/CompletionImpl.h @@ -0,0 +1,50 @@ +#ifndef QPID_CLIENT_COMPLETIONIMPL_H +#define QPID_CLIENT_COMPLETIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "Future.h" + +namespace qpid { +namespace client { + +///@internal +class CompletionImpl : public RefCounted +{ +public: + CompletionImpl() {} + CompletionImpl(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {} + + bool isComplete() { return future.isComplete(*session); } + void wait() { future.wait(*session); } + std::string getResult() { return future.getResult(*session); } + +protected: + Future future; + shared_ptr<SessionImpl> session; +}; + +}} // namespace qpid::client + + +#endif /*!QPID_CLIENT_COMPLETIONIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp index cc62d724cb..e8415403ca 100644 --- a/qpid/cpp/src/qpid/client/Connection.cpp +++ b/qpid/cpp/src/qpid/client/Connection.cpp @@ -19,6 +19,7 @@ * */ #include "Connection.h" +#include "ConnectionImpl.h" #include "ConnectionSettings.h" #include "Message.h" #include "SessionImpl.h" diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h index 846ac33790..d898ea70d9 100644 --- a/qpid/cpp/src/qpid/client/Connection.h +++ b/qpid/cpp/src/qpid/client/Connection.h @@ -25,6 +25,7 @@ #include <string> #include "qpid/client/Session.h" #include "qpid/client/ClientImportExport.h" +#include "qpid/client/ConnectionSettings.h" namespace qpid { @@ -32,7 +33,6 @@ struct Url; namespace client { -struct ConnectionSettings; class ConnectionImpl; /** diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 745bdb63b5..b1e83025ab 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -111,9 +111,11 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) Mutex::ScopedLock l(lock); s = sessions[frame.getChannel()].lock(); } - if (!s) - throw NotAttachedException(QPID_MSG("Invalid channel: " << frame.getChannel())); - s->in(frame); + if (!s) { + QPID_LOG(info, "Dropping frame received on invalid channel: " << frame); + } else { + s->in(frame); + } } bool ConnectionImpl::isOpen() const diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp index 8d8574520a..5156031748 100644 --- a/qpid/cpp/src/qpid/client/Dispatcher.cpp +++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp @@ -26,6 +26,7 @@ #include "qpid/log/Statement.h" #include "qpid/sys/BlockingQueue.h" #include "Message.h" +#include "MessageImpl.h" #include <boost/state_saver.hpp> @@ -49,6 +50,9 @@ Dispatcher::Dispatcher(const Session& s, const std::string& q) session.getExecution().getDemux().get(q); } +Dispatcher::~Dispatcher() {} + + void Dispatcher::start() { worker = Thread(this); @@ -71,7 +75,7 @@ void Dispatcher::run() Mutex::ScopedUnlock u(lock); FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { - Message msg(*content); + Message msg(new MessageImpl(*content)); boost::intrusive_ptr<SubscriptionImpl> listener = find(msg.getDestination()); if (!listener) { QPID_LOG(error, "No listener found for destination " << msg.getDestination()); diff --git a/qpid/cpp/src/qpid/client/Dispatcher.h b/qpid/cpp/src/qpid/client/Dispatcher.h index e84f8f303d..4dbb75dcf2 100644 --- a/qpid/cpp/src/qpid/client/Dispatcher.h +++ b/qpid/cpp/src/qpid/client/Dispatcher.h @@ -30,7 +30,6 @@ #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "MessageListener.h" -#include "SubscriptionImpl.h" namespace qpid { namespace client { @@ -61,6 +60,7 @@ class Dispatcher : public sys::Runnable public: Dispatcher(const Session& session, const std::string& queue = ""); + ~Dispatcher(); void start(); void wait(); diff --git a/qpid/cpp/src/qpid/client/FailoverListener.cpp b/qpid/cpp/src/qpid/client/FailoverListener.cpp index 16370f8912..ed9354d528 100644 --- a/qpid/cpp/src/qpid/client/FailoverListener.cpp +++ b/qpid/cpp/src/qpid/client/FailoverListener.cpp @@ -20,6 +20,9 @@ */ #include "FailoverListener.h" #include "SessionBase_0_10Access.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/SubscriptionImpl.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" diff --git a/qpid/cpp/src/qpid/client/FailoverListener.h b/qpid/cpp/src/qpid/client/FailoverListener.h index fe73a26611..7afee736ac 100644 --- a/qpid/cpp/src/qpid/client/FailoverListener.h +++ b/qpid/cpp/src/qpid/client/FailoverListener.h @@ -33,6 +33,7 @@ namespace qpid { namespace client { class SubscriptionManager; +class ConnectionImpl; /** * @internal Listen for failover updates from the amq.failover exchange. diff --git a/qpid/cpp/src/qpid/client/Future.cpp b/qpid/cpp/src/qpid/client/Future.cpp index 6a0c78ae4b..fda40219ba 100644 --- a/qpid/cpp/src/qpid/client/Future.cpp +++ b/qpid/cpp/src/qpid/client/Future.cpp @@ -20,6 +20,7 @@ */ #include "Future.h" +#include "SessionImpl.h" namespace qpid { namespace client { diff --git a/qpid/cpp/src/qpid/client/Future.h b/qpid/cpp/src/qpid/client/Future.h index ea01522fe8..28c9a2bbbd 100644 --- a/qpid/cpp/src/qpid/client/Future.h +++ b/qpid/cpp/src/qpid/client/Future.h @@ -26,17 +26,15 @@ #include <boost/shared_ptr.hpp> #include "qpid/Exception.h" #include "qpid/framing/SequenceNumber.h" -#include "qpid/framing/StructHelper.h" #include "FutureCompletion.h" #include "FutureResult.h" -#include "SessionImpl.h" #include "ClientImportExport.h" namespace qpid { namespace client { /**@internal */ -class Future : private framing::StructHelper +class Future { framing::SequenceNumber command; boost::shared_ptr<FutureResult> result; @@ -46,13 +44,9 @@ public: Future() : complete(false) {} Future(const framing::SequenceNumber& id) : command(id), complete(false) {} - template <class T> void decodeResult(T& value, SessionImpl& session) - { - if (result) { - decode(value, result->getResult(session)); - } else { - throw Exception("Result not expected"); - } + std::string getResult(SessionImpl& session) { + if (result) return result->getResult(session); + else throw Exception("Result not expected"); } QPID_CLIENT_EXTERN void wait(SessionImpl& session); diff --git a/qpid/cpp/src/qpid/client/Handle.h b/qpid/cpp/src/qpid/client/Handle.h index d8b822d0f9..12fb4cf3c1 100644 --- a/qpid/cpp/src/qpid/client/Handle.h +++ b/qpid/cpp/src/qpid/client/Handle.h @@ -30,9 +30,11 @@ namespace client { template <class T> class HandlePrivate; /** - * A handle is like a pointer: it points to some underlying object. + * A handle is like a pointer: it points to some implementation object. + * Copying the handle does not copy the object. + * * Handles can be null, like a 0 pointer. Use isValid(), isNull() or the - * implicit conversion to bool to test for a null handle. + * conversion to bool to test for a null handle. */ template <class T> class Handle { public: @@ -46,8 +48,11 @@ template <class T> class Handle { /**@return true if handle is null. It is an error to call any function on a null handle. */ QPID_CLIENT_EXTERN bool isNull() const { return !impl; } + /** Conversion to bool supports idiom if (handle) { handle->... } */ QPID_CLIENT_EXTERN operator bool() const { return impl; } - QPID_CLIENT_EXTERN bool operator !() const { return impl; } + + /** Operator ! supports idiom if (!handle) { do_if_handle_is_null(); } */ + QPID_CLIENT_EXTERN bool operator !() const { return !impl; } QPID_CLIENT_EXTERN void swap(Handle<T>&); diff --git a/qpid/cpp/src/qpid/client/HandlePrivate.h b/qpid/cpp/src/qpid/client/HandlePrivate.h index 488ce48075..46e4bff808 100644 --- a/qpid/cpp/src/qpid/client/HandlePrivate.h +++ b/qpid/cpp/src/qpid/client/HandlePrivate.h @@ -21,14 +21,16 @@ * under the License. * */ +#include "Handle.h" +#include "qpid/RefCounted.h" #include <algorithm> +#include <boost/intrusive_ptr.hpp> namespace qpid { namespace client { /** @file - * Private implementation of handle, include in .cpp file of handle - * subclasses _after_ including the declaration of class T. + * Implementation of handle, include in .cpp file of handle subclasses. * T can be any class that can be used with boost::intrusive_ptr. */ @@ -52,9 +54,13 @@ void Handle<T>::swap(Handle<T>& h) { std::swap(impl, h.impl); } template <class T> class HandlePrivate { public: - static boost::intrusive_ptr<T> get(Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); } + static boost::intrusive_ptr<T> get(const Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); } + static void set(Handle<T>& h, const boost::intrusive_ptr<T>& p) { Handle<T>(p.get()).swap(h); } }; +template<class T> boost::intrusive_ptr<T> handleGetPtr(Handle<T>& h) { return HandlePrivate<T>::get(h); } +template<class T> boost::intrusive_ptr<const T> handleGetPtr(const Handle<T>& h) { return HandlePrivate<T>::get(h); } +template<class T> void handleSetPtr(Handle<T>& h, const boost::intrusive_ptr<T>& p) { HandlePrivate<T>::set(h, p); } }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/LocalQueue.cpp b/qpid/cpp/src/qpid/client/LocalQueue.cpp index e449c9f795..02fecf804f 100644 --- a/qpid/cpp/src/qpid/client/LocalQueue.cpp +++ b/qpid/cpp/src/qpid/client/LocalQueue.cpp @@ -19,11 +19,14 @@ * */ #include "LocalQueue.h" +#include "MessageImpl.h" #include "qpid/Exception.h" #include "qpid/framing/FrameSet.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/reply_exceptions.h" #include "HandlePrivate.h" #include "SubscriptionImpl.h" +#include "CompletionImpl.h" namespace qpid { namespace client { @@ -49,7 +52,7 @@ bool LocalQueue::get(Message& result, sys::Duration timeout) { bool ok = queue->pop(content, timeout); if (!ok) return false; if (content->isA<MessageTransferBody>()) { - result = Message(*content); + result = Message(new MessageImpl(*content)); boost::intrusive_ptr<SubscriptionImpl> si = HandlePrivate<SubscriptionImpl>::get(subscription); assert(si); if (si) si->received(result); diff --git a/qpid/cpp/src/qpid/client/Message.cpp b/qpid/cpp/src/qpid/client/Message.cpp index 13caaecefd..962ce26305 100644 --- a/qpid/cpp/src/qpid/client/Message.cpp +++ b/qpid/cpp/src/qpid/client/Message.cpp @@ -20,52 +20,37 @@ */ #include "Message.h" +#include "PrivateImplPrivate.h" +#include "MessageImpl.h" namespace qpid { namespace client { -Message::Message(const std::string& data, const std::string& routingKey) : TransferContent(data, routingKey) {} +template class PrivateImpl<MessageImpl>; -std::string Message::getDestination() const -{ - return method.getDestination(); -} +Message::Message(const std::string& data, const std::string& routingKey) : PrivateImpl<MessageImpl>(new MessageImpl(data, routingKey)) {} +Message::Message(MessageImpl* i) : PrivateImpl<MessageImpl>(i) {} +Message::~Message() {} -bool Message::isRedelivered() const -{ - return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); -} +std::string Message::getDestination() const { return impl->getDestination(); } +bool Message::isRedelivered() const { return impl->isRedelivered(); } +void Message::setRedelivered(bool redelivered) { impl->setRedelivered(redelivered); } +framing::FieldTable& Message::getHeaders() { return impl->getHeaders(); } +const framing::FieldTable& Message::getHeaders() const { return impl->getHeaders(); } +const framing::SequenceNumber& Message::getId() const { return impl->getId(); } -void Message::setRedelivered(bool redelivered) -{ - getDeliveryProperties().setRedelivered(redelivered); -} +void Message::setData(const std::string& s) { impl->setData(s); } +const std::string& Message::getData() const { return impl->getData(); } +std::string& Message::getData() { return impl->getData(); } -framing::FieldTable& Message::getHeaders() -{ - return getMessageProperties().getApplicationHeaders(); -} +void Message::appendData(const std::string& s) { impl->appendData(s); } -const framing::FieldTable& Message::getHeaders() const -{ - return getMessageProperties().getApplicationHeaders(); -} +bool Message::hasMessageProperties() const { return impl->hasMessageProperties(); } +framing::MessageProperties& Message::getMessageProperties() { return impl->getMessageProperties(); } +const framing::MessageProperties& Message::getMessageProperties() const { return impl->getMessageProperties(); } -const framing::MessageTransferBody& Message::getMethod() const -{ - return method; -} - -const framing::SequenceNumber& Message::getId() const -{ - return id; -} - -/**@internal for incoming messages */ -Message::Message(const framing::FrameSet& frameset) : - method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()) -{ - populate(frameset); -} +bool Message::hasDeliveryProperties() const { return impl->hasDeliveryProperties(); } +framing::DeliveryProperties& Message::getDeliveryProperties() { return impl->getDeliveryProperties(); } +const framing::DeliveryProperties& Message::getDeliveryProperties() const { return impl->getDeliveryProperties(); } }} diff --git a/qpid/cpp/src/qpid/client/Message.h b/qpid/cpp/src/qpid/client/Message.h index 235e20f97d..97238db647 100644 --- a/qpid/cpp/src/qpid/client/Message.h +++ b/qpid/cpp/src/qpid/client/Message.h @@ -1,5 +1,5 @@ -#ifndef _client_Message_h -#define _client_Message_h +#ifndef QPID_CLIENT_MESSAGE_H +#define QPID_CLIENT_MESSAGE_H /* * @@ -21,15 +21,24 @@ * under the License. * */ -#include <string> -#include "qpid/client/Session.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/TransferContent.h" + +#include "qpid/client/PrivateImpl.h" #include "qpid/client/ClientImportExport.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/DeliveryProperties.h" +#include <string> namespace qpid { + +namespace framing { +class FieldTable; +class SequenceNumber; // FIXME aconway 2009-04-17: remove with getID? +} + namespace client { +class MessageImpl; + /** * A message sent to or received from the broker. * @@ -104,8 +113,7 @@ namespace client { * * */ - -class Message : public framing::TransferContent +class Message : public PrivateImpl<MessageImpl> { public: /** Create a Message. @@ -115,6 +123,23 @@ public: QPID_CLIENT_EXTERN Message(const std::string& data=std::string(), const std::string& routingKey=std::string()); + QPID_CLIENT_EXTERN ~Message(); + + QPID_CLIENT_EXTERN void setData(const std::string&); + QPID_CLIENT_EXTERN const std::string& getData() const; + QPID_CLIENT_EXTERN std::string& getData(); + + QPID_CLIENT_EXTERN void appendData(const std::string&); + + QPID_CLIENT_EXTERN bool hasMessageProperties() const; + QPID_CLIENT_EXTERN framing::MessageProperties& getMessageProperties(); + QPID_CLIENT_EXTERN const framing::MessageProperties& getMessageProperties() const; + + QPID_CLIENT_EXTERN bool hasDeliveryProperties() const; + QPID_CLIENT_EXTERN framing::DeliveryProperties& getDeliveryProperties(); + QPID_CLIENT_EXTERN const framing::DeliveryProperties& getDeliveryProperties() const; + + /** The destination of messages sent to the broker is the exchange * name. The destination of messages received from the broker is * the delivery tag identifyig the local subscription (often this @@ -133,20 +158,14 @@ public: /** Get a non-modifyable reference to the message headers. */ QPID_CLIENT_EXTERN const framing::FieldTable& getHeaders() const; - ///@internal - QPID_CLIENT_EXTERN const framing::MessageTransferBody& getMethod() const; + // FIXME aconway 2009-04-17: does this need to be in public API? ///@internal QPID_CLIENT_EXTERN const framing::SequenceNumber& getId() const; - /**@internal for incoming messages */ - QPID_CLIENT_EXTERN Message(const framing::FrameSet& frameset); - -private: - //method and id are only set for received messages: - framing::MessageTransferBody method; - framing::SequenceNumber id; + ///@internal + Message(MessageImpl*); }; }} -#endif /*!_client_Message_h*/ +#endif /*!QPID_CLIENT_MESSAGE_H*/ diff --git a/qpid/cpp/src/qpid/client/MessageImpl.cpp b/qpid/cpp/src/qpid/client/MessageImpl.cpp new file mode 100644 index 0000000000..3d06fd1d8d --- /dev/null +++ b/qpid/cpp/src/qpid/client/MessageImpl.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "MessageImpl.h" + +namespace qpid { +namespace client { + +MessageImpl::MessageImpl(const std::string& data, const std::string& routingKey) : TransferContent(data, routingKey) {} + +std::string MessageImpl::getDestination() const +{ + return method.getDestination(); +} + +bool MessageImpl::isRedelivered() const +{ + return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); +} + +void MessageImpl::setRedelivered(bool redelivered) +{ + getDeliveryProperties().setRedelivered(redelivered); +} + +framing::FieldTable& MessageImpl::getHeaders() +{ + return getMessageProperties().getApplicationHeaders(); +} + +const framing::FieldTable& MessageImpl::getHeaders() const +{ + return getMessageProperties().getApplicationHeaders(); +} + +const framing::MessageTransferBody& MessageImpl::getMethod() const +{ + return method; +} + +const framing::SequenceNumber& MessageImpl::getId() const +{ + return id; +} + +/**@internal for incoming messages */ +MessageImpl::MessageImpl(const framing::FrameSet& frameset) : + method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()) +{ + populate(frameset); +} + +}} diff --git a/qpid/cpp/src/qpid/client/MessageImpl.h b/qpid/cpp/src/qpid/client/MessageImpl.h new file mode 100644 index 0000000000..c06d9b5afc --- /dev/null +++ b/qpid/cpp/src/qpid/client/MessageImpl.h @@ -0,0 +1,76 @@ +#ifndef QPID_CLIENT_MESSAGEIMPL_H +#define QPID_CLIENT_MESSAGEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/client/Session.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/TransferContent.h" + +namespace qpid { +namespace client { + +class MessageImpl : public framing::TransferContent +{ +public: + /** Create a Message. + *@param data Data for the message body. + *@param routingKey Passed to the exchange that routes the message. + */ + MessageImpl(const std::string& data=std::string(), + const std::string& routingKey=std::string()); + + /** The destination of messages sent to the broker is the exchange + * name. The destination of messages received from the broker is + * the delivery tag identifyig the local subscription (often this + * is the name of the subscribed queue.) + */ + std::string getDestination() const; + + /** Check the redelivered flag. */ + bool isRedelivered() const; + /** Set the redelivered flag. */ + void setRedelivered(bool redelivered); + + /** Get a modifyable reference to the message headers. */ + framing::FieldTable& getHeaders(); + + /** Get a non-modifyable reference to the message headers. */ + const framing::FieldTable& getHeaders() const; + + ///@internal + const framing::MessageTransferBody& getMethod() const; + ///@internal + const framing::SequenceNumber& getId() const; + + /**@internal for incoming messages */ + MessageImpl(const framing::FrameSet& frameset); + +private: + //method and id are only set for received messages: + framing::MessageTransferBody method; + framing::SequenceNumber id; +}; + +}} + +#endif /*!QPID_CLIENT_MESSAGEIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/PrivateImpl.h b/qpid/cpp/src/qpid/client/PrivateImpl.h new file mode 100644 index 0000000000..6e5ea35ce0 --- /dev/null +++ b/qpid/cpp/src/qpid/client/PrivateImpl.h @@ -0,0 +1,54 @@ +#ifndef QPID_CLIENT_PRIVATEIMPL_H +#define QPID_CLIENT_PRIVATEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/ClientImportExport.h" + +namespace qpid { +namespace client { + +template <class T> class PrivateImplPrivate; + +/** + * Base classes for objects with a private implementation. + * + * PrivateImpl objects have value semantics: copying the object also + * makes a copy of the implementation. + */ +template <class T> class PrivateImpl { + public: + QPID_CLIENT_EXTERN ~PrivateImpl(); + QPID_CLIENT_EXTERN PrivateImpl(const PrivateImpl&); + QPID_CLIENT_EXTERN PrivateImpl& operator=(const PrivateImpl&); + QPID_CLIENT_EXTERN void swap(PrivateImpl<T>&); + + protected: + QPID_CLIENT_EXTERN PrivateImpl(T*); + T* impl; + + friend class PrivateImplPrivate<T>; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_PRIVATEIMPL_H*/ diff --git a/qpid/cpp/src/qpid/client/PrivateImplPrivate.h b/qpid/cpp/src/qpid/client/PrivateImplPrivate.h new file mode 100644 index 0000000000..021456e085 --- /dev/null +++ b/qpid/cpp/src/qpid/client/PrivateImplPrivate.h @@ -0,0 +1,66 @@ +#ifndef QPID_CLIENT_PRIVATEIMPLPRIVATE_H +#define QPID_CLIENT_PRIVATEIMPLPRIVATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <algorithm> + +namespace qpid { +namespace client { + +/** @file + * Implementation of PrivateImpl functions, to include in .cpp file of handle subclasses. + * T can be any class with value semantics. + */ + +template <class T> +PrivateImpl<T>::PrivateImpl(T* p) : impl(p) { assert(impl); } + +template <class T> +PrivateImpl<T>::~PrivateImpl() { delete impl; } + +template <class T> +PrivateImpl<T>::PrivateImpl(const PrivateImpl& h) : impl(new T(*h.impl)) {} + +template <class T> +PrivateImpl<T>& PrivateImpl<T>::operator=(const PrivateImpl<T>& h) { PrivateImpl<T>(h).swap(*this); return *this; } + +template <class T> +void PrivateImpl<T>::swap(PrivateImpl<T>& h) { std::swap(impl, h.impl); } + + +/** Access to private impl of a PrivateImpl */ +template <class T> +class PrivateImplPrivate { + public: + static T* get(const PrivateImpl<T>& h) { return h.impl; } + static void set(PrivateImpl<T>& h, const T& p) { PrivateImpl<T>(p).swap(h); } +}; + +template<class T> T* privateImplGetPtr(PrivateImpl<T>& h) { return PrivateImplPrivate<T>::get(h); } +template<class T> T* privateImplGetPtr(const PrivateImpl<T>& h) { return PrivateImplPrivate<T>::get(h); } +template<class T> void privateImplSetPtr(PrivateImpl<T>& h, const T*& p) { PrivateImplPrivate<T>::set(h, p); } + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_PRIVATEIMPLPRIVATE_H*/ + diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp index e81b78ecd3..8a33c7393f 100644 --- a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp +++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp @@ -20,6 +20,8 @@ */ #include "SessionBase_0_10.h" #include "Connection.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/Future.h" #include "qpid/framing/all_method_bodies.h" namespace qpid { diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.h b/qpid/cpp/src/qpid/client/SessionBase_0_10.h index 3ae21936f6..d375b3ec2e 100644 --- a/qpid/cpp/src/qpid/client/SessionBase_0_10.h +++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.h @@ -23,14 +23,11 @@ */ #include "qpid/SessionId.h" +#include "qpid/client/SessionImpl.h" #include "qpid/framing/amqp_structs.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/framing/MethodContent.h" -#include "qpid/framing/TransferContent.h" +#include "qpid/client/Message.h" #include "qpid/client/Completion.h" -#include "qpid/client/ConnectionImpl.h" #include "qpid/client/Execution.h" -#include "qpid/client/SessionImpl.h" #include "qpid/client/TypedResult.h" #include "qpid/shared_ptr.h" #include "qpid/client/ClientImportExport.h" @@ -44,7 +41,6 @@ class Connection; using std::string; using framing::Content; using framing::FieldTable; -using framing::MethodContent; using framing::SequenceNumber; using framing::SequenceSet; using framing::SequenceNumberSet; @@ -62,8 +58,6 @@ enum CreditUnit { MESSAGE_CREDIT=0, BYTE_CREDIT=1, UNLIMITED_CREDIT=0xFFFFFFFF } */ class SessionBase_0_10 { public: - - typedef framing::TransferContent DefaultContent; ///@internal QPID_CLIENT_EXTERN SessionBase_0_10(); diff --git a/qpid/cpp/src/qpid/client/Subscription.cpp b/qpid/cpp/src/qpid/client/Subscription.cpp index 1f1b5ac6c6..37f5557d51 100644 --- a/qpid/cpp/src/qpid/client/Subscription.cpp +++ b/qpid/cpp/src/qpid/client/Subscription.cpp @@ -21,6 +21,7 @@ #include "Subscription.h" #include "SubscriptionImpl.h" +#include "CompletionImpl.h" #include "HandlePrivate.h" #include "qpid/framing/enum.h" diff --git a/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp index e09a4c142e..82c920cf47 100644 --- a/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp @@ -20,8 +20,12 @@ */ #include "SubscriptionImpl.h" +#include "MessageImpl.h" +#include "CompletionImpl.h" #include "SubscriptionManager.h" #include "SubscriptionSettings.h" +#include "HandlePrivate.h" +#include "PrivateImplPrivate.h" namespace qpid { namespace client { @@ -114,9 +118,9 @@ void SubscriptionImpl::cancel() { manager.cancel(name); } void SubscriptionImpl::received(Message& m) { Mutex::ScopedLock l(lock); - if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED) + if (privateImplGetPtr(m)->getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED) unacquired.add(m.getId()); - else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT) + else if (privateImplGetPtr(m)->getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT) unaccepted.add(m.getId()); if (listener) { diff --git a/qpid/cpp/src/qpid/client/TypedResult.h b/qpid/cpp/src/qpid/client/TypedResult.h index 5306997d74..2e54f9fdfc 100644 --- a/qpid/cpp/src/qpid/client/TypedResult.h +++ b/qpid/cpp/src/qpid/client/TypedResult.h @@ -23,6 +23,7 @@ #define _TypedResult_ #include "Completion.h" +#include "qpid/framing/StructHelper.h" namespace qpid { namespace client { @@ -39,7 +40,7 @@ template <class T> class TypedResult : public Completion public: ///@internal - TypedResult(Future f, shared_ptr<SessionImpl> s) : Completion(f, s), decoded(false) {} + TypedResult(CompletionImpl* c) : Completion(c), decoded(false) {} /** * Wait for the asynchronous command that returned this TypedResult to complete @@ -49,13 +50,12 @@ public: *@exception If the command returns an error, get() throws an exception. * */ - T& get() - { + T& get() { if (!decoded) { - future.decodeResult(result, *session); + framing::StructHelper helper; + helper.decode(result, getResult()); decoded = true; } - return result; } }; diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index f8e412f1e6..a17f54078c 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -36,6 +36,7 @@ #include "qpid/framing/ClusterConfigChangeBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" +#include "qpid/framing/ClusterErrorCheckBody.h" #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" @@ -46,7 +47,6 @@ #include "qpid/management/ManagementBroker.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" -#include "qpid/sys/LatencyMetric.h" #include "qpid/sys/Thread.h" #include <boost/bind.hpp> @@ -63,6 +63,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace std; using namespace qpid::cluster; +using namespace qpid::framing::cluster; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; @@ -77,9 +78,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } - void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } + void configChange(const std::string& current) { cluster.configChange(member, current, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } + void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -112,7 +114,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : discarding(true), state(INIT), lastSize(0), - lastBroker(false) + lastBroker(false), + error(*this) { mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ @@ -195,14 +198,19 @@ void Cluster::leave() { leave(l); } +#define LEAVE_TRY(STMT) try { STMT; } \ + catch (const std::exception& e) { \ + QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ + } do {} while(0) + void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); - try { broker.shutdown(); } - catch (const std::exception& e) { - QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); - } + // Finalize connections now now to avoid problems later in destructor. + LEAVE_TRY(localConnections.clear()); + LEAVE_TRY(connections.clear()); + LEAVE_TRY(broker.shutdown()); } } @@ -218,8 +226,6 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); - if (from == self) // Record self-deliveries for flow control. - mcast.selfDeliver(e); deliverEvent(e); } @@ -254,10 +260,22 @@ void Cluster::deliveredEvent(const Event& e) { QPID_LOG(trace, *this << " DROP: " << e); } +void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { + Mutex::ScopedLock l(lock); + error.error(connection, type, map.getFrameSeq(), map.getMembers()); +} + // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& e) { Mutex::ScopedLock l(lock); + // Process each frame through the error checker. + error.delivered(e); + while (error.canProcess()) // There is a frame ready to process. + processFrame(error.getNext(), l); +} + +void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { QPID_LOG(trace, *this << " DLVR: " << e); ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); @@ -265,7 +283,8 @@ void Cluster::deliveredFrame(const EventFrame& e) { throw Exception(QPID_MSG("Invalid cluster control")); } else if (state >= CATCHUP) { - QPID_LOG(trace, *this << " DLVR: " << e); + map.incrementFrameSeq(); + QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); ConnectionPtr connection = getConnection(e.connectionId, l); if (connection) connection->deliveredFrame(e); @@ -316,11 +335,11 @@ ostream& operator<<(ostream& o, const AddrList& a) { for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { const char* reasonString; switch (p->reason) { - case CPG_REASON_JOIN: reasonString = " (joined) "; break; - case CPG_REASON_LEAVE: reasonString = " (left) "; break; - case CPG_REASON_NODEDOWN: reasonString = " (node-down) "; break; - case CPG_REASON_NODEUP: reasonString = " (node-up) "; break; - case CPG_REASON_PROCDOWN: reasonString = " (process-down) "; break; + case CPG_REASON_JOIN: reasonString = "(joined) "; break; + case CPG_REASON_LEAVE: reasonString = "(left) "; break; + case CPG_REASON_NODEDOWN: reasonString = "(node-down) "; break; + case CPG_REASON_NODEUP: reasonString = "(node-up) "; break; + case CPG_REASON_PROCDOWN: reasonString = "(process-down) "; break; default: reasonString = " "; } qpid::cluster::MemberId member(*p); @@ -342,8 +361,8 @@ void Cluster::configChange ( broker.setRecovery(nCurrent == 1); initialized = true; } - QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) - << AddrList(left, nLeft, "( ", ")")); + QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) + << AddrList(left, nLeft, "left: ")); std::string addresses; for (cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); @@ -357,8 +376,8 @@ void Cluster::setReady(Lock&) { broker.getQueueEvents().enable(); } -void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) { - bool memberChange = map.configChange(addresses); +void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) { + bool memberChange = map.configChange(current); if (state == LEFT) return; if (!map.isAlive(self)) { // Final config change. @@ -589,19 +608,24 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_memberIDs(idstr); } - // Erase connections belonging to members that have left the cluster. + // Close connections belonging to members that have left the cluster. ConnectionMap::iterator i = connections.begin(); while (i != connections.end()) { ConnectionMap::iterator j = i++; MemberId m = j->second->getId().getMember(); if (m != self && !map.isMember(m)) - connections.erase(j); + j->second->deliverClose(); } } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { - static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; - return o << cluster.self << "(" << STATE[cluster.state] << ")"; + static const char* STATE[] = { + "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" + }; + assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); + o << cluster.self << "(" << STATE[cluster.state]; + if (cluster.error.isUnresolved()) o << "/error"; + return o << ")"; } MemberId Cluster::getId() const { @@ -635,4 +659,13 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } +void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) { + // If we receive an errorCheck here, it's because we have processed past the point + // of the error so respond with ERROR_TYPE_NONE + assert(map.getFrameSeq() >= frameSeq); + if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE. + mcast.mcastControl( + ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self); +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index b716e2d781..8a94fc79dd 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -23,6 +23,7 @@ #include "ClusterSettings.h" #include "Cpg.h" #include "Decoder.h" +#include "ErrorCheck.h" #include "Event.h" #include "EventFrame.h" #include "ExpiryPolicy.h" @@ -105,6 +106,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { void deliverFrame(const EventFrame&); + // Called in deliverFrame thread to indicate an error from the broker. + void flagError(Connection&, ErrorCheck::ErrorType); + void connectionError(); + // Called only during update by Connection::shadowReady Decoder& getDecoder() { return decoder; } @@ -132,13 +137,15 @@ class Cluster : private Cpg::Handler, public management::Manageable { // == Called in deliverFrameQueue thread void deliveredFrame(const EventFrame&); + void processFrame(const EventFrame&, Lock&); // Cluster controls implement XML methods from cluster.xml. void updateRequest(const MemberId&, const std::string&, Lock&); void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); - void configChange(const MemberId&, const std::string& addresses, Lock& l); + void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); + void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&); void shutdown(const MemberId&, Lock&); // Helper functions @@ -216,11 +223,13 @@ class Cluster : private Cpg::Handler, public management::Manageable { Decoder decoder; bool discarding; + // Remaining members are protected by lock. - // FIXME aconway 2009-03-06: Most of these members are also only used in + + // TODO aconway 2009-03-06: Most of these members are also only used in // deliverFrameQueue thread or during stall. Review and separate members // that require a lock, drop lock when not needed. - // + mutable sys::Monitor lock; @@ -243,7 +252,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool lastBroker; sys::Thread updateThread; boost::optional<ClusterMap> updatedMap; - + ErrorCheck error; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index 9e7232180d..0395ff6382 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -33,6 +33,13 @@ using namespace framing; namespace cluster { +ClusterMap::Set ClusterMap::decode(const std::string& s) { + Set set; + for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8) + set.insert(MemberId(std::string(i, i+8))); + return set; +} + namespace { void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) { @@ -54,9 +61,9 @@ void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) { } -ClusterMap::ClusterMap() {} +ClusterMap::ClusterMap() : frameSeq(0) {} -ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { +ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) { alive.insert(id); if (isMember) members[id] = url; @@ -64,7 +71,9 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { joiners[id] = url; } -ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) { +ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_) + : frameSeq(frameSeq_) +{ std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive))); std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive))); } @@ -78,22 +87,7 @@ void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const } b.getMembers().clear(); std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1)); -} - -bool ClusterMap::configChange( - cpg_address *current, int nCurrent, - cpg_address *left, int nLeft, - cpg_address */*joined*/, int /*nJoined*/) -{ - cpg_address* a; - bool memberChange=false; - for (a = left; a != left+nLeft; ++a) { - memberChange = memberChange || members.erase(*a); - joiners.erase(*a); - } - alive.clear(); - std::copy(current, current+nCurrent, std::inserter(alive, alive.end())); - return memberChange; + b.setFrameSeq(frameSeq); } Url ClusterMap::getUrl(const Map& map, const MemberId& id) { @@ -123,8 +117,13 @@ std::vector<Url> ClusterMap::memberUrls() const { return urls; } -ClusterMap::Set ClusterMap::getAlive() const { - return alive; +ClusterMap::Set ClusterMap::getAlive() const { return alive; } + +ClusterMap::Set ClusterMap::getMembers() const { + Set s; + std::transform(members.begin(), members.end(), std::inserter(s, s.begin()), + boost::bind(&Map::value_type::first, _1)); + return s; } std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) { @@ -158,7 +157,7 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) { bool ClusterMap::configChange(const std::string& addresses) { bool memberChange = false; - Set update; + Set update = decode(addresses); for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8) update.insert(MemberId(std::string(i, i+8))); Set removed; diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h index 4548441442..3359c7c1f3 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.h +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h @@ -38,26 +38,26 @@ namespace qpid { namespace cluster { +typedef std::set<MemberId> MemberSet; + /** - * Map of established cluster members and joiners waiting for an update. + * Map of established cluster members and joiners waiting for an update, + * along with other cluster state that must be updated. */ class ClusterMap { public: typedef std::map<MemberId, Url> Map; typedef std::set<MemberId> Set; + static Set decode(const std::string&); + ClusterMap(); ClusterMap(const MemberId& id, const Url& url, bool isReady); - ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states); + ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq); /** Update from config change. *@return true if member set changed. */ - bool configChange( - cpg_address *current, int nCurrent, - cpg_address *left, int nLeft, - cpg_address *joined, int nJoined); - bool configChange(const std::string& addresses); bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); } @@ -78,6 +78,7 @@ class ClusterMap { std::vector<std::string> memberIds() const; std::vector<Url> memberUrls() const; Set getAlive() const; + Set getMembers() const; bool updateRequest(const MemberId& id, const std::string& url); /** Return non-empty Url if accepted */ @@ -90,11 +91,16 @@ class ClusterMap { * Utility method to return intersection of two member sets */ static Set intersection(const Set& a, const Set& b); + + uint64_t getFrameSeq() { return frameSeq; } + uint64_t incrementFrameSeq() { return ++frameSeq; } + private: Url getUrl(const Map& map, const MemberId& id); Map joiners, members; Set alive; + uint64_t frameSeq; friend std::ostream& operator<<(std::ostream&, const Map&); friend std::ostream& operator<<(std::ostream&, const ClusterMap&); diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index aa7d082720..97cafbabaa 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -39,8 +39,6 @@ #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" -#include "qpid/sys/LatencyMetric.h" -#include "qpid/sys/AtomicValue.h" #include <boost/current_function.hpp> @@ -56,8 +54,16 @@ namespace qpid { namespace cluster { using namespace framing; +using namespace framing::cluster; + +qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); + +Connection::NullFrameHandler Connection::nullFrameHandler; + +struct NullFrameHandler : public framing::FrameHandler { + void handle(framing::AMQFrame&) {} +}; -NoOpConnectionOutputHandler Connection::discardHandler; namespace { sys::AtomicValue<uint64_t> idCounter; @@ -89,6 +95,8 @@ void Connection::init() { connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames connection.setClientThrottling(false); // Disable client throttling, done by active node. } + if (!isCatchUp()) + connection.setErrorListener(this); } void Connection::giveReadCredit(int credit) { @@ -97,6 +105,7 @@ void Connection::giveReadCredit(int credit) { } Connection::~Connection() { + connection.setErrorListener(0); QPID_LOG(debug, cluster << " deleted connection: " << *this); } @@ -126,7 +135,7 @@ void Connection::received(framing::AMQFrame& f) { cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection.getOutput().send(ok); - output.closeOutput(discardHandler); + output.closeOutput(); catchUp = false; } else @@ -156,8 +165,8 @@ void Connection::deliveredFrame(const EventFrame& f) { { if (f.type == DATA) // incoming data frames to broker::Connection connection.received(const_cast<AMQFrame&>(f.frame)); - else { // frame control, send frame via SessionState - broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession(); + else { // frame control, send frame via SessionState + broker::SessionState* ss = connection.getChannel(currentChannel).getSession(); if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); } } @@ -180,7 +189,7 @@ void Connection::closed() { // This was a local replicated connection. Multicast a deliver // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. - output.closeOutput(discardHandler); + output.closeOutput(); cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); } } @@ -275,13 +284,14 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); self = shadowId; connection.setUserId(username); - // OK to use decoder here because we are stalled for update. + // OK to use decoder here because cluster is stalled for update. cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); + connection.setErrorListener(this); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members)); + cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); self.second = 0; // Mark this as completed update connection. } @@ -305,7 +315,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { } broker::QueuedMessage Connection::getUpdateMessage() { - broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get(); + shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE); + assert(!updateq->isDurable()); + broker::QueuedMessage m = updateq->get(); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } @@ -342,15 +354,15 @@ void Connection::deliveryRecord(const string& qname, // If the message was unacked, the newbie broker must place // it in its messageStore. - if ( m.payload && m.payload->isPersistent() && !completed && !ended && !accepted && !cancelled ) + if ( m.payload && m.payload->isPersistent() && acquired && !ended) queue->enqueue ( 0, m.payload ); } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { - shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); - if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); - q->setPosition(position); -} + shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); + if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); + q->setPosition(position); + } void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); @@ -407,7 +419,14 @@ void Connection::queue(const std::string& encoded) { QPID_LOG(debug, cluster << " decoded queue " << q->getName()); } -qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); +void Connection::sessionError(uint16_t , const std::string& ) { + cluster.flagError(*this, ERROR_TYPE_SESSION); + +} + +void Connection::connectionError(const std::string& ) { + cluster.flagError(*this, ERROR_TYPE_CONNECTION); +} }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 6434f763a8..414e5c935f 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -25,7 +25,6 @@ #include "types.h" #include "WriteEstimate.h" #include "OutputInterceptor.h" -#include "NoOpConnectionOutputHandler.h" #include "EventFrame.h" #include "McastFrameHandler.h" @@ -58,7 +57,8 @@ class Event; class Connection : public RefCounted, public sys::ConnectionInputHandler, - public framing::AMQP_AllOperations::ClusterConnectionHandler + public framing::AMQP_AllOperations::ClusterConnectionHandler, + private broker::Connection::ErrorListener { public: @@ -120,7 +120,7 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); - void membership(const framing::FieldTable&, const framing::FieldTable&); + void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, @@ -151,14 +151,22 @@ class Connection : void giveReadCredit(int credit); + void deliverClose(); + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} }; + + static NullFrameHandler nullFrameHandler; + + // Error listener functions + void connectionError(const std::string&); + void sessionError(uint16_t channel, const std::string&); + void init(); bool checkUnsupported(const framing::AMQBody& body); - void deliverClose(); void deliverDoOutput(uint32_t requested); void sendDoOutput(); @@ -167,8 +175,6 @@ class Connection : broker::SemanticState& semanticState(); broker::QueuedMessage getUpdateMessage(); - static NoOpConnectionOutputHandler discardHandler; - Cluster& cluster; ConnectionId self; bool catchUp; @@ -181,7 +187,6 @@ class Connection : boost::shared_ptr<broker::TxBuffer> txBuffer; bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; - NullFrameHandler nullFrameHandler; static qpid::sys::AtomicValue<uint64_t> catchUpId; diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index 915a578989..f746eacea4 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -75,11 +75,14 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow ::memset(&callbacks, sizeof(callbacks), 0); callbacks.cpg_deliver_fn = &globalDeliver; callbacks.cpg_confchg_fn = &globalConfigChange; + + QPID_LOG(info, "Initializing CPG"); cpg_error_t err = cpg_initialize(&handle, &callbacks); - if (err == CPG_ERR_TRY_AGAIN) { - QPID_LOG(notice, "Waiting for CPG initialization."); - while (CPG_ERR_TRY_AGAIN == (err = cpg_initialize(&handle, &callbacks))) - sys::sleep(5); + int retries = 6; + while (err == CPG_ERR_TRY_AGAIN && --retries) { + QPID_LOG(notice, "Re-trying CPG initialization."); + sys::sleep(5); + err = cpg_initialize(&handle, &callbacks); } check(err, "Failed to initialize CPG."); check(cpg_context_set(handle, this), "Cannot set CPG context"); @@ -87,7 +90,6 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow // windows then this needs to be refactored into // qpid::sys::<platform> IOHandle::impl->fd = getFd(); - QPID_LOG(debug, "Initialized CPG handle 0x" << std::hex << handle); } Cpg::~Cpg() { diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp new file mode 100644 index 0000000000..6132d52126 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ErrorCheck.h" +#include "EventFrame.h" +#include "ClusterMap.h" +#include "Cluster.h" +#include "qpid/framing/ClusterErrorCheckBody.h" +#include "qpid/framing/ClusterConfigChangeBody.h" +#include "qpid/log/Statement.h" + +#include <algorithm> + +namespace qpid { +namespace cluster { + +using namespace std; +using namespace framing; +using namespace framing::cluster; + +ErrorCheck::ErrorCheck(Cluster& c) + : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0) +{} + +ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) { + copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " ")); + return o; +} + +void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms) +{ + // Detected a local error, inform cluster and set error state. + assert(t != ERROR_TYPE_NONE); // Must be an error. + assert(type == ERROR_TYPE_NONE); // Can only be called while processing + type = t; + unresolved = ms; + frameSeq = seq; + connection = &c; + QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection") + << " error " << frameSeq << " unresolved: " << unresolved); + mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId()); +} + +void ErrorCheck::delivered(const EventFrame& e) { + if (isUnresolved()) { + const ClusterErrorCheckBody* errorCheck = + dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod()); + const ClusterConfigChangeBody* configChange = + dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod()); + + if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error + if (errorCheck->getType() < type) { // my error is worse than his + QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId()); + throw Exception("Aborted by local failure that did not occur on all replicas"); + } + else { // his error is worse/same as mine. + QPID_LOG(debug, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId()); + unresolved.erase(e.getMemberId()); + checkResolved(); + } + } + else { + frames.push_back(e); // Only drop matching errorCheck controls. + if (configChange) { + MemberSet members(ClusterMap::decode(configChange->getCurrent())); + MemberSet result; + set_intersection(members.begin(), members.end(), + unresolved.begin(), unresolved.end(), + inserter(result, result.begin())); + unresolved.swap(result); + checkResolved(); + } + } + } + else + frames.push_back(e); +} + +void ErrorCheck::checkResolved() { + if (unresolved.empty()) { // No more potentially conflicted members, we're clear. + type = ERROR_TYPE_NONE; + QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved."); + } + else + QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved); +} + +EventFrame ErrorCheck::getNext() { + assert(canProcess()); + EventFrame e(frames.front()); + frames.pop_front(); + return e; +} + +bool ErrorCheck::canProcess() const { + return type == ERROR_TYPE_NONE && !frames.empty(); +} + +bool ErrorCheck::isUnresolved() const { + return type != ERROR_TYPE_NONE; +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.h b/qpid/cpp/src/qpid/cluster/ErrorCheck.h new file mode 100644 index 0000000000..97b5f2bffd --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.h @@ -0,0 +1,80 @@ +#ifndef QPID_CLUSTER_ERRORCHECK_H +#define QPID_CLUSTER_ERRORCHECK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "Multicaster.h" +#include "qpid/framing/enum.h" +#include <boost/function.hpp> +#include <deque> +#include <set> + +namespace qpid { +namespace cluster { + +class EventFrame; +class ClusterMap; +class Cluster; +class Multicaster; +class Connection; + +/** + * Error checking logic. + * + * When an error occurs stop processing frames and queue them until we + * can determine if all nodes experienced the error. If not, we shut down. + */ +class ErrorCheck +{ + public: + typedef std::set<MemberId> MemberSet; + typedef framing::cluster::ErrorType ErrorType; + + ErrorCheck(Cluster&); + + /** A local error has occured */ + void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&); + + /** Called when a frame is delivered */ + void delivered(const EventFrame&); + + EventFrame getNext(); + + bool canProcess() const; + bool isUnresolved() const; + + private: + void checkResolved(); + + Cluster& cluster; + Multicaster& mcast; + std::deque<EventFrame> frames; + std::set<MemberId> unresolved; + uint64_t frameSeq; + ErrorType type; + Connection* connection; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_ERRORCHECK_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h index e05ad60bcf..f0e445a08c 100644 --- a/qpid/cpp/src/qpid/cluster/Event.h +++ b/qpid/cpp/src/qpid/cluster/Event.h @@ -25,7 +25,6 @@ #include "types.h" #include "qpid/RefCountedBuffer.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/sys/LatencyMetric.h" #include <sys/uio.h> // For iovec #include <iosfwd> @@ -42,7 +41,7 @@ class Buffer; namespace cluster { /** Header data for a multicast event */ -class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { +class EventHeader { public: EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0); void decode(const MemberId& m, framing::Buffer&); diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.cpp b/qpid/cpp/src/qpid/cluster/EventFrame.cpp index 9350c801f5..a48d134f1b 100644 --- a/qpid/cpp/src/qpid/cluster/EventFrame.cpp +++ b/qpid/cpp/src/qpid/cluster/EventFrame.cpp @@ -28,9 +28,7 @@ EventFrame::EventFrame() {} EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc) : connectionId(e.getConnectionId()), frame(f), readCredit(rc), type(e.getType()) -{ - QPID_LATENCY_INIT(frame); -} +{} std::ostream& operator<<(std::ostream& o, const EventFrame& e) { if (e.frame.getBody()) o << e.frame; diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.h b/qpid/cpp/src/qpid/cluster/EventFrame.h index d6ff58dd38..e275aac7aa 100644 --- a/qpid/cpp/src/qpid/cluster/EventFrame.h +++ b/qpid/cpp/src/qpid/cluster/EventFrame.h @@ -25,7 +25,6 @@ #include "types.h" #include "Event.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/sys/LatencyMetric.h" #include <boost/intrusive_ptr.hpp> #include <iosfwd> @@ -45,6 +44,7 @@ struct EventFrame bool isCluster() const { return connectionId.getNumber() == 0; } bool isConnection() const { return connectionId.getNumber() != 0; } bool isLastInEvent() const { return readCredit; } + MemberId getMemberId() const { return connectionId.getMember(); } ConnectionId connectionId; diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp index 409180c499..348963f901 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -50,6 +50,13 @@ void ExpiryPolicy::willExpire(broker::Message& m) { timer.add(new ExpiryTask(this, id, m.getExpiration())); } +void ExpiryPolicy::forget(broker::Message& m) { + MessageIdMap::iterator i = unexpiredByMessage.find(&m); + assert(i != unexpiredByMessage.end()); + unexpiredById.erase(i->second); + unexpiredByMessage.erase(i); +} + bool ExpiryPolicy::hasExpired(broker::Message& m) { return unexpiredByMessage.find(&m) == unexpiredByMessage.end(); } diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h index 9f8b1a9236..c147e54796 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -49,8 +49,8 @@ class ExpiryPolicy : public broker::ExpiryPolicy ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&); void willExpire(broker::Message&); - bool hasExpired(broker::Message&); + void forget(broker::Message&); // Send expiration notice to cluster. void sendExpire(uint64_t); diff --git a/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h index 8b2f6dae8e..4df742d6c2 100644 --- a/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h +++ b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h @@ -52,6 +52,8 @@ class LockedConnectionMap return 0; } + void clear() { sys::Mutex::ScopedLock l(lock); map.clear(); } + private: typedef std::map<ConnectionId, ConnectionPtr> Map; mutable sys::Mutex lock; diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp index f0738ab08f..3b9d3ac990 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp +++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp @@ -22,7 +22,6 @@ #include "Multicaster.h" #include "Cpg.h" #include "qpid/log/Statement.h" -#include "qpid/sys/LatencyMetric.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/AMQFrame.h" @@ -64,7 +63,6 @@ void Multicaster::mcast(const Event& e) { return; } } - QPID_LATENCY_INIT(e); queue.push(e); } @@ -73,7 +71,6 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { try { PollableEventQueue::Queue::iterator i = values.begin(); while( i != values.end()) { - QPID_LATENCY_RECORD("mcast send queue", *i); iovec iov = i->toIovec(); if (!cpg.mcast(&iov, 1)) { // cpg didn't send because of CPG flow control. @@ -97,9 +94,4 @@ void Multicaster::release() { holdingQueue.clear(); } -void Multicaster::selfDeliver(const Event& e) { - sys::Mutex::ScopedLock l(lock); - QPID_LATENCY_RECORD("cpg self deliver", e); -} - }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.h b/qpid/cpp/src/qpid/cluster/Multicaster.h index d1c3115977..baa5b87f38 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.h +++ b/qpid/cpp/src/qpid/cluster/Multicaster.h @@ -55,8 +55,6 @@ class Multicaster void mcast(const Event& e); /** End holding mode, held events are mcast */ void release(); - /** Call when events are self-delivered to manage flow control. */ - void selfDeliver(const Event&); private: typedef sys::PollableQueue<Event> PollableEventQueue; diff --git a/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h index 74a376a657..6a30bddf06 100644 --- a/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h +++ b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h @@ -30,8 +30,7 @@ namespace framing { class AMQFrame; } namespace cluster { /** - * Output handler for frames sent to noop connections. - * Simply discards frames. + * Output handler shadow connections, simply discards frames. */ class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler { diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index cd42446016..6af114a662 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -32,8 +32,9 @@ namespace cluster { using namespace framing; -OutputInterceptor::OutputInterceptor( - cluster::Connection& p, sys::ConnectionOutputHandler& h) +NoOpConnectionOutputHandler OutputInterceptor::discardHandler; + +OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h) : parent(p), closing(false), next(&h), sent(), writeEstimate(p.getCluster().getWriteEstimate()), moreOutput(), doingOutput() @@ -47,7 +48,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) { } if (!parent.isCatchUp()) sent += f.encodedSize(); - QPID_LATENCY_RECORD("up to write queue", f); } void OutputInterceptor::activateOutput() { @@ -98,7 +98,6 @@ void OutputInterceptor::deliverDoOutput(size_t requested) { // Send a doOutput request if one is not already in flight. void OutputInterceptor::sendDoOutput() { if (!parent.isLocal()) return; - QPID_LATENCY_INIT(*this); doingOutput = true; size_t request = writeEstimate.sending(getBuffered()); @@ -111,10 +110,10 @@ void OutputInterceptor::sendDoOutput() { QPID_LOG(trace, parent << "Send doOutput request for " << request); } -void OutputInterceptor::closeOutput(sys::ConnectionOutputHandler& h) { +void OutputInterceptor::closeOutput() { sys::Mutex::ScopedLock l(lock); closing = true; - next = &h; + next = &discardHandler; } void OutputInterceptor::close() { diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h index c080a419e1..61e246bb89 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h @@ -23,9 +23,9 @@ */ #include "WriteEstimate.h" +#include "NoOpConnectionOutputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/broker/ConnectionFactory.h" -#include "qpid/sys/LatencyMetric.h" #include <boost/function.hpp> namespace qpid { @@ -37,7 +37,7 @@ class Connection; /** * Interceptor for connection OutputHandler, manages outgoing message replication. */ -class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetricTimestamp { +class OutputInterceptor : public sys::ConnectionOutputHandler { public: OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h); @@ -53,7 +53,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri // Intercept doOutput requests on Connection. bool doOutput(); - void closeOutput(sys::ConnectionOutputHandler& h); + void closeOutput(); cluster::Connection& parent; @@ -70,6 +70,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri WriteEstimate writeEstimate; bool moreOutput; bool doingOutput; + static NoOpConnectionOutputHandler discardHandler; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 97eae7efa3..bb4df8890a 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -26,6 +26,9 @@ #include "ExpiryPolicy.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/ConnectionAccess.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/Future.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" @@ -98,10 +101,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con expiry(expiry_), connections(cons), decoder(decoder_), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail), connectionSettings(cs) -{ - connection.open(url, cs); - session = connection.newSession(UPDATE); -} +{} UpdateClient::~UpdateClient() {} @@ -110,6 +110,8 @@ const std::string UpdateClient::UPDATE("qpid.cluster-update"); void UpdateClient::run() { try { + connection.open(updateeUrl, connectionSettings); + session = connection.newSession(UPDATE); update(); done(); } catch (const std::exception& e) { @@ -126,15 +128,19 @@ void UpdateClient::update() { // Update queue is used to transfer acquired messages that are no longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); - session.close(); std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); + session.queueDelete(arg::queue=UPDATE); + session.close(); + + ClusterConnectionProxy(session).expiryId(expiry.getId()); ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); + connection.close(); QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl); } @@ -203,7 +209,6 @@ class MessageUpdater { sb.get()->send(transfer, message.payload->getFrames()); if (message.payload->isContentReleased()){ uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; - uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index 23d061b7e4..96e2479955 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -24,6 +24,7 @@ #include "ClusterMap.h" #include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" #include "qpid/client/AsyncSession.h" #include "qpid/broker/SemanticState.h" #include "qpid/sys/Runnable.h" diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h index 34319e7ed4..2f0a01d6ca 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.h +++ b/qpid/cpp/src/qpid/framing/AMQFrame.h @@ -26,7 +26,6 @@ #include "AMQContentBody.h" #include "AMQHeartbeatBody.h" #include "ProtocolVersion.h" -#include "qpid/sys/LatencyMetric.h" #include <boost/intrusive_ptr.hpp> #include <boost/cast.hpp> #include "qpid/CommonImportExport.h" @@ -34,7 +33,7 @@ namespace qpid { namespace framing { -class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp +class AMQFrame : public AMQDataBlock { public: QPID_COMMON_EXTERN AMQFrame(const boost::intrusive_ptr<AMQBody>& b=0); diff --git a/qpid/cpp/src/qpid/sys/LatencyMetric.cpp b/qpid/cpp/src/qpid/sys/LatencyMetric.cpp deleted file mode 100644 index caa221def4..0000000000 --- a/qpid/cpp/src/qpid/sys/LatencyMetric.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifdef QPID_LATENCY_METRIC - -#include "LatencyMetric.h" -#include "Time.h" -#include <iostream> - -namespace qpid { -namespace sys { - -void LatencyMetricTimestamp::initialize(const LatencyMetricTimestamp& ts) { - const_cast<int64_t&>(ts.latency_metric_timestamp) = Duration(now()); -} - -void LatencyMetricTimestamp::clear(const LatencyMetricTimestamp& ts) { - const_cast<int64_t&>(ts.latency_metric_timestamp) = 0; -} - -LatencyMetric::LatencyMetric(const char* msg, int64_t skip_) : - message(msg), count(0), total(0), skipped(0), skip(skip_) -{} - -LatencyMetric::~LatencyMetric() { report(); } - -void LatencyMetric::record(const LatencyMetricTimestamp& start) { - if (!start.latency_metric_timestamp) return; // Ignore 0 timestamps. - if (skip) { - if (++skipped < skip) return; - else skipped = 0; - } - ++count; - int64_t now_ = Duration(now()); - total += now_ - start.latency_metric_timestamp; - // Set start time for next leg of the journey - const_cast<int64_t&>(start.latency_metric_timestamp) = now_; -} - -void LatencyMetric::report() { - using namespace std; - if (count) { - cout << "LATENCY: " << message << ": " - << total / (count * TIME_USEC) << " microseconds" << endl; - } - else { - cout << "LATENCY: " << message << ": no data." << endl; - } - count = 0; - total = 0; -} - - -}} // namespace qpid::sys - -#endif diff --git a/qpid/cpp/src/qpid/sys/LatencyMetric.h b/qpid/cpp/src/qpid/sys/LatencyMetric.h deleted file mode 100644 index 63b5020db4..0000000000 --- a/qpid/cpp/src/qpid/sys/LatencyMetric.h +++ /dev/null @@ -1,85 +0,0 @@ -#ifndef QPID_SYS_LATENCYMETRIC_H -#define QPID_SYS_LATENCYMETRIC_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifdef QPID_LATENCY_METRIC - -#include "qpid/sys/IntegerTypes.h" - -namespace qpid { -namespace sys { - -/** Use this base class to add a timestamp for latency to an object */ -struct LatencyMetricTimestamp { - LatencyMetricTimestamp() : latency_metric_timestamp(0) {} - static void initialize(const LatencyMetricTimestamp&); - static void clear(const LatencyMetricTimestamp&); - int64_t latency_metric_timestamp; -}; - -/** - * Record average latencies, report on destruction. - * - * For debugging only, use via macros below so it can be compiled out - * of production code. - */ -class LatencyMetric { - public: - /** msg should be a string literal. */ - LatencyMetric(const char* msg, int64_t skip_=0); - ~LatencyMetric(); - - void record(const LatencyMetricTimestamp& start); - - private: - void report(); - const char* message; - int64_t count, total, skipped, skip; -}; - -}} // namespace qpid::sys - -#define QPID_LATENCY_INIT(x) ::qpid::sys::LatencyMetricTimestamp::initialize(x) -#define QPID_LATENCY_CLEAR(x) ::qpid::sys::LatencyMetricTimestamp::clear(x) -#define QPID_LATENCY_RECORD(msg, x) do { \ - static ::qpid::sys::LatencyMetric metric__(msg); metric__.record(x); \ - } while (false) -#define QPID_LATENCY_RECORD_SKIP(msg, x, skip) do { \ - static ::qpid::sys::LatencyMetric metric__(msg, skip); metric__.record(x); \ - } while (false) - - -#else /* defined QPID_LATENCY_METRIC */ - -namespace qpid { namespace sys { -class LatencyMetricTimestamp {}; -}} - -#define QPID_LATENCY_INIT(x) (void)x -#define QPID_LATENCY_CLEAR(x) (void)x -#define QPID_LATENCY_RECORD(msg, x) (void)x -#define QPID_LATENCY_RECORD_SKIP(msg, x, skip) (void)x - -#endif /* defined QPID_LATENCY_METRIC */ - -#endif /*!QPID_SYS_LATENCYMETRIC_H*/ diff --git a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h index 067c0fe6b7..cd89f39292 100755 --- a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h +++ b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h @@ -23,6 +23,7 @@ */ #include "AsynchIoResult.h" +#include "qpid/CommonImportExport.h" #include <winsock2.h> @@ -49,7 +50,7 @@ public: AsynchIO::RequestCallback cbRequest; }; -SOCKET toFd(const IOHandlePrivate* h); +QPID_COMMON_EXTERN SOCKET toFd(const IOHandlePrivate* h); }} |