summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-12-12 15:23:39 +0000
committerAlan Conway <aconway@apache.org>2008-12-12 15:23:39 +0000
commitb584b02b619ce4fe51e28b699b240fd5519d149a (patch)
tree81e67b184e07f007c90d7e6830ed59c9cf6715eb
parent618b2d6892fb5e4920e4ec661317dec095adf36d (diff)
downloadqpid-python-b584b02b619ce4fe51e28b699b240fd5519d149a.tar.gz
cluster/Event: store event header in the same buffer as data to simplify encoding.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@726043 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/cluster/Event.cpp39
-rw-r--r--qpid/cpp/src/qpid/cluster/Event.h29
-rw-r--r--qpid/cpp/src/qpid/cluster/Multicaster.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/Multicaster.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/types.h1
5 files changed, 46 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp
index 5ac63c86f5..cfa8fe05f1 100644
--- a/qpid/cpp/src/qpid/cluster/Event.cpp
+++ b/qpid/cpp/src/qpid/cluster/Event.cpp
@@ -32,42 +32,45 @@ namespace cluster {
using framing::Buffer;
-const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint32_t) + sizeof(uint32_t);
+const size_t Event::HEADER_SIZE =
+ sizeof(uint8_t) + // type
+ sizeof(uint64_t) + // connection pointer only, CPG provides member ID.
+ sizeof(uint32_t); // payload size
-Event::Event(EventType t, const ConnectionId& c, size_t s, uint32_t i)
- : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)), id(i) {}
+Event::Event(EventType t, const ConnectionId& c, size_t s)
+ : type(t), connectionId(c), size(s), store(RefCountedBuffer::create(s+HEADER_SIZE)) {
+ encodeHeader();
+}
Event Event::decode(const MemberId& m, framing::Buffer& buf) {
- assert(buf.available() > OVERHEAD);
+ if (buf.available() <= HEADER_SIZE)
+ throw ClusterLeaveException("Not enough for multicast header");
EventType type((EventType)buf.getOctet());
- assert(type == DATA || type == CONTROL);
+ if(type != DATA && type != CONTROL)
+ throw ClusterLeaveException("Invalid multicast event type");
ConnectionId connection(m, reinterpret_cast<Connection*>(buf.getLongLong()));
- uint32_t id = buf.getLong();
uint32_t size = buf.getLong();
- Event e(type, connection, size, id);
- assert(buf.available() >= size);
+ Event e(type, connection, size);
+ if (buf.available() < size)
+ throw ClusterLeaveException("Not enough data for multicast event");
memcpy(e.getData(), buf.getPointer() + buf.getPosition(), size);
return e;
}
-Event Event::control(const framing::AMQBody& body, const ConnectionId& cid, uint32_t id) {
+Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
framing::AMQFrame f(body);
- Event e(CONTROL, cid, f.encodedSize(), id);
+ Event e(CONTROL, cid, f.encodedSize());
Buffer buf(e);
f.encode(buf);
return e;
}
-bool Event::mcast (Cpg& cpg) const {
- char header[OVERHEAD];
- Buffer b(header, OVERHEAD);
+void Event::encodeHeader () {
+ Buffer b(getStore(), HEADER_SIZE);
b.putOctet(type);
b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
- b.putLong(id);
b.putLong(size);
- assert(b.getPosition() == OVERHEAD);
- iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } };
- return cpg.mcast(iov, sizeof(iov)/sizeof(*iov));
+ assert(b.getPosition() == HEADER_SIZE);
}
Event::operator Buffer() const {
@@ -77,7 +80,7 @@ Event::operator Buffer() const {
static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
std::ostream& operator << (std::ostream& o, const Event& e) {
- o << "[event " << e.getConnectionId() << "/" << e.getId()
+ o << "[event " << e.getConnectionId()
<< " " << EVENT_TYPE_NAMES[e.getType()]
<< " " << e.getSize() << " bytes]";
return o;
diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h
index e046990747..427410923b 100644
--- a/qpid/cpp/src/qpid/cluster/Event.h
+++ b/qpid/cpp/src/qpid/cluster/Event.h
@@ -42,36 +42,43 @@ namespace cluster {
*/
class Event {
public:
- /** Create an event to mcast with a buffer of size bytes. */
- Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0, uint32_t id=0);
+ /** Create an event with a buffer that can hold size bytes plus an event header. */
+ Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0);
/** Create an event copied from delivered data. */
static Event decode(const MemberId& m, framing::Buffer&);
/** Create an event containing a control */
- static Event control(const framing::AMQBody&, const ConnectionId&, uint32_t id=0);
-
- bool mcast(Cpg& cpg) const;
+ static Event control(const framing::AMQBody&, const ConnectionId&);
EventType getType() const { return type; }
ConnectionId getConnectionId() const { return connectionId; }
MemberId getMemberId() const { return connectionId.getMember(); }
size_t getSize() const { return size; }
- char* getData() { return data; }
- const char* getData() const { return data; }
- size_t getId() const { return id; }
+
+ // Data excluding header.
+ char* getData() { return store + HEADER_SIZE; }
+ const char* getData() const { return store + HEADER_SIZE; }
+
+ // Store including header
+ char* getStore() { return store; }
+ const char* getStore() const { return store; }
+ size_t getStoreSize() { return size + HEADER_SIZE; }
+
bool isCluster() const { return connectionId.getPointer() == 0; }
bool isConnection() const { return connectionId.getPointer() != 0; }
operator framing::Buffer() const;
private:
- static const size_t OVERHEAD;
+ static const size_t HEADER_SIZE;
+
+ void encodeHeader();
+
EventType type;
ConnectionId connectionId;
size_t size;
- RefCountedBuffer::pointer data;
- uint32_t id;
+ RefCountedBuffer::pointer store;
};
std::ostream& operator << (std::ostream&, const Event&);
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
index 896f7c6a6e..37d2f81b39 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
@@ -57,10 +57,13 @@ void Multicaster::mcast(const Event& e) {
queue.push(e);
}
+
void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
try {
PollableEventQueue::Queue::iterator i = values.begin();
- while (i != values.end() && i->mcast(cpg)) {
+ while( i != values.end()) {
+ iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() };
+ if (!cpg.mcast(&iov, 1)) break; // returns false for flow control
QPID_LOG(trace, " MCAST " << *i);
++i;
}
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.h b/qpid/cpp/src/qpid/cluster/Multicaster.h
index e7aff7fe7c..8b306ce10e 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.h
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.h
@@ -27,6 +27,7 @@
#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
+#include <sys/uio.h> // For iovec
namespace qpid {
@@ -63,6 +64,7 @@ class Multicaster
PollableEventQueue queue;
bool holding;
PlainEventQueue holdingQueue;
+ std::vector<struct ::iovec> ioVector;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h
index 2154aa89ce..3ee20c4692 100644
--- a/qpid/cpp/src/qpid/cluster/types.h
+++ b/qpid/cpp/src/qpid/cluster/types.h
@@ -22,6 +22,7 @@
*
*/
+#include "ClusterLeaveException.h"
#include <qpid/Url.h>
#include <utility>