diff options
author | Alan Conway <aconway@apache.org> | 2008-12-12 15:23:39 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-12-12 15:23:39 +0000 |
commit | b584b02b619ce4fe51e28b699b240fd5519d149a (patch) | |
tree | 81e67b184e07f007c90d7e6830ed59c9cf6715eb | |
parent | 618b2d6892fb5e4920e4ec661317dec095adf36d (diff) | |
download | qpid-python-b584b02b619ce4fe51e28b699b240fd5519d149a.tar.gz |
cluster/Event: store event header in the same buffer as data to simplify encoding.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@726043 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Event.cpp | 39 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Event.h | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Multicaster.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Multicaster.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/types.h | 1 |
5 files changed, 46 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp index 5ac63c86f5..cfa8fe05f1 100644 --- a/qpid/cpp/src/qpid/cluster/Event.cpp +++ b/qpid/cpp/src/qpid/cluster/Event.cpp @@ -32,42 +32,45 @@ namespace cluster { using framing::Buffer; -const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint32_t) + sizeof(uint32_t); +const size_t Event::HEADER_SIZE = + sizeof(uint8_t) + // type + sizeof(uint64_t) + // connection pointer only, CPG provides member ID. + sizeof(uint32_t); // payload size -Event::Event(EventType t, const ConnectionId& c, size_t s, uint32_t i) - : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)), id(i) {} +Event::Event(EventType t, const ConnectionId& c, size_t s) + : type(t), connectionId(c), size(s), store(RefCountedBuffer::create(s+HEADER_SIZE)) { + encodeHeader(); +} Event Event::decode(const MemberId& m, framing::Buffer& buf) { - assert(buf.available() > OVERHEAD); + if (buf.available() <= HEADER_SIZE) + throw ClusterLeaveException("Not enough for multicast header"); EventType type((EventType)buf.getOctet()); - assert(type == DATA || type == CONTROL); + if(type != DATA && type != CONTROL) + throw ClusterLeaveException("Invalid multicast event type"); ConnectionId connection(m, reinterpret_cast<Connection*>(buf.getLongLong())); - uint32_t id = buf.getLong(); uint32_t size = buf.getLong(); - Event e(type, connection, size, id); - assert(buf.available() >= size); + Event e(type, connection, size); + if (buf.available() < size) + throw ClusterLeaveException("Not enough data for multicast event"); memcpy(e.getData(), buf.getPointer() + buf.getPosition(), size); return e; } -Event Event::control(const framing::AMQBody& body, const ConnectionId& cid, uint32_t id) { +Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { framing::AMQFrame f(body); - Event e(CONTROL, cid, f.encodedSize(), id); + Event e(CONTROL, cid, f.encodedSize()); Buffer buf(e); f.encode(buf); return e; } -bool Event::mcast (Cpg& cpg) const { - char header[OVERHEAD]; - Buffer b(header, OVERHEAD); +void Event::encodeHeader () { + Buffer b(getStore(), HEADER_SIZE); b.putOctet(type); b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer())); - b.putLong(id); b.putLong(size); - assert(b.getPosition() == OVERHEAD); - iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } }; - return cpg.mcast(iov, sizeof(iov)/sizeof(*iov)); + assert(b.getPosition() == HEADER_SIZE); } Event::operator Buffer() const { @@ -77,7 +80,7 @@ Event::operator Buffer() const { static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; std::ostream& operator << (std::ostream& o, const Event& e) { - o << "[event " << e.getConnectionId() << "/" << e.getId() + o << "[event " << e.getConnectionId() << " " << EVENT_TYPE_NAMES[e.getType()] << " " << e.getSize() << " bytes]"; return o; diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h index e046990747..427410923b 100644 --- a/qpid/cpp/src/qpid/cluster/Event.h +++ b/qpid/cpp/src/qpid/cluster/Event.h @@ -42,36 +42,43 @@ namespace cluster { */ class Event { public: - /** Create an event to mcast with a buffer of size bytes. */ - Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0, uint32_t id=0); + /** Create an event with a buffer that can hold size bytes plus an event header. */ + Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0); /** Create an event copied from delivered data. */ static Event decode(const MemberId& m, framing::Buffer&); /** Create an event containing a control */ - static Event control(const framing::AMQBody&, const ConnectionId&, uint32_t id=0); - - bool mcast(Cpg& cpg) const; + static Event control(const framing::AMQBody&, const ConnectionId&); EventType getType() const { return type; } ConnectionId getConnectionId() const { return connectionId; } MemberId getMemberId() const { return connectionId.getMember(); } size_t getSize() const { return size; } - char* getData() { return data; } - const char* getData() const { return data; } - size_t getId() const { return id; } + + // Data excluding header. + char* getData() { return store + HEADER_SIZE; } + const char* getData() const { return store + HEADER_SIZE; } + + // Store including header + char* getStore() { return store; } + const char* getStore() const { return store; } + size_t getStoreSize() { return size + HEADER_SIZE; } + bool isCluster() const { return connectionId.getPointer() == 0; } bool isConnection() const { return connectionId.getPointer() != 0; } operator framing::Buffer() const; private: - static const size_t OVERHEAD; + static const size_t HEADER_SIZE; + + void encodeHeader(); + EventType type; ConnectionId connectionId; size_t size; - RefCountedBuffer::pointer data; - uint32_t id; + RefCountedBuffer::pointer store; }; std::ostream& operator << (std::ostream&, const Event&); diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp index 896f7c6a6e..37d2f81b39 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp +++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp @@ -57,10 +57,13 @@ void Multicaster::mcast(const Event& e) { queue.push(e); } + void Multicaster::sendMcast(PollableEventQueue::Queue& values) { try { PollableEventQueue::Queue::iterator i = values.begin(); - while (i != values.end() && i->mcast(cpg)) { + while( i != values.end()) { + iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() }; + if (!cpg.mcast(&iov, 1)) break; // returns false for flow control QPID_LOG(trace, " MCAST " << *i); ++i; } diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.h b/qpid/cpp/src/qpid/cluster/Multicaster.h index e7aff7fe7c..8b306ce10e 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.h +++ b/qpid/cpp/src/qpid/cluster/Multicaster.h @@ -27,6 +27,7 @@ #include "qpid/sys/PollableQueue.h" #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> +#include <sys/uio.h> // For iovec namespace qpid { @@ -63,6 +64,7 @@ class Multicaster PollableEventQueue queue; bool holding; PlainEventQueue holdingQueue; + std::vector<struct ::iovec> ioVector; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h index 2154aa89ce..3ee20c4692 100644 --- a/qpid/cpp/src/qpid/cluster/types.h +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -22,6 +22,7 @@ * */ +#include "ClusterLeaveException.h" #include <qpid/Url.h> #include <utility> |