summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-01-27 01:44:02 +0000
committerAlan Conway <aconway@apache.org>2009-01-27 01:44:02 +0000
commit57acf95c94d52b15b2ad6e6038bf3390d9063282 (patch)
treee7b07be0e204b1e9f4bda85be345efd9dcabca44
parentd40d874132bc5011a76bd883fdf9d2507a2f8149 (diff)
downloadqpid-python-57acf95c94d52b15b2ad6e6038bf3390d9063282.tar.gz
cluster: Add sequence number to events & frames
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737968 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/cluster.mk2
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp31
-rw-r--r--cpp/src/qpid/cluster/Cluster.h3
-rw-r--r--cpp/src/qpid/cluster/ClusterLeaveException.h35
-rw-r--r--cpp/src/qpid/cluster/ClusterQueueHandler.h56
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp6
-rw-r--r--cpp/src/qpid/cluster/Connection.h5
-rw-r--r--cpp/src/qpid/cluster/Event.cpp8
-rw-r--r--cpp/src/qpid/cluster/Event.h7
-rw-r--r--cpp/src/qpid/cluster/EventFrame.h18
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp1
-rw-r--r--cpp/src/qpid/cluster/types.h2
-rw-r--r--cpp/src/qpid/sys/PollableQueue.h1
13 files changed, 94 insertions, 81 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index b13ce10b72..31eed2aec6 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -40,7 +40,7 @@ cluster_la_SOURCES = \
$(CMAN_SOURCES) \
qpid/cluster/Cluster.cpp \
qpid/cluster/Cluster.h \
- qpid/cluster/ClusterLeaveException.h \
+ qpid/cluster/ClusterQueueHandler.h \
qpid/cluster/ClusterMap.cpp \
qpid/cluster/ClusterMap.h \
qpid/cluster/ClusterPlugin.cpp \
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 0e1c049a9c..f8adb8ee98 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -20,6 +20,7 @@
#include "Connection.h"
#include "DumpClient.h"
#include "FailoverExchange.h"
+#include "ClusterQueueHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
@@ -97,11 +98,12 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
writeEstimate(writeEstimate_),
mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
- deliverEventQueue(boost::bind(&Cluster::deliveredEvents, this, _1), poller),
- deliverFrameQueue(boost::bind(&Cluster::deliveredFrames, this, _1), poller),
+ deliverEventQueue(ClusterQueueHandler<Event>(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"), poller),
+ deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller),
state(INIT),
lastSize(0),
- lastBroker(false)
+ lastBroker(false),
+ sequence(0)
{
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
@@ -195,6 +197,7 @@ void Cluster::deliver(
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
+ e.setSequence(sequence++);
if (from == myId) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
deliver(e, l);
@@ -208,26 +211,6 @@ void Cluster::deliver(const Event& e, Lock&) {
}
// Entry point: called when deliverEventQueue has events to process.
-void Cluster::deliveredEvents(PollableEventQueue::Queue& events) {
- try {
- for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1));
- events.clear();
- } catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
- leave();
- }
-}
-
-void Cluster::deliveredFrames(PollableFrameQueue::Queue& frames) {
- try {
- for_each(frames.begin(), frames.end(), boost::bind(&Cluster::deliveredFrame, this, _1));
- frames.clear();
- } catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
- leave();
- }
-}
-
void Cluster::deliveredEvent(const Event& e) {
QPID_LATENCY_RECORD("delivered event queue", e);
Buffer buf(const_cast<char*>(e.getData()), e.getSize());
@@ -243,7 +226,7 @@ void Cluster::deliveredEvent(const Event& e) {
if (e.getType() == CONTROL) {
AMQFrame frame;
while (frame.decode(buf)) {
- deliverFrameQueue.push(EventFrame(connection, e.getMemberId(), frame));
+ deliverFrameQueue.push(EventFrame(connection, e, frame));
}
}
else if (e.getType() == DATA) {
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index ecd63a866e..ef63c4c3fe 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -129,8 +129,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& addresses, Lock& l);
void shutdown(const MemberId&, Lock&);
- void deliveredEvents(PollableEventQueue::Queue&);
- void deliveredFrames(PollableFrameQueue::Queue&);
void deliveredEvent(const Event&);
void deliveredFrame(const EventFrame&);
@@ -215,6 +213,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
ClusterMap map;
size_t lastSize;
bool lastBroker;
+ uint64_t sequence;
// Dump related
sys::Thread dumpThread;
diff --git a/cpp/src/qpid/cluster/ClusterLeaveException.h b/cpp/src/qpid/cluster/ClusterLeaveException.h
deleted file mode 100644
index e5bdbc560a..0000000000
--- a/cpp/src/qpid/cluster/ClusterLeaveException.h
+++ /dev/null
@@ -1,35 +0,0 @@
-#ifndef QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H
-#define QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/Exception.h"
-
-namespace qpid {
-namespace cluster {
-
-struct ClusterLeaveException : public Exception
-{
- ClusterLeaveException(const std::string& message=std::string()) : Exception(message) {}
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H*/
diff --git a/cpp/src/qpid/cluster/ClusterQueueHandler.h b/cpp/src/qpid/cluster/ClusterQueueHandler.h
new file mode 100644
index 0000000000..e843526962
--- /dev/null
+++ b/cpp/src/qpid/cluster/ClusterQueueHandler.h
@@ -0,0 +1,56 @@
+#ifndef QPID_CLUSTER_CLUSTERQUEUEHANDLER_H
+#define QPID_CLUSTER_CLUSTERQUEUEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Cluster.h"
+#include "qpid/sys/PollableQueue.h"
+#include <qpid/log/Statement.h>
+
+namespace qpid {
+namespace cluster {
+
+/** Convenience functor for PollableQueue callbacks. */
+template <class T> struct ClusterQueueHandler {
+ ClusterQueueHandler(Cluster& c, boost::function<void (const T&)> f, const std::string& n) : cluster(c), callback(f), name(n) {}
+ ClusterQueueHandler(const Cluster* c, boost::function<void (const T&)> f, const std::string& n) : cluster(*const_cast<Cluster*>(c)), callback(f), name(n) {}
+
+ void operator()(typename sys::PollableQueue<T>::Queue& values) {
+ try {
+ std::for_each(values.begin(), values.end(), callback);
+ values.clear();
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, "Error on " << name << ": " << e.what());
+ cluster.leave();
+ }
+ }
+
+ Cluster& cluster;
+ boost::function<void (const T&)> callback;
+ std::string name;
+};
+
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CLUSTERQUEUEHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index d05baffe3a..839a0e67b9 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -137,7 +137,7 @@ bool Connection::checkUnsupported(const AMQBody& body) {
}
// Decode buffer and put frames on frameq.
-void Connection::deliveredEvent(const Event& e, EventFrameQueue& frameq) {
+void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) {
assert(!catchUp);
Buffer buf(e);
// Set read credit on the last frame.
@@ -145,10 +145,10 @@ void Connection::deliveredEvent(const Event& e, EventFrameQueue& frameq) {
if (!mcastDecoder.decode(buf)) return;
AMQFrame frame(mcastDecoder.frame);
while (mcastDecoder.decode(buf)) {
- frameq.push(EventFrame(this, getId().getMember(), frame));
+ frameq.push(EventFrame(this, e, frame));
frame = mcastDecoder.frame;
}
- frameq.push(EventFrame(this, getId().getMember(), frame, readCredit));
+ frameq.push(EventFrame(this, e, frame, readCredit));
readCredit = 0;
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 29dee5eda4..3b18e22d17 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -26,7 +26,6 @@
#include "WriteEstimate.h"
#include "OutputInterceptor.h"
#include "NoOpConnectionOutputHandler.h"
-#include "Event.h"
#include "EventFrame.h"
#include "qpid/broker/Connection.h"
@@ -62,7 +61,7 @@ class Connection :
{
public:
- typedef sys::PollableQueue<EventFrame> EventFrameQueue;
+ typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
/** Local connection, use this in ConnectionId */
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink);
@@ -102,7 +101,7 @@ class Connection :
size_t decode(const char* buffer, size_t size);
// Called for data delivered from the cluster.
- void deliveredEvent(const Event&, EventFrameQueue&);
+ void deliveredEvent(const Event&, PollableFrameQueue&);
void deliveredFrame(const EventFrame&);
void consumerState(const std::string& name, bool blocked, bool notifyEnabled);
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 59a7241715..339c1de1dd 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -42,7 +42,7 @@ const size_t EventHeader::HEADER_SIZE =
;
EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s)
- : type(t), connectionId(c), size(s) {}
+ : type(t), connectionId(c), size(s), sequence(0) {}
Event::Event() {}
@@ -53,10 +53,10 @@ Event::Event(EventType t, const ConnectionId& c, size_t s)
void EventHeader::decode(const MemberId& m, framing::Buffer& buf) {
if (buf.available() <= HEADER_SIZE)
- throw ClusterLeaveException("Not enough for multicast header");
+ throw Exception("Not enough for multicast header");
type = (EventType)buf.getOctet();
if(type != DATA && type != CONTROL)
- throw ClusterLeaveException("Invalid multicast event type");
+ throw Exception("Invalid multicast event type");
connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong()));
size = buf.getLong();
#ifdef QPID_LATENCY_METRIC
@@ -68,7 +68,7 @@ Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) {
Event e;
e.decode(m, buf); // Header
if (buf.available() < e.size)
- throw ClusterLeaveException("Not enough data for multicast event");
+ throw Exception("Not enough data for multicast event");
e.store = RefCountedBuffer::create(e.size + HEADER_SIZE);
memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size);
return e;
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index a1d21f5e04..5df0c96f77 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -23,10 +23,7 @@
*/
#include "types.h"
-#include "Cpg.h"
-#include "Connection.h"
#include "qpid/RefCountedBuffer.h"
-#include "qpid/framing/Buffer.h"
#include "qpid/sys/LatencyMetric.h"
#include <sys/uio.h> // For iovec
#include <iosfwd>
@@ -37,6 +34,7 @@ namespace qpid {
namespace framing {
class AMQBody;
+class Buffer;
}
namespace cluster {
@@ -52,6 +50,8 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {
ConnectionId getConnectionId() const { return connectionId; }
MemberId getMemberId() const { return connectionId.getMember(); }
size_t getSize() const { return size; }
+ uint64_t getSequence() const { return sequence; }
+ void setSequence(uint64_t n) { sequence = n; }
bool isCluster() const { return connectionId.getPointer() == 0; }
bool isConnection() const { return connectionId.getPointer() != 0; }
@@ -62,6 +62,7 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {
EventType type;
ConnectionId connectionId;
size_t size;
+ uint64_t sequence;
};
/**
diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h
index 28ec6e4dc0..2420a2b1e5 100644
--- a/cpp/src/qpid/cluster/EventFrame.h
+++ b/cpp/src/qpid/cluster/EventFrame.h
@@ -23,6 +23,7 @@
*/
#include "types.h"
+#include "Event.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/sys/LatencyMetric.h"
#include <boost/intrusive_ptr.hpp>
@@ -37,19 +38,30 @@ class Connection;
*/
struct EventFrame
{
+ EventFrame() : sequence(0) {}
// Connection event frame
- EventFrame(const boost::intrusive_ptr<Connection>& c, const MemberId& m, const framing::AMQFrame& f, int rc=0)
- : connection(c), member(m), frame(f), readCredit(rc) {
+ EventFrame(const boost::intrusive_ptr<Connection>& c, const Event& e, const framing::AMQFrame& f, int rc=0)
+ : connection(c), member(e.getMemberId()), frame(f), sequence(e.getSequence()), readCredit(rc)
+ {
QPID_LATENCY_INIT(frame);
}
bool isCluster() const { return !connection; }
bool isConnection() const { return connection; }
+ bool isLastInEvent() const { return readCredit; }
+
+ // True if this frame follows immediately after frame e.
+ bool follows(const EventFrame& e) const {
+ return sequence == e.sequence || (sequence == e.sequence+1 && e.readCredit);
+ }
+
+ bool operator<(const EventFrame& e) const { return sequence < e.sequence; }
boost::intrusive_ptr<Connection> connection;
MemberId member;
framing::AMQFrame frame;
- int readCredit; // restore this much read credit when frame is processed
+ uint64_t sequence;
+ int readCredit; // last frame in an event, give credit when processed.
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index 4fa12651eb..847088435c 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -21,7 +21,6 @@
#include "Multicaster.h"
#include "Cpg.h"
-#include "ClusterLeaveException.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/LatencyMetric.h"
diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h
index d8dc35f167..0797d472b6 100644
--- a/cpp/src/qpid/cluster/types.h
+++ b/cpp/src/qpid/cluster/types.h
@@ -22,8 +22,6 @@
*
*/
-
-#include "ClusterLeaveException.h"
#include "config.h"
#include "qpid/Url.h"
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index 9fbe48fe84..b5ff98c2c7 100644
--- a/cpp/src/qpid/sys/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -46,6 +46,7 @@ template <class T>
class PollableQueue {
public:
typedef std::deque<T> Queue;
+ typedef T value_type;
/**
* Callback to process a batch of items from the queue.