diff options
author | Alan Conway <aconway@apache.org> | 2008-09-06 14:10:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-06 14:10:08 +0000 |
commit | 028745dbc3c47bd6561310678f82f15bd45678d9 (patch) | |
tree | 036ac009b7ccbcafcf4b6c2aa375bb19237f5a0e /cpp/src/qpid/cluster/Cluster.cpp | |
parent | 3bb7782cb0904f0abf61b7fb28da7bce905ceb08 (diff) | |
download | qpid-python-028745dbc3c47bd6561310678f82f15bd45678d9.tar.gz |
RefCountedBuffer improvements, centralize cluster encoding/decoding in Event.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@692654 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 31 |
1 files changed, 14 insertions, 17 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 2b12e4f54a..4c0a768c4f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -116,26 +116,22 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) { void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) { QPID_LOG(trace, "MCAST [" << connection << "] " << frame); - // FIXME aconway 2008-09-02: restore queueing. - Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking. - static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management or FrameEncoder. - Buffer buf(buffer, sizeof(buffer)); - buf.putOctet(CONTROL); - encodePtr(buf, connection.getConnectionPtr()); + Event e(CONTROL, connection, frame.size()); + Buffer buf(e); frame.encode(buf); - iovec iov = { buffer, buf.getPosition() }; - cpg.mcast(name, &iov, 1); + mcastEvent(e); +} + +void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) { + QPID_LOG(trace, "MCAST [" << connection << "] " << size << "bytes of data"); + Event e(DATA, connection, size); + memcpy(e.getData(), data, size); + mcastEvent(e); } -void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { - // FIXME aconway 2008-09-02: does this need locking? - Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking. - char hdrbuf[1+sizeof(uint64_t)]; - Buffer buf(hdrbuf, sizeof(hdrbuf)); - buf.putOctet(DATA); - encodePtr(buf, id.getConnectionPtr()); - iovec iov[] = { { hdrbuf, buf.getPosition() }, { const_cast<char*>(data), size } }; - cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov)); +void Cluster::mcastEvent(const Event& e) { + QPID_LOG(trace, "Multicasting: " << e); + e.mcast(name, cpg); } size_t Cluster::size() const { @@ -186,6 +182,7 @@ void Cluster::deliver( } void Cluster::deliverEvent(const Event& e) { + QPID_LOG(trace, "Delivered: " << e); Buffer buf(e); if (e.getConnection().getConnectionPtr() == 0) { // Cluster control AMQFrame frame; |