diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 31 |
1 files changed, 14 insertions, 17 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 2b12e4f54a..4c0a768c4f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -116,26 +116,22 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) { void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) { QPID_LOG(trace, "MCAST [" << connection << "] " << frame); - // FIXME aconway 2008-09-02: restore queueing. - Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking. - static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management or FrameEncoder. - Buffer buf(buffer, sizeof(buffer)); - buf.putOctet(CONTROL); - encodePtr(buf, connection.getConnectionPtr()); + Event e(CONTROL, connection, frame.size()); + Buffer buf(e); frame.encode(buf); - iovec iov = { buffer, buf.getPosition() }; - cpg.mcast(name, &iov, 1); + mcastEvent(e); +} + +void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) { + QPID_LOG(trace, "MCAST [" << connection << "] " << size << "bytes of data"); + Event e(DATA, connection, size); + memcpy(e.getData(), data, size); + mcastEvent(e); } -void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { - // FIXME aconway 2008-09-02: does this need locking? - Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking. - char hdrbuf[1+sizeof(uint64_t)]; - Buffer buf(hdrbuf, sizeof(hdrbuf)); - buf.putOctet(DATA); - encodePtr(buf, id.getConnectionPtr()); - iovec iov[] = { { hdrbuf, buf.getPosition() }, { const_cast<char*>(data), size } }; - cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov)); +void Cluster::mcastEvent(const Event& e) { + QPID_LOG(trace, "Multicasting: " << e); + e.mcast(name, cpg); } size_t Cluster::size() const { @@ -186,6 +182,7 @@ void Cluster::deliver( } void Cluster::deliverEvent(const Event& e) { + QPID_LOG(trace, "Delivered: " << e); Buffer buf(e); if (e.getConnection().getConnectionPtr() == 0) { // Cluster control AMQFrame frame; |