summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-01-16 17:25:18 +0000
committerAlan Conway <aconway@apache.org>2009-01-16 17:25:18 +0000
commit480ed248512cedd1ca5197f2a4fb7f805943e992 (patch)
treebb72fde8bb9c263cc74e96107fcf136014463ad6 /qpid/cpp/src
parent5e6509d4ca9f52c2a7f6f15b86fbfb5586c2c956 (diff)
downloadqpid-python-480ed248512cedd1ca5197f2a4fb7f805943e992.tar.gz
Separate cluster::EventHeader to allow non-copy events.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@735059 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/Event.cpp36
-rw-r--r--qpid/cpp/src/qpid/cluster/Event.h40
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp11
4 files changed, 51 insertions, 38 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index ad1f4b704d..ce564939b8 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -187,7 +187,7 @@ void Cluster::deliver(
Mutex::ScopedLock l(lock);
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
- Event e(Event::decode(from, buf));
+ Event e(Event::decodeCopy(from, buf));
if (from == myId) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
deliver(e, l);
diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp
index cfa8fe05f1..5fe2b4d5b0 100644
--- a/qpid/cpp/src/qpid/cluster/Event.cpp
+++ b/qpid/cpp/src/qpid/cluster/Event.cpp
@@ -32,28 +32,37 @@ namespace cluster {
using framing::Buffer;
-const size_t Event::HEADER_SIZE =
+const size_t EventHeader::HEADER_SIZE =
sizeof(uint8_t) + // type
sizeof(uint64_t) + // connection pointer only, CPG provides member ID.
sizeof(uint32_t); // payload size
+EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s)
+ : type(t), connectionId(c), size(s) {}
+
Event::Event(EventType t, const ConnectionId& c, size_t s)
- : type(t), connectionId(c), size(s), store(RefCountedBuffer::create(s+HEADER_SIZE)) {
+ : EventHeader(t,c,s), store(RefCountedBuffer::create(s+HEADER_SIZE))
+{
encodeHeader();
}
-Event Event::decode(const MemberId& m, framing::Buffer& buf) {
+void EventHeader::decode(const MemberId& m, framing::Buffer& buf) {
if (buf.available() <= HEADER_SIZE)
throw ClusterLeaveException("Not enough for multicast header");
- EventType type((EventType)buf.getOctet());
+ type = (EventType)buf.getOctet();
if(type != DATA && type != CONTROL)
throw ClusterLeaveException("Invalid multicast event type");
- ConnectionId connection(m, reinterpret_cast<Connection*>(buf.getLongLong()));
- uint32_t size = buf.getLong();
- Event e(type, connection, size);
- if (buf.available() < size)
+ connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong()));
+ size = buf.getLong();
+}
+
+Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) {
+ EventHeader h;
+ h.decode(m, buf); // Header
+ Event e(h.getType(), h.getConnectionId(), h.getSize());
+ if (buf.available() < e.size)
throw ClusterLeaveException("Not enough data for multicast event");
- memcpy(e.getData(), buf.getPointer() + buf.getPosition(), size);
+ memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size);
return e;
}
@@ -65,11 +74,16 @@ Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
return e;
}
-void Event::encodeHeader () {
- Buffer b(getStore(), HEADER_SIZE);
+void EventHeader::encode(Buffer& b) const {
b.putOctet(type);
b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
b.putLong(size);
+}
+
+// Encode my header in my buffer.
+void Event::encodeHeader () {
+ Buffer b(getStore(), HEADER_SIZE);
+ encode(b);
assert(b.getPosition() == HEADER_SIZE);
}
diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h
index 427410923b..c63e09ca46 100644
--- a/qpid/cpp/src/qpid/cluster/Event.h
+++ b/qpid/cpp/src/qpid/cluster/Event.h
@@ -36,26 +36,44 @@ namespace cluster {
// byte-stream data.
//
+/** Header data for a multicast event */
+class EventHeader {
+ public:
+ EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0);
+ void decode(const MemberId& m, framing::Buffer&);
+ void encode(framing::Buffer&) const;
+
+ EventType getType() const { return type; }
+ ConnectionId getConnectionId() const { return connectionId; }
+ MemberId getMemberId() const { return connectionId.getMember(); }
+ size_t getSize() const { return size; }
+
+ bool isCluster() const { return connectionId.getPointer() == 0; }
+ bool isConnection() const { return connectionId.getPointer() != 0; }
+
+ protected:
+ static const size_t HEADER_SIZE;
+
+ EventType type;
+ ConnectionId connectionId;
+ size_t size;
+};
+
/**
* Events are sent to/received from the cluster.
* Refcounted so they can be stored on queues.
*/
-class Event {
+class Event : public EventHeader {
public:
/** Create an event with a buffer that can hold size bytes plus an event header. */
Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0);
/** Create an event copied from delivered data. */
- static Event decode(const MemberId& m, framing::Buffer&);
+ static Event decodeCopy(const MemberId& m, framing::Buffer&);
/** Create an event containing a control */
static Event control(const framing::AMQBody&, const ConnectionId&);
- EventType getType() const { return type; }
- ConnectionId getConnectionId() const { return connectionId; }
- MemberId getMemberId() const { return connectionId.getMember(); }
- size_t getSize() const { return size; }
-
// Data excluding header.
char* getData() { return store + HEADER_SIZE; }
const char* getData() const { return store + HEADER_SIZE; }
@@ -65,19 +83,11 @@ class Event {
const char* getStore() const { return store; }
size_t getStoreSize() { return size + HEADER_SIZE; }
- bool isCluster() const { return connectionId.getPointer() == 0; }
- bool isConnection() const { return connectionId.getPointer() != 0; }
-
operator framing::Buffer() const;
private:
- static const size_t HEADER_SIZE;
-
void encodeHeader();
- EventType type;
- ConnectionId connectionId;
- size_t size;
RefCountedBuffer::pointer store;
};
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index f4a38ae861..6ca957f310 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -234,17 +234,6 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) {
BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC)));
}
-QPID_AUTO_TEST_CASE(testUnsupported) {
- ScopedSuppressLogging sl;
- ClusterFixture cluster(1);
- Client c1(cluster[0], "c1");
- BOOST_CHECK_THROW(c1.session.dtxSelect(), FramingErrorException);
- Client c2(cluster[0], "c2");
- Message m;
- m.getDeliveryProperties().setTtl(1);
- BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);
-}
-
QPID_AUTO_TEST_CASE(testTxTransaction) {
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");