diff options
author | Alan Conway <aconway@apache.org> | 2009-01-27 01:44:02 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-01-27 01:44:02 +0000 |
commit | 57acf95c94d52b15b2ad6e6038bf3390d9063282 (patch) | |
tree | e7b07be0e204b1e9f4bda85be345efd9dcabca44 | |
parent | d40d874132bc5011a76bd883fdf9d2507a2f8149 (diff) | |
download | qpid-python-57acf95c94d52b15b2ad6e6038bf3390d9063282.tar.gz |
cluster: Add sequence number to events & frames
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737968 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/cluster.mk | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterLeaveException.h | 35 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterQueueHandler.h | 56 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/EventFrame.h | 18 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/types.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 1 |
13 files changed, 94 insertions, 81 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index b13ce10b72..31eed2aec6 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -40,7 +40,7 @@ cluster_la_SOURCES = \ $(CMAN_SOURCES) \ qpid/cluster/Cluster.cpp \ qpid/cluster/Cluster.h \ - qpid/cluster/ClusterLeaveException.h \ + qpid/cluster/ClusterQueueHandler.h \ qpid/cluster/ClusterMap.cpp \ qpid/cluster/ClusterMap.h \ qpid/cluster/ClusterPlugin.cpp \ diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 0e1c049a9c..f8adb8ee98 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -20,6 +20,7 @@ #include "Connection.h" #include "DumpClient.h" #include "FailoverExchange.h" +#include "ClusterQueueHandler.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" @@ -97,11 +98,12 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b writeEstimate(writeEstimate_), mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), - deliverEventQueue(boost::bind(&Cluster::deliveredEvents, this, _1), poller), - deliverFrameQueue(boost::bind(&Cluster::deliveredFrames, this, _1), poller), + deliverEventQueue(ClusterQueueHandler<Event>(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"), poller), + deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller), state(INIT), lastSize(0), - lastBroker(false) + lastBroker(false), + sequence(0) { mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ @@ -195,6 +197,7 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); + e.setSequence(sequence++); if (from == myId) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e, l); @@ -208,26 +211,6 @@ void Cluster::deliver(const Event& e, Lock&) { } // Entry point: called when deliverEventQueue has events to process. -void Cluster::deliveredEvents(PollableEventQueue::Queue& events) { - try { - for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1)); - events.clear(); - } catch (const std::exception& e) { - QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); - leave(); - } -} - -void Cluster::deliveredFrames(PollableFrameQueue::Queue& frames) { - try { - for_each(frames.begin(), frames.end(), boost::bind(&Cluster::deliveredFrame, this, _1)); - frames.clear(); - } catch (const std::exception& e) { - QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); - leave(); - } -} - void Cluster::deliveredEvent(const Event& e) { QPID_LATENCY_RECORD("delivered event queue", e); Buffer buf(const_cast<char*>(e.getData()), e.getSize()); @@ -243,7 +226,7 @@ void Cluster::deliveredEvent(const Event& e) { if (e.getType() == CONTROL) { AMQFrame frame; while (frame.decode(buf)) { - deliverFrameQueue.push(EventFrame(connection, e.getMemberId(), frame)); + deliverFrameQueue.push(EventFrame(connection, e, frame)); } } else if (e.getType() == DATA) { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index ecd63a866e..ef63c4c3fe 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -129,8 +129,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); void shutdown(const MemberId&, Lock&); - void deliveredEvents(PollableEventQueue::Queue&); - void deliveredFrames(PollableFrameQueue::Queue&); void deliveredEvent(const Event&); void deliveredFrame(const EventFrame&); @@ -215,6 +213,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { ClusterMap map; size_t lastSize; bool lastBroker; + uint64_t sequence; // Dump related sys::Thread dumpThread; diff --git a/cpp/src/qpid/cluster/ClusterLeaveException.h b/cpp/src/qpid/cluster/ClusterLeaveException.h deleted file mode 100644 index e5bdbc560a..0000000000 --- a/cpp/src/qpid/cluster/ClusterLeaveException.h +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H -#define QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/Exception.h" - -namespace qpid { -namespace cluster { - -struct ClusterLeaveException : public Exception -{ - ClusterLeaveException(const std::string& message=std::string()) : Exception(message) {} -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H*/ diff --git a/cpp/src/qpid/cluster/ClusterQueueHandler.h b/cpp/src/qpid/cluster/ClusterQueueHandler.h new file mode 100644 index 0000000000..e843526962 --- /dev/null +++ b/cpp/src/qpid/cluster/ClusterQueueHandler.h @@ -0,0 +1,56 @@ +#ifndef QPID_CLUSTER_CLUSTERQUEUEHANDLER_H +#define QPID_CLUSTER_CLUSTERQUEUEHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Cluster.h" +#include "qpid/sys/PollableQueue.h" +#include <qpid/log/Statement.h> + +namespace qpid { +namespace cluster { + +/** Convenience functor for PollableQueue callbacks. */ +template <class T> struct ClusterQueueHandler { + ClusterQueueHandler(Cluster& c, boost::function<void (const T&)> f, const std::string& n) : cluster(c), callback(f), name(n) {} + ClusterQueueHandler(const Cluster* c, boost::function<void (const T&)> f, const std::string& n) : cluster(*const_cast<Cluster*>(c)), callback(f), name(n) {} + + void operator()(typename sys::PollableQueue<T>::Queue& values) { + try { + std::for_each(values.begin(), values.end(), callback); + values.clear(); + } + catch (const std::exception& e) { + QPID_LOG(error, "Error on " << name << ": " << e.what()); + cluster.leave(); + } + } + + Cluster& cluster; + boost::function<void (const T&)> callback; + std::string name; +}; + + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CLUSTERQUEUEHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index d05baffe3a..839a0e67b9 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -137,7 +137,7 @@ bool Connection::checkUnsupported(const AMQBody& body) { } // Decode buffer and put frames on frameq. -void Connection::deliveredEvent(const Event& e, EventFrameQueue& frameq) { +void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) { assert(!catchUp); Buffer buf(e); // Set read credit on the last frame. @@ -145,10 +145,10 @@ void Connection::deliveredEvent(const Event& e, EventFrameQueue& frameq) { if (!mcastDecoder.decode(buf)) return; AMQFrame frame(mcastDecoder.frame); while (mcastDecoder.decode(buf)) { - frameq.push(EventFrame(this, getId().getMember(), frame)); + frameq.push(EventFrame(this, e, frame)); frame = mcastDecoder.frame; } - frameq.push(EventFrame(this, getId().getMember(), frame, readCredit)); + frameq.push(EventFrame(this, e, frame, readCredit)); readCredit = 0; } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 29dee5eda4..3b18e22d17 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -26,7 +26,6 @@ #include "WriteEstimate.h" #include "OutputInterceptor.h" #include "NoOpConnectionOutputHandler.h" -#include "Event.h" #include "EventFrame.h" #include "qpid/broker/Connection.h" @@ -62,7 +61,7 @@ class Connection : { public: - typedef sys::PollableQueue<EventFrame> EventFrameQueue; + typedef sys::PollableQueue<EventFrame> PollableFrameQueue; /** Local connection, use this in ConnectionId */ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink); @@ -102,7 +101,7 @@ class Connection : size_t decode(const char* buffer, size_t size); // Called for data delivered from the cluster. - void deliveredEvent(const Event&, EventFrameQueue&); + void deliveredEvent(const Event&, PollableFrameQueue&); void deliveredFrame(const EventFrame&); void consumerState(const std::string& name, bool blocked, bool notifyEnabled); diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 59a7241715..339c1de1dd 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -42,7 +42,7 @@ const size_t EventHeader::HEADER_SIZE = ; EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) - : type(t), connectionId(c), size(s) {} + : type(t), connectionId(c), size(s), sequence(0) {} Event::Event() {} @@ -53,10 +53,10 @@ Event::Event(EventType t, const ConnectionId& c, size_t s) void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { if (buf.available() <= HEADER_SIZE) - throw ClusterLeaveException("Not enough for multicast header"); + throw Exception("Not enough for multicast header"); type = (EventType)buf.getOctet(); if(type != DATA && type != CONTROL) - throw ClusterLeaveException("Invalid multicast event type"); + throw Exception("Invalid multicast event type"); connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong())); size = buf.getLong(); #ifdef QPID_LATENCY_METRIC @@ -68,7 +68,7 @@ Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { Event e; e.decode(m, buf); // Header if (buf.available() < e.size) - throw ClusterLeaveException("Not enough data for multicast event"); + throw Exception("Not enough data for multicast event"); e.store = RefCountedBuffer::create(e.size + HEADER_SIZE); memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size); return e; diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index a1d21f5e04..5df0c96f77 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -23,10 +23,7 @@ */ #include "types.h" -#include "Cpg.h" -#include "Connection.h" #include "qpid/RefCountedBuffer.h" -#include "qpid/framing/Buffer.h" #include "qpid/sys/LatencyMetric.h" #include <sys/uio.h> // For iovec #include <iosfwd> @@ -37,6 +34,7 @@ namespace qpid { namespace framing { class AMQBody; +class Buffer; } namespace cluster { @@ -52,6 +50,8 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { ConnectionId getConnectionId() const { return connectionId; } MemberId getMemberId() const { return connectionId.getMember(); } size_t getSize() const { return size; } + uint64_t getSequence() const { return sequence; } + void setSequence(uint64_t n) { sequence = n; } bool isCluster() const { return connectionId.getPointer() == 0; } bool isConnection() const { return connectionId.getPointer() != 0; } @@ -62,6 +62,7 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { EventType type; ConnectionId connectionId; size_t size; + uint64_t sequence; }; /** diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h index 28ec6e4dc0..2420a2b1e5 100644 --- a/cpp/src/qpid/cluster/EventFrame.h +++ b/cpp/src/qpid/cluster/EventFrame.h @@ -23,6 +23,7 @@ */ #include "types.h" +#include "Event.h" #include "qpid/framing/AMQFrame.h" #include "qpid/sys/LatencyMetric.h" #include <boost/intrusive_ptr.hpp> @@ -37,19 +38,30 @@ class Connection; */ struct EventFrame { + EventFrame() : sequence(0) {} // Connection event frame - EventFrame(const boost::intrusive_ptr<Connection>& c, const MemberId& m, const framing::AMQFrame& f, int rc=0) - : connection(c), member(m), frame(f), readCredit(rc) { + EventFrame(const boost::intrusive_ptr<Connection>& c, const Event& e, const framing::AMQFrame& f, int rc=0) + : connection(c), member(e.getMemberId()), frame(f), sequence(e.getSequence()), readCredit(rc) + { QPID_LATENCY_INIT(frame); } bool isCluster() const { return !connection; } bool isConnection() const { return connection; } + bool isLastInEvent() const { return readCredit; } + + // True if this frame follows immediately after frame e. + bool follows(const EventFrame& e) const { + return sequence == e.sequence || (sequence == e.sequence+1 && e.readCredit); + } + + bool operator<(const EventFrame& e) const { return sequence < e.sequence; } boost::intrusive_ptr<Connection> connection; MemberId member; framing::AMQFrame frame; - int readCredit; // restore this much read credit when frame is processed + uint64_t sequence; + int readCredit; // last frame in an event, give credit when processed. }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 4fa12651eb..847088435c 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -21,7 +21,6 @@ #include "Multicaster.h" #include "Cpg.h" -#include "ClusterLeaveException.h" #include "qpid/log/Statement.h" #include "qpid/sys/LatencyMetric.h" diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index d8dc35f167..0797d472b6 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -22,8 +22,6 @@ * */ - -#include "ClusterLeaveException.h" #include "config.h" #include "qpid/Url.h" diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 9fbe48fe84..b5ff98c2c7 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -46,6 +46,7 @@ template <class T> class PollableQueue { public: typedef std::deque<T> Queue; + typedef T value_type; /** * Callback to process a batch of items from the queue. |