diff options
author | Alan Conway <aconway@apache.org> | 2009-01-16 17:25:18 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-01-16 17:25:18 +0000 |
commit | 480ed248512cedd1ca5197f2a4fb7f805943e992 (patch) | |
tree | bb72fde8bb9c263cc74e96107fcf136014463ad6 /qpid/cpp/src | |
parent | 5e6509d4ca9f52c2a7f6f15b86fbfb5586c2c956 (diff) | |
download | qpid-python-480ed248512cedd1ca5197f2a4fb7f805943e992.tar.gz |
Separate cluster::EventHeader to allow non-copy events.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@735059 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Event.cpp | 36 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Event.h | 40 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 11 |
4 files changed, 51 insertions, 38 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index ad1f4b704d..ce564939b8 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -187,7 +187,7 @@ void Cluster::deliver( Mutex::ScopedLock l(lock); MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); - Event e(Event::decode(from, buf)); + Event e(Event::decodeCopy(from, buf)); if (from == myId) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e, l); diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp index cfa8fe05f1..5fe2b4d5b0 100644 --- a/qpid/cpp/src/qpid/cluster/Event.cpp +++ b/qpid/cpp/src/qpid/cluster/Event.cpp @@ -32,28 +32,37 @@ namespace cluster { using framing::Buffer; -const size_t Event::HEADER_SIZE = +const size_t EventHeader::HEADER_SIZE = sizeof(uint8_t) + // type sizeof(uint64_t) + // connection pointer only, CPG provides member ID. sizeof(uint32_t); // payload size +EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) + : type(t), connectionId(c), size(s) {} + Event::Event(EventType t, const ConnectionId& c, size_t s) - : type(t), connectionId(c), size(s), store(RefCountedBuffer::create(s+HEADER_SIZE)) { + : EventHeader(t,c,s), store(RefCountedBuffer::create(s+HEADER_SIZE)) +{ encodeHeader(); } -Event Event::decode(const MemberId& m, framing::Buffer& buf) { +void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { if (buf.available() <= HEADER_SIZE) throw ClusterLeaveException("Not enough for multicast header"); - EventType type((EventType)buf.getOctet()); + type = (EventType)buf.getOctet(); if(type != DATA && type != CONTROL) throw ClusterLeaveException("Invalid multicast event type"); - ConnectionId connection(m, reinterpret_cast<Connection*>(buf.getLongLong())); - uint32_t size = buf.getLong(); - Event e(type, connection, size); - if (buf.available() < size) + connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong())); + size = buf.getLong(); +} + +Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { + EventHeader h; + h.decode(m, buf); // Header + Event e(h.getType(), h.getConnectionId(), h.getSize()); + if (buf.available() < e.size) throw ClusterLeaveException("Not enough data for multicast event"); - memcpy(e.getData(), buf.getPointer() + buf.getPosition(), size); + memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size); return e; } @@ -65,11 +74,16 @@ Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { return e; } -void Event::encodeHeader () { - Buffer b(getStore(), HEADER_SIZE); +void EventHeader::encode(Buffer& b) const { b.putOctet(type); b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer())); b.putLong(size); +} + +// Encode my header in my buffer. +void Event::encodeHeader () { + Buffer b(getStore(), HEADER_SIZE); + encode(b); assert(b.getPosition() == HEADER_SIZE); } diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h index 427410923b..c63e09ca46 100644 --- a/qpid/cpp/src/qpid/cluster/Event.h +++ b/qpid/cpp/src/qpid/cluster/Event.h @@ -36,26 +36,44 @@ namespace cluster { // byte-stream data. // +/** Header data for a multicast event */ +class EventHeader { + public: + EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0); + void decode(const MemberId& m, framing::Buffer&); + void encode(framing::Buffer&) const; + + EventType getType() const { return type; } + ConnectionId getConnectionId() const { return connectionId; } + MemberId getMemberId() const { return connectionId.getMember(); } + size_t getSize() const { return size; } + + bool isCluster() const { return connectionId.getPointer() == 0; } + bool isConnection() const { return connectionId.getPointer() != 0; } + + protected: + static const size_t HEADER_SIZE; + + EventType type; + ConnectionId connectionId; + size_t size; +}; + /** * Events are sent to/received from the cluster. * Refcounted so they can be stored on queues. */ -class Event { +class Event : public EventHeader { public: /** Create an event with a buffer that can hold size bytes plus an event header. */ Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0); /** Create an event copied from delivered data. */ - static Event decode(const MemberId& m, framing::Buffer&); + static Event decodeCopy(const MemberId& m, framing::Buffer&); /** Create an event containing a control */ static Event control(const framing::AMQBody&, const ConnectionId&); - EventType getType() const { return type; } - ConnectionId getConnectionId() const { return connectionId; } - MemberId getMemberId() const { return connectionId.getMember(); } - size_t getSize() const { return size; } - // Data excluding header. char* getData() { return store + HEADER_SIZE; } const char* getData() const { return store + HEADER_SIZE; } @@ -65,19 +83,11 @@ class Event { const char* getStore() const { return store; } size_t getStoreSize() { return size + HEADER_SIZE; } - bool isCluster() const { return connectionId.getPointer() == 0; } - bool isConnection() const { return connectionId.getPointer() != 0; } - operator framing::Buffer() const; private: - static const size_t HEADER_SIZE; - void encodeHeader(); - EventType type; - ConnectionId connectionId; - size_t size; RefCountedBuffer::pointer store; }; diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index f4a38ae861..6ca957f310 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -234,17 +234,6 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) { BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC))); } -QPID_AUTO_TEST_CASE(testUnsupported) { - ScopedSuppressLogging sl; - ClusterFixture cluster(1); - Client c1(cluster[0], "c1"); - BOOST_CHECK_THROW(c1.session.dtxSelect(), FramingErrorException); - Client c2(cluster[0], "c2"); - Message m; - m.getDeliveryProperties().setTtl(1); - BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception); -} - QPID_AUTO_TEST_CASE(testTxTransaction) { ClusterFixture cluster(1); Client c0(cluster[0], "c0"); |