diff options
author | Alan Conway <aconway@apache.org> | 2009-03-02 23:30:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-03-02 23:30:08 +0000 |
commit | a179ded965c5cc70a0666d07737c38c67c1558c1 (patch) | |
tree | 9c854273164ee106eaa611dd68870d818abfc2c1 /cpp/src/qpid/cluster/Event.cpp | |
parent | e669e97c7f1c034841986e18288af7629d356aa2 (diff) | |
download | qpid-python-a179ded965c5cc70a0666d07737c38c67c1558c1.tar.gz |
Replicate connection decoder fragments to new members.
Refactoring:
- Merge Decoder into ConnectionMap.
- Process cluster controls in event queue thread.
- Use counter not pointer for connection ID, avoid re-use.
- Do all processing in event queue thread to avoid races
(temporary pending performance measurements)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@749473 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Event.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 14 |
1 files changed, 12 insertions, 2 deletions
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 9fe5376bc5..749fbf240f 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -23,6 +23,7 @@ #include "Cpg.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/assert.h" #include <ostream> #include <iterator> #include <algorithm> @@ -31,6 +32,7 @@ namespace qpid { namespace cluster { using framing::Buffer; +using framing::AMQFrame; const size_t EventHeader::HEADER_SIZE = sizeof(uint8_t) + // type @@ -57,7 +59,7 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { type = (EventType)buf.getOctet(); if(type != DATA && type != CONTROL) throw Exception("Invalid multicast event type"); - connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong())); + connectionId = ConnectionId(m, buf.getLongLong()); size = buf.getLong(); #ifdef QPID_LATENCY_METRIC latency_metric_timestamp = buf.getLongLong(); @@ -93,7 +95,7 @@ iovec Event::toIovec() { void EventHeader::encode(Buffer& b) const { b.putOctet(type); - b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer())); + b.putLongLong(connectionId.getNumber()); b.putLong(size); #ifdef QPID_LATENCY_METRIC b.putLongLong(latency_metric_timestamp); @@ -111,6 +113,14 @@ Event::operator Buffer() const { return Buffer(const_cast<char*>(getData()), getSize()); } +AMQFrame Event::getFrame() const { + assert(type == CONTROL); + Buffer buf(*this); + AMQFrame frame; + QPID_ASSERT(frame.decode(buf)); + return frame; +} + static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; std::ostream& operator << (std::ostream& o, EventType t) { |