summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Event.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Event.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Event.cpp31
1 files changed, 23 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp
index e30b961b3e..1cb010c266 100644
--- a/qpid/cpp/src/qpid/cluster/Event.cpp
+++ b/qpid/cpp/src/qpid/cluster/Event.cpp
@@ -23,6 +23,7 @@
#include "Cpg.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/assert.h"
#include <ostream>
#include <iterator>
#include <algorithm>
@@ -31,6 +32,7 @@ namespace qpid {
namespace cluster {
using framing::Buffer;
+using framing::AMQFrame;
const size_t EventHeader::HEADER_SIZE =
sizeof(uint8_t) + // type
@@ -42,7 +44,7 @@ const size_t EventHeader::HEADER_SIZE =
;
EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s)
- : type(t), connectionId(c), size(s), sequence(0) {}
+ : type(t), connectionId(c), size(s) {}
Event::Event() {}
@@ -57,7 +59,7 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) {
type = (EventType)buf.getOctet();
if(type != DATA && type != CONTROL)
throw Exception("Invalid multicast event type");
- connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong()));
+ connectionId = ConnectionId(m, buf.getLongLong());
size = buf.getLong();
#ifdef QPID_LATENCY_METRIC
latency_metric_timestamp = buf.getLongLong();
@@ -74,14 +76,17 @@ Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) {
return e;
}
-Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
- framing::AMQFrame f(body);
+Event Event::control(const framing::AMQFrame& f, const ConnectionId& cid) {
Event e(CONTROL, cid, f.encodedSize());
Buffer buf(e);
f.encode(buf);
return e;
}
+Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
+ return control(framing::AMQFrame(body), cid);
+}
+
iovec Event::toIovec() {
encodeHeader();
iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
@@ -90,7 +95,7 @@ iovec Event::toIovec() {
void EventHeader::encode(Buffer& b) const {
b.putOctet(type);
- b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
+ b.putLongLong(connectionId.getNumber());
b.putLong(size);
#ifdef QPID_LATENCY_METRIC
b.putLongLong(latency_metric_timestamp);
@@ -108,12 +113,22 @@ Event::operator Buffer() const {
return Buffer(const_cast<char*>(getData()), getSize());
}
+AMQFrame Event::getFrame() const {
+ assert(type == CONTROL);
+ Buffer buf(*this);
+ AMQFrame frame;
+ QPID_ASSERT(frame.decode(buf));
+ return frame;
+}
+
static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
+std::ostream& operator << (std::ostream& o, EventType t) {
+ return o << EVENT_TYPE_NAMES[t];
+}
+
std::ostream& operator << (std::ostream& o, const EventHeader& e) {
- o << "[event " << e.getConnectionId() << "/" << e.getSequence()
- << " " << EVENT_TYPE_NAMES[e.getType()]
- << " " << e.getSize() << " bytes]";
+ o << "Event[" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]";
return o;
}