diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Event.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Event.cpp | 31 |
1 files changed, 23 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp index e30b961b3e..1cb010c266 100644 --- a/qpid/cpp/src/qpid/cluster/Event.cpp +++ b/qpid/cpp/src/qpid/cluster/Event.cpp @@ -23,6 +23,7 @@ #include "Cpg.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/assert.h" #include <ostream> #include <iterator> #include <algorithm> @@ -31,6 +32,7 @@ namespace qpid { namespace cluster { using framing::Buffer; +using framing::AMQFrame; const size_t EventHeader::HEADER_SIZE = sizeof(uint8_t) + // type @@ -42,7 +44,7 @@ const size_t EventHeader::HEADER_SIZE = ; EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) - : type(t), connectionId(c), size(s), sequence(0) {} + : type(t), connectionId(c), size(s) {} Event::Event() {} @@ -57,7 +59,7 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { type = (EventType)buf.getOctet(); if(type != DATA && type != CONTROL) throw Exception("Invalid multicast event type"); - connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong())); + connectionId = ConnectionId(m, buf.getLongLong()); size = buf.getLong(); #ifdef QPID_LATENCY_METRIC latency_metric_timestamp = buf.getLongLong(); @@ -74,14 +76,17 @@ Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { return e; } -Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { - framing::AMQFrame f(body); +Event Event::control(const framing::AMQFrame& f, const ConnectionId& cid) { Event e(CONTROL, cid, f.encodedSize()); Buffer buf(e); f.encode(buf); return e; } +Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { + return control(framing::AMQFrame(body), cid); +} + iovec Event::toIovec() { encodeHeader(); iovec iov = { const_cast<char*>(getStore()), getStoreSize() }; @@ -90,7 +95,7 @@ iovec Event::toIovec() { void EventHeader::encode(Buffer& b) const { b.putOctet(type); - b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer())); + b.putLongLong(connectionId.getNumber()); b.putLong(size); #ifdef QPID_LATENCY_METRIC b.putLongLong(latency_metric_timestamp); @@ -108,12 +113,22 @@ Event::operator Buffer() const { return Buffer(const_cast<char*>(getData()), getSize()); } +AMQFrame Event::getFrame() const { + assert(type == CONTROL); + Buffer buf(*this); + AMQFrame frame; + QPID_ASSERT(frame.decode(buf)); + return frame; +} + static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; +std::ostream& operator << (std::ostream& o, EventType t) { + return o << EVENT_TYPE_NAMES[t]; +} + std::ostream& operator << (std::ostream& o, const EventHeader& e) { - o << "[event " << e.getConnectionId() << "/" << e.getSequence() - << " " << EVENT_TYPE_NAMES[e.getType()] - << " " << e.getSize() << " bytes]"; + o << "Event[" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]"; return o; } |