summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Event.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Event.cpp')
-rw-r--r--cpp/src/qpid/cluster/Event.cpp33
1 files changed, 25 insertions, 8 deletions
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 8d4429a8ed..59a7241715 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -35,16 +35,21 @@ using framing::Buffer;
const size_t EventHeader::HEADER_SIZE =
sizeof(uint8_t) + // type
sizeof(uint64_t) + // connection pointer only, CPG provides member ID.
- sizeof(uint32_t); // payload size
+ sizeof(uint32_t) // payload size
+#ifdef QPID_LATENCY_METRIC
+ + sizeof(int64_t) // timestamp
+#endif
+ ;
EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s)
: type(t), connectionId(c), size(s) {}
+
+Event::Event() {}
+
Event::Event(EventType t, const ConnectionId& c, size_t s)
: EventHeader(t,c,s), store(RefCountedBuffer::create(s+HEADER_SIZE))
-{
- encodeHeader();
-}
+{}
void EventHeader::decode(const MemberId& m, framing::Buffer& buf) {
if (buf.available() <= HEADER_SIZE)
@@ -54,14 +59,17 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) {
throw ClusterLeaveException("Invalid multicast event type");
connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong()));
size = buf.getLong();
+#ifdef QPID_LATENCY_METRIC
+ latency_metric_timestamp = buf.getLongLong();
+#endif
}
Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) {
- EventHeader h;
- h.decode(m, buf); // Header
- Event e(h.getType(), h.getConnectionId(), h.getSize());
+ Event e;
+ e.decode(m, buf); // Header
if (buf.available() < e.size)
throw ClusterLeaveException("Not enough data for multicast event");
+ e.store = RefCountedBuffer::create(e.size + HEADER_SIZE);
memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size);
return e;
}
@@ -73,11 +81,20 @@ Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
f.encode(buf);
return e;
}
-
+
+iovec Event::toIovec() {
+ encodeHeader();
+ iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
+ return iov;
+}
+
void EventHeader::encode(Buffer& b) const {
b.putOctet(type);
b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
b.putLong(size);
+#ifdef QPID_LATENCY_METRIC
+ b.putLongLong(latency_metric_timestamp);
+#endif
}
// Encode my header in my buffer.