diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/RefCountedBuffer.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/RefCountedBuffer.h | 27 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.h | 12 |
6 files changed, 76 insertions, 33 deletions
diff --git a/cpp/src/qpid/RefCountedBuffer.cpp b/cpp/src/qpid/RefCountedBuffer.cpp index 3a52b94412..2a8cbc0930 100644 --- a/cpp/src/qpid/RefCountedBuffer.cpp +++ b/cpp/src/qpid/RefCountedBuffer.cpp @@ -34,12 +34,19 @@ char* RefCountedBuffer::addr() const { return const_cast<char*>(reinterpret_cast<const char*>(this)+sizeof(RefCountedBuffer)); } -RefCountedBuffer::intrusive_ptr RefCountedBuffer::create(size_t n) { +RefCountedBuffer::pointer RefCountedBuffer::create(size_t n) { char* store=::new char[n+sizeof(RefCountedBuffer)]; new(store) RefCountedBuffer; - return reinterpret_cast<RefCountedBuffer*>(store); + return pointer(reinterpret_cast<RefCountedBuffer*>(store)); } +RefCountedBuffer::pointer::pointer() {} +RefCountedBuffer::pointer::pointer(RefCountedBuffer* x) : p(x) {} +RefCountedBuffer::pointer::pointer(const pointer& x) : p(x.p) {} +RefCountedBuffer::pointer::~pointer() {} +RefCountedBuffer::pointer& RefCountedBuffer::pointer::operator=(const RefCountedBuffer::pointer& x) { p = x.p; return *this; } + +char* RefCountedBuffer::pointer::cp() const { return p ? p->get() : 0; } } // namespace qpid diff --git a/cpp/src/qpid/RefCountedBuffer.h b/cpp/src/qpid/RefCountedBuffer.h index af46cbb92a..c332325378 100644 --- a/cpp/src/qpid/RefCountedBuffer.h +++ b/cpp/src/qpid/RefCountedBuffer.h @@ -27,7 +27,7 @@ #include <boost/intrusive_ptr.hpp> namespace qpid { - +// FIXME aconway 2008-09-06: easy to add alignment /** * Reference-counted byte buffer. * No alignment guarantees. @@ -39,11 +39,32 @@ class RefCountedBuffer : boost::noncopyable { char* addr() const; public: + /** Smart char pointer to a reference counted buffer */ + class pointer { + boost::intrusive_ptr<RefCountedBuffer> p; + char* cp() const; + pointer(RefCountedBuffer* x); + friend class RefCountedBuffer; - typedef boost::intrusive_ptr<RefCountedBuffer> intrusive_ptr; + public: + pointer(); + pointer(const pointer&); + ~pointer(); + pointer& operator=(const pointer&); + + char* get() { return cp(); } + operator char*() { return cp(); } + char& operator*() { return *cp(); } + char& operator[](size_t i) { return cp()[i]; } + const char* get() const { return cp(); } + operator const char*() const { return cp(); } + const char& operator*() const { return *cp(); } + const char& operator[](size_t i) const { return cp()[i]; } + }; + /** Create a reference counted buffer of size n */ - static intrusive_ptr create(size_t n); + static pointer create(size_t n); /** Get a pointer to the start of the buffer. */ char* get() { return addr(); } diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 2b12e4f54a..4c0a768c4f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -116,26 +116,22 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) { void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) { QPID_LOG(trace, "MCAST [" << connection << "] " << frame); - // FIXME aconway 2008-09-02: restore queueing. - Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking. - static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management or FrameEncoder. - Buffer buf(buffer, sizeof(buffer)); - buf.putOctet(CONTROL); - encodePtr(buf, connection.getConnectionPtr()); + Event e(CONTROL, connection, frame.size()); + Buffer buf(e); frame.encode(buf); - iovec iov = { buffer, buf.getPosition() }; - cpg.mcast(name, &iov, 1); + mcastEvent(e); +} + +void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) { + QPID_LOG(trace, "MCAST [" << connection << "] " << size << "bytes of data"); + Event e(DATA, connection, size); + memcpy(e.getData(), data, size); + mcastEvent(e); } -void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { - // FIXME aconway 2008-09-02: does this need locking? - Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking. - char hdrbuf[1+sizeof(uint64_t)]; - Buffer buf(hdrbuf, sizeof(hdrbuf)); - buf.putOctet(DATA); - encodePtr(buf, id.getConnectionPtr()); - iovec iov[] = { { hdrbuf, buf.getPosition() }, { const_cast<char*>(data), size } }; - cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov)); +void Cluster::mcastEvent(const Event& e) { + QPID_LOG(trace, "Multicasting: " << e); + e.mcast(name, cpg); } size_t Cluster::size() const { @@ -186,6 +182,7 @@ void Cluster::deliver( } void Cluster::deliverEvent(const Event& e) { + QPID_LOG(trace, "Delivered: " << e); Buffer buf(e); if (e.getConnection().getConnectionPtr() == 0) { // Cluster control AMQFrame frame; diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 7d3ef13b14..0cd916d5fb 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -67,10 +67,11 @@ class Cluster : public RefCounted, private Cpg::Handler bool empty() const { return size() == 0; } - /** Send frame to the cluster */ + /** Send to the cluster */ void mcastFrame(const framing::AMQFrame&, const ConnectionId&); void mcastBuffer(const char*, size_t, const ConnectionId&); - + void mcastEvent(const Event& e); + /** Leave the cluster */ void leave(); diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 66f3cf261b..4dfb7ab400 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -22,13 +22,16 @@ #include "Event.h" #include "Cpg.h" #include "qpid/framing/Buffer.h" +#include <ostream> +#include <iterator> +#include <algorithm> namespace qpid { namespace cluster { using framing::Buffer; -const size_t Event::OVERHEAD = 1 /*type*/ + 8 /*64-bit pointr*/; +const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t); Event::Event(EventType t, const ConnectionId c, const size_t s) : type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {} @@ -43,15 +46,27 @@ Event Event::delivered(const MemberId& m, void* d, size_t s) { return e; } -void Event::mcast(const Cpg::Name& name, Cpg& cpg) { +void Event::mcast (const Cpg::Name& name, Cpg& cpg) const { char header[OVERHEAD]; - Buffer b; + Buffer b(header, OVERHEAD); b.putOctet(type); b.putLongLong(reinterpret_cast<uint64_t>(connection.getConnectionPtr())); - iovec iov[] = { { header, b.getPosition() }, { data.get(), size } }; + iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } }; cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov)); } +Event::operator Buffer() const { + return Buffer(const_cast<char*>(getData()), getSize()); +} +static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; +std::ostream& operator << (std::ostream& o, const Event& e) { + o << "[event: " << e.getConnection() + << " " << EVENT_TYPE_NAMES[e.getType()] + << " " << e.getSize() << " bytes: "; + std::ostream_iterator<char> oi(o,""); + std::copy(e.getData(), e.getData()+std::min(e.getSize(), size_t(16)), oi); + return o << "...]"; +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index 14ae253dc2..d0e12831f4 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -26,6 +26,7 @@ #include "Cpg.h" #include "qpid/RefCountedBuffer.h" #include "qpid/framing/Buffer.h" +#include <iosfwd> namespace qpid { namespace cluster { @@ -46,24 +47,25 @@ struct Event { /** Create an event copied from delivered data. */ static Event delivered(const MemberId& m, void* data, size_t size); - void mcast(const Cpg::Name& name, Cpg& cpg); + void mcast(const Cpg::Name& name, Cpg& cpg) const; EventType getType() const { return type; } ConnectionId getConnection() const { return connection; } size_t getSize() const { return size; } - char* getData() { return data->get(); } - const char* getData() const { return data->get(); } + char* getData() { return data; } + const char* getData() const { return data; } - operator framing::Buffer() const { return framing::Buffer(const_cast<char*>(getData()), getSize()); } + operator framing::Buffer() const; private: static const size_t OVERHEAD; EventType type; ConnectionId connection; size_t size; - RefCountedBuffer::intrusive_ptr data; + RefCountedBuffer::pointer data; }; +std::ostream& operator << (std::ostream&, const Event&); }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_EVENT_H*/ |