diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/cpp/src/qpid/cluster/Event.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Event.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Event.cpp | 134 |
1 files changed, 134 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp new file mode 100644 index 0000000000..da2bc89d8c --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/Event.cpp @@ -0,0 +1,134 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/cluster/types.h" +#include "qpid/cluster/Event.h" +#include "qpid/cluster/Cpg.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/RefCountedBuffer.h" +#include "qpid/assert.h" +#include <ostream> +#include <iterator> +#include <algorithm> + +namespace qpid { +namespace cluster { + +using framing::Buffer; +using framing::AMQFrame; + +const size_t EventHeader::HEADER_SIZE = + sizeof(uint8_t) + // type + sizeof(uint64_t) + // connection pointer only, CPG provides member ID. + sizeof(uint32_t) // payload size + ; + +EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) + : type(t), connectionId(c), size(s) {} + + +Event::Event() {} + +Event::Event(EventType t, const ConnectionId& c, size_t s) + : EventHeader(t,c,s), store(RefCountedBuffer::create(s+HEADER_SIZE)) +{} + +void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { + QPID_ASSERT(buf.available() >= HEADER_SIZE); + type = (EventType)buf.getOctet(); + QPID_ASSERT(type == DATA || type == CONTROL); + connectionId = ConnectionId(m, buf.getLongLong()); + size = buf.getLong(); +} + +Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { + Event e; + e.decode(m, buf); // Header + QPID_ASSERT(buf.available() >= e.size); + e.store = RefCountedBuffer::create(e.size + HEADER_SIZE); + memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size); + return e; +} + +Event Event::control(const framing::AMQFrame& f, const ConnectionId& cid) { + Event e(CONTROL, cid, f.encodedSize()); + Buffer buf(e); + f.encode(buf); + return e; +} + +Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { + return control(framing::AMQFrame(body), cid); +} + +iovec Event::toIovec() const { + encodeHeader(); + iovec iov = { const_cast<char*>(getStore()), getStoreSize() }; + return iov; +} + +void EventHeader::encode(Buffer& b) const { + b.putOctet(type); + b.putLongLong(connectionId.getNumber()); + b.putLong(size); +} + +// Encode my header in my buffer. +void Event::encodeHeader () const { + Buffer b(const_cast<char*>(getStore()), HEADER_SIZE); + encode(b); + assert(b.getPosition() == HEADER_SIZE); +} + +Event::operator Buffer() const { + return Buffer(const_cast<char*>(getData()), getSize()); +} + +const AMQFrame& Event::getFrame() const { + assert(type == CONTROL); + if (!frame.getBody()) { + Buffer buf(*this); + QPID_ASSERT(frame.decode(buf)); + } + return frame; +} + +static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; + +std::ostream& operator<< (std::ostream& o, EventType t) { + return o << EVENT_TYPE_NAMES[t]; +} + +std::ostream& operator<< (std::ostream& o, const EventHeader& e) { + return o << "Event[" << e.getConnectionId() << " " << e.getType() + << " " << e.getSize() << " bytes]"; +} + +std::ostream& operator<< (std::ostream& o, const Event& e) { + o << "Event[" << e.getConnectionId() << " "; + if (e.getType() == CONTROL) + o << e.getFrame(); + else + o << " data " << e.getSize() << " bytes"; + return o << "]"; +} + +}} // namespace qpid::cluster |