summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp31
1 files changed, 14 insertions, 17 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 2b12e4f54a..4c0a768c4f 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -116,26 +116,22 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) {
void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) {
QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- // FIXME aconway 2008-09-02: restore queueing.
- Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking.
- static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management or FrameEncoder.
- Buffer buf(buffer, sizeof(buffer));
- buf.putOctet(CONTROL);
- encodePtr(buf, connection.getConnectionPtr());
+ Event e(CONTROL, connection, frame.size());
+ Buffer buf(e);
frame.encode(buf);
- iovec iov = { buffer, buf.getPosition() };
- cpg.mcast(name, &iov, 1);
+ mcastEvent(e);
+}
+
+void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) {
+ QPID_LOG(trace, "MCAST [" << connection << "] " << size << "bytes of data");
+ Event e(DATA, connection, size);
+ memcpy(e.getData(), data, size);
+ mcastEvent(e);
}
-void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
- // FIXME aconway 2008-09-02: does this need locking?
- Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking.
- char hdrbuf[1+sizeof(uint64_t)];
- Buffer buf(hdrbuf, sizeof(hdrbuf));
- buf.putOctet(DATA);
- encodePtr(buf, id.getConnectionPtr());
- iovec iov[] = { { hdrbuf, buf.getPosition() }, { const_cast<char*>(data), size } };
- cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
+void Cluster::mcastEvent(const Event& e) {
+ QPID_LOG(trace, "Multicasting: " << e);
+ e.mcast(name, cpg);
}
size_t Cluster::size() const {
@@ -186,6 +182,7 @@ void Cluster::deliver(
}
void Cluster::deliverEvent(const Event& e) {
+ QPID_LOG(trace, "Delivered: " << e);
Buffer buf(e);
if (e.getConnection().getConnectionPtr() == 0) { // Cluster control
AMQFrame frame;