diff options
author | Alan Conway <aconway@apache.org> | 2008-09-03 03:21:00 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-03 03:21:00 +0000 |
commit | e8f6b7cd234088e7c33e42eb10e29719ea8e8aa9 (patch) | |
tree | 2d7df29ebba337fbbe28aa7716f0e32ff9e24c70 /cpp/src | |
parent | 05b6583dc0d080d6bc5a0cca09218bb045090daf (diff) | |
download | qpid-python-e8f6b7cd234088e7c33e42eb10e29719ea8e8aa9.tar.gz |
Cluster multicasts buffers rather than frames.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@691489 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/qpid/amqp_0_10/Connection.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 215 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 23 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 38 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 23 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/types.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQFrame.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQFrame.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Buffer.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FrameDecoder.cpp | 68 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FrameDecoder.h | 44 | ||||
-rw-r--r-- | cpp/src/tests/.valgrind.supp | 7 | ||||
-rw-r--r-- | cpp/src/tests/ForkedBroker.h | 5 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 7 |
19 files changed, 304 insertions, 181 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 039c35ed4e..3f46874b20 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -209,6 +209,7 @@ libqpidcommon_la_SOURCES = \ qpid/framing/FieldTable.cpp \ qpid/framing/FieldValue.cpp \ qpid/framing/FrameSet.cpp \ + qpid/framing/FrameDecoder.cpp \ qpid/framing/ProtocolInitiation.cpp \ qpid/framing/ProtocolVersion.cpp \ qpid/framing/SendContent.cpp \ @@ -493,7 +494,7 @@ nobase_include_HEADERS = \ qpid/framing/FieldTable.h \ qpid/framing/FieldValue.h \ qpid/framing/FrameDefaultVisitor.h \ - qpid/framing/FrameHandler.h \ + qpid/framing/FrameDecoder.h \ qpid/framing/FrameHandler.h \ qpid/framing/FrameSet.h \ qpid/framing/Handler.h \ diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp index 15a8e9663d..6e0566c3c1 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -59,7 +59,7 @@ size_t Connection::decode(const char* buffer, size_t size) { bool Connection::canEncode() { if (!frameQueueClosed) connection->doOutput(); - Mutex::ScopedLock l(frameQueueLock); + Mutex::ScopedLock l(frameQueueLock); return (!isClient && !initialized) || !frameQueue.empty(); } diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index aea10949e4..f93203acbf 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -4,7 +4,7 @@ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. -n * You may obtain a copy of the License at + * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -20,14 +20,17 @@ n * You may obtain a copy of the License at #include "Connection.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/SessionState.h" +#include "qpid/broker/Connection.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/ClusterJoinedBody.h" +#include "qpid/framing/AMQP_AllOperations.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/framing/ClusterUrlNoticeBody.h" +#include "qpid/framing/ClusterConnectionDeliverCloseBody.h" +#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" -#include "qpid/framing/AMQP_AllOperations.h" -#include "qpid/framing/AllInvoker.h" -#include "qpid/framing/Invoker.h" #include <boost/bind.hpp> #include <boost/cast.hpp> @@ -38,36 +41,17 @@ n * You may obtain a copy of the License at namespace qpid { namespace cluster { - using namespace qpid::framing; using namespace qpid::sys; using namespace std; -// Handle cluster controls from a given member. -struct ClusterOperations : public framing::AMQP_AllOperations::ClusterHandler { +struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { Cluster& cluster; MemberId member; - - ClusterOperations(Cluster& c, const MemberId& m) : cluster(c), member(m) {} - - void joined(const std::string& url) { - cluster.joined(member, url); - } + ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {} + void urlNotice(const std::string& u) { cluster.urlNotice (member, u); } + bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } }; - -ostream& operator <<(ostream& out, const Cluster& cluster) { - return out << cluster.name.str() << "-" << cluster.self; -} - -ostream& operator<<(ostream& out, const Cluster::UrlMap::value_type& m) { - return out << m.first << " at " << m.second; -} - -ostream& operator <<(ostream& out, const Cluster::UrlMap& urls) { - ostream_iterator<Cluster::UrlMap::value_type> o(out, " "); - copy(urls.begin(), urls.end(), o); - return out; -} Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : broker(&b), @@ -80,30 +64,39 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : boost::bind(&Cluster::dispatch, this, _1), // read 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect - ), - deliverQueue(boost::bind(&Cluster::deliverQueueCb, this, _1, _2)), - mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2)) + ) { broker->addFinalizer(boost::bind(&Cluster::leave, this)); - QPID_LOG(trace, "Node " << self << " joining cluster: " << name_); + QPID_LOG(trace, "Joining cluster: " << name << " as " << self); cpg.join(name); - send(AMQFrame(in_place<ClusterJoinedBody>(ProtocolVersion(), url.str())), ConnectionId(self,0)); + mcastFrame(AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())), + ConnectionId(self,0)); // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); - deliverQueue.start(poller); - mcastQueue.start(poller); } Cluster::~Cluster() {} +void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { + Mutex::ScopedLock l(lock); + connections.insert(ConnectionMap::value_type(ConnectionId(self, c.get()), c)); +} + +void Cluster::erase(ConnectionId id) { + Mutex::ScopedLock l(lock); + connections.erase(id); +} + void Cluster::leave() { Mutex::ScopedLock l(lock); if (!broker) return; // Already left. // Leave is called by from Broker destructor after the poller has // been shut down. No dispatches can occur. + + QPID_LOG(debug, "Leaving cluster " << name.str()); cpg.leave(name); - // broker is set to 0 when the final config-change is delivered. + // broker= is set to 0 when the final config-change is delivered. while(broker) { Mutex::ScopedUnlock u(lock); cpg.dispatchAll(); @@ -121,30 +114,30 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) { buf.putLongLong(value); } -void Cluster::send(const AMQFrame& frame, const ConnectionId& id) { - QPID_LOG(trace, "MCAST [" << id << "] " << frame); - mcastQueue.push(Message(frame, id)); -} - -void Cluster::mcastQueueCb(const MessageQueue::iterator& begin, - const MessageQueue::iterator& end) -{ - // Static is OK because there is only one cluster allowed per - // process and only one thread in mcastQueueCb at a time. - static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management. +void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) { + QPID_LOG(trace, "MCAST [" << connection << "] " << frame); + // FIXME aconway 2008-09-02: restore queueing. + Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking. + static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management or FrameEncoder. Buffer buf(buffer, sizeof(buffer)); - for (MessageQueue::iterator i = begin; i != end; ++i) { - AMQFrame& frame =i->first; - ConnectionId id =i->second; - if (buf.available() < frame.size() + sizeof(uint64_t)) - break; - frame.encode(buf); - encodePtr(buf, id.second); - } + buf.putOctet(CONTROL); + encodePtr(buf, connection.getConnectionPtr()); + frame.encode(buf); iovec iov = { buffer, buf.getPosition() }; cpg.mcast(name, &iov, 1); } +void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { + // FIXME aconway 2008-09-02: does this need locking? + Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking. + char hdrbuf[1+sizeof(uint64_t)]; + Buffer buf(hdrbuf, sizeof(hdrbuf)); + buf.putOctet(DATA); + encodePtr(buf, id.getConnectionPtr()); + iovec iov[] = { { hdrbuf, buf.getPosition() }, { const_cast<char*>(data), size } }; + cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov)); +} + size_t Cluster::size() const { Mutex::ScopedLock l(lock); return urls.size(); @@ -153,19 +146,23 @@ size_t Cluster::size() const { std::vector<Url> Cluster::getUrls() const { Mutex::ScopedLock l(lock); std::vector<Url> result(urls.size()); - std::transform(urls.begin(), urls.end(), result.begin(), boost::bind(&UrlMap::value_type::second, _1)); + std::transform(urls.begin(), urls.end(), result.begin(), + boost::bind(&UrlMap::value_type::second, _1)); return result; } boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) { - boost::intrusive_ptr<Connection> c = connections[id]; - if (!c && id.first != self) { // Shadow connection - std::ostringstream os; - os << id; - c = connections[id] = new Connection(*this, shadowOut, os.str(), id); + if (id.getMember() == self) + return boost::intrusive_ptr<Connection>(id.getConnectionPtr()); + ConnectionMap::iterator i = connections.find(id); + if (i == connections.end()) { // New shadow connection. + assert(id.getMember() != self); + std::ostringstream mgmtId; + mgmtId << name << ":" << id; + ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id)); + i = connections.insert(value).first; } - assert(c); - return c; + return i->second; } void Cluster::deliver( @@ -176,17 +173,28 @@ void Cluster::deliver( void* msg, int msg_len) { - MemberId from(nodeid, pid); try { + MemberId from(nodeid, pid); Buffer buf(static_cast<char*>(msg), msg_len); - while (buf.available() > 0) { + Connection* connection; + uint8_t type = buf.getOctet(); + decodePtr(buf, connection); + if (connection == 0) { // Cluster controls AMQFrame frame; - if (!frame.decode(buf)) // Not enough data. - throw Exception("Received incomplete cluster event."); - Connection* cp; - decodePtr(buf, cp); - QPID_LOG(critical, "deliverQ.push " << frame); - deliverQueue.push(Message(frame, ConnectionId(from, cp))); + while (frame.decode(buf)) + if (!ClusterOperations(*this, from).invoke(frame)) + throw Exception("Invalid cluster control"); + } + else { // Connection data or control + boost::intrusive_ptr<Connection> c = + getConnection(ConnectionId(from, connection)); + if (type == DATA) + c->deliverBuffer(buf); + else { + AMQFrame frame; + while (frame.decode(buf)) + c->deliver(frame); + } } } catch (const std::exception& e) { @@ -197,59 +205,24 @@ void Cluster::deliver( } } -void Cluster::deliverQueueCb(const MessageQueue::iterator& begin, - const MessageQueue::iterator& end) -{ - for (MessageQueue::iterator i = begin; i != end; ++i) { - AMQFrame& frame(i->first); - ConnectionId connectionId(i->second); - try { - QPID_LOG(trace, "DLVR [" << connectionId << "]: " << frame); - if (!broker) { - QPID_LOG(error, "Unexpected DLVR after leaving the cluster."); - return; - } - if (connectionId.getConnectionPtr()) // Connection control - getConnection(connectionId)->deliver(frame); - else { // Cluster control - ClusterOperations cops(*this, connectionId.getMember()); - bool invoked = framing::invoke(cops, *frame.getBody()).wasHandled(); - assert(invoked); - } - } - catch (const std::exception& e) { - // FIXME aconway 2008-01-30: exception handling. - QPID_LOG(critical, "Error in cluster deliverQueueCb: " << e.what()); - assert(0); - throw; - } - } -} - -void Cluster::joined(const MemberId& member, const string& url) { - Mutex::ScopedLock l(lock); - QPID_LOG(debug, member << " has URL " << url); - urls[member] = url; - lock.notifyAll(); -} - void Cluster::configChange( cpg_handle_t /*handle*/, cpg_name */*group*/, - cpg_address */*current*/, int /*nCurrent*/, + cpg_address *current, int nCurrent, cpg_address *left, int nLeft, - cpg_address *joined, int nJoined) + cpg_address */*joined*/, int /*nJoined*/) { - QPID_LOG(debug, "Cluster change: " << std::make_pair(joined, nJoined) << std::make_pair(left, nLeft)); + QPID_LOG(debug, "Cluster change: " + << std::make_pair(current, nCurrent) + << std::make_pair(left, nLeft)); + Mutex::ScopedLock l(lock); - // We add URLs to the map in joined() we don't keep track of pre-URL members yet. - for (int l = 0; l < nLeft; ++l) urls.erase(left[l]); + for (int i = 0; i < nLeft; ++i) urls.erase(left[i]); + // Add new members when their URL notice arraives. - if (std::find(left, left+nLeft, self) != left+nLeft) { + if (std::find(left, left+nLeft, self) != left+nLeft) broker = 0; // We have left the group, this is the final config change. - QPID_LOG(debug, "Leaving cluster " << *this); - } - lock.notifyAll(); // Threads waiting for url changes. + lock.notifyAll(); // Threads waiting for membership changes. } void Cluster::dispatch(sys::DispatchHandle& h) { @@ -263,14 +236,8 @@ void Cluster::disconnect(sys::DispatchHandle& h) { broker->shutdown(); } -void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { - Mutex::ScopedLock l(lock); - connections[c->getId()] = c; -} - -void Cluster::erase(ConnectionId id) { - Mutex::ScopedLock l(lock); - connections.erase(id); +void Cluster::urlNotice(const MemberId& m, const std::string& url) { + urls.insert(UrlMap::value_type(m,Url(url))); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 45bb3ed3c4..4963400e10 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -68,17 +68,18 @@ class Cluster : public RefCounted, private Cpg::Handler bool empty() const { return size() == 0; } /** Send frame to the cluster */ - void send(const framing::AMQFrame&, const ConnectionId&); + void mcastFrame(const framing::AMQFrame&, const ConnectionId&); + void mcastBuffer(const char*, size_t, const ConnectionId&); /** Leave the cluster */ void leave(); - void joined(const MemberId&, const std::string& url); + void urlNotice(const MemberId&, const std::string& url); broker::Broker& getBroker() { assert(broker); return *broker; } MemberId getSelf() const { return self; } - + private: typedef std::map<MemberId, Url> UrlMap; typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; @@ -88,6 +89,11 @@ class Cluster : public RefCounted, private Cpg::Handler typedef PollableQueue<Message> MessageQueue; boost::function<void()> shutdownNext; + + /** Handle a delivered frame */ + void deliverFrame(framing::AMQFrame&, const ConnectionId&); + + void deliverBuffer(const char*, size_t, const ConnectionId&); /** CPG deliver callback. */ void deliver( @@ -107,15 +113,6 @@ class Cluster : public RefCounted, private Cpg::Handler struct cpg_address */*joined*/, int /*nJoined*/ ); - /** Callback to handle delivered frames from the deliverQueue. */ - void deliverQueueCb(const MessageQueue::iterator& begin, - const MessageQueue::iterator& end); - - /** Callback to multi-cast frames from mcastQueue */ - void mcastQueueCb(const MessageQueue::iterator& begin, - const MessageQueue::iterator& end); - - /** Callback to dispatch CPG events. */ void dispatch(sys::DispatchHandle&); /** Callback if CPG fd is disconnected. */ @@ -136,8 +133,6 @@ class Cluster : public RefCounted, private Cpg::Handler ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - MessageQueue deliverQueue; - MessageQueue mcastQueue; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&); diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 8faad9d6d5..506e982ffd 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -46,16 +46,23 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, Connection::~Connection() {} -// Forward all received frames to the cluster, continue handling on delivery. -void Connection::received(framing::AMQFrame& f) { - cluster.send(f, self); +void Connection::received(framing::AMQFrame& ) { + // FIXME aconway 2008-09-02: not called, codec sends straight to deliver + assert(0); } -// Don't doOutput in the -bool Connection::doOutput() { return output.doOutput(); } +bool Connection::doOutput() { return output.doOutput(); } + +// Delivery of doOutput allows us to run the real connection doOutput() +// which stocks up the write buffers with data. +// +void Connection::deliverDoOutput(uint32_t requested) { + output.deliverDoOutput(requested); +} // Handle frames delivered from cluster. void Connection::deliver(framing::AMQFrame& f) { + QPID_LOG(trace, "DLVR [" << self << "]: " << f); // Handle connection controls, deliver other frames to connection. if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); @@ -71,7 +78,8 @@ void Connection::closed() { // handler will be deleted. // connection.setOutputHandler(&discardHandler); - cluster.send(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self); + cluster.mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self); + ++mcastSeq; } catch (const std::exception& e) { QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); @@ -83,11 +91,19 @@ void Connection::deliverClose () { cluster.erase(self); } -// Delivery of doOutput allows us to run the real connection doOutput() -// which stocks up the write buffers with data. -// -void Connection::deliverDoOutput(uint32_t requested) { - output.deliverDoOutput(requested); +size_t Connection::decode(const char* buffer, size_t size) { + QPID_LOG(trace, "mcastBuffer " << self << " " << mcastSeq << " " << size); + ++mcastSeq; + cluster.mcastBuffer(buffer, size, self); + // FIXME aconway 2008-09-01: deserialize? + return size; +} + +void Connection::deliverBuffer(Buffer& buf) { + QPID_LOG(trace, "deliverBuffer " << self << " " << deliverSeq << " " << buf.available()); + ++deliverSeq; + while (decoder.decode(buf)) + deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread. } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 6eac1453ac..e6372e80ea 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -31,6 +31,8 @@ #include "qpid/amqp_0_10/Connection.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/framing/FrameDecoder.h" +#include "qpid/framing/SequenceNumber.h" namespace qpid { @@ -56,16 +58,16 @@ class Connection : ~Connection(); ConnectionId getId() const { return self; } + broker::Connection& getBrokerConnection() { return connection; } bool isLocal() const { return self.second == this; } - // self-delivery of intercepted extension points. + Cluster& getCluster() { return cluster; } + + // self-delivery of multicast data. void deliver(framing::AMQFrame& f); void deliverClose(); void deliverDoOutput(uint32_t requested); - - void codecDeleted(); - - Cluster& getCluster() { return cluster; } + void deliverBuffer(framing::Buffer&); // ConnectionOutputHandler methods void close() {} @@ -78,13 +80,15 @@ class Connection : void closed(); bool doOutput(); bool hasOutput() { return connection.hasOutput(); } - void idleOut() { idleOut(); } - void idleIn() { idleIn(); } + void idleOut() { connection.idleOut(); } + void idleIn() { connection.idleIn(); } + + // ConnectionCodec methods + size_t decode(const char* buffer, size_t size); // ConnectionInputHandlerFactory sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient); - broker::Connection& getBrokerConnection() { return connection; } private: void sendDoOutput(); @@ -93,7 +97,10 @@ class Connection : NoOpConnectionOutputHandler discardHandler; WriteEstimate writeEstimate; OutputInterceptor output; + framing::FrameDecoder decoder; broker::Connection connection; + framing::SequenceNumber mcastSeq; + framing::SequenceNumber deliverSeq; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index cb396cd10c..f093a0cc1c 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -22,6 +22,7 @@ #include "Connection.h" #include "ProxyInputHandler.h" #include "qpid/broker/Connection.h" +#include "qpid/log/Statement.h" #include "qpid/memory.h" namespace qpid { @@ -54,7 +55,11 @@ ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, ConnectionCodec::~ConnectionCodec() {} // ConnectionCodec functions delegate to the codecOutput -size_t ConnectionCodec::decode(const char* buffer, size_t size) { return codec.decode(buffer, size); } +size_t ConnectionCodec::decode(const char* buffer, size_t size) { + return interceptor->decode(buffer, size); +} + +// FIXME aconway 2008-09-02: delegate to interceptor? size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); } bool ConnectionCodec::canEncode() { return codec.canEncode(); } void ConnectionCodec::closed() { codec.closed(); } diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index cbc3dcdfe6..59ce20d821 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -58,7 +58,7 @@ class ConnectionCodec : public sys::ConnectionCodec { ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c); ~ConnectionCodec(); - // ConnectionCodec functions delegate to the codecOutput + // ConnectionCodec functions. size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); bool canEncode(); diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index ce678015a2..754b4abd58 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -183,10 +183,6 @@ ostream& operator<<(ostream& o, const ConnectionId& c) { return o << c.first << "-" << c.second; } -ostream& operator<<(ostream& o, const cpg_name& name) { - return o << string(name.value, name.length); -} - }} // namespace qpid::cluster @@ -195,16 +191,18 @@ ostream& operator<<(ostream& o, const cpg_name& name) { std::ostream& operator<<(std::ostream& o, const ::cpg_address& a) { const char* reasonString; switch (a.reason) { - case CPG_REASON_JOIN: reasonString = "joined"; break; - case CPG_REASON_LEAVE: reasonString = "left";break; - case CPG_REASON_NODEDOWN: reasonString = "node-down";break; - case CPG_REASON_NODEUP: reasonString = "node-up";break; - case CPG_REASON_PROCDOWN: reasonString = "process-down";break; - default: - assert(0); - reasonString = ""; + case CPG_REASON_JOIN: reasonString = " joined"; break; + case CPG_REASON_LEAVE: reasonString = " left";break; + case CPG_REASON_NODEDOWN: reasonString = " node-down";break; + case CPG_REASON_NODEUP: reasonString = " node-up";break; + case CPG_REASON_PROCDOWN: reasonString = " process-down";break; + default: reasonString = ""; } - return o << qpid::cluster::MemberId(a.nodeid, a.pid) << " " << reasonString; + return o << qpid::cluster::MemberId(a.nodeid, a.pid) << reasonString; +} + +std::ostream& operator<<(std::ostream& o, const cpg_name& name) { + return o << std::string(name.value, name.length); } namespace std { diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index 6c77d2747a..ae021a9c4a 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -97,7 +97,7 @@ void OutputInterceptor::sendDoOutput() { // Send it anyway to keep the doOutput chain going until we are sure there's no more output // (in deliverDoOutput) // - parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>( + parent.getCluster().mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>( framing::ProtocolVersion(), request)), parent.getId()); QPID_LOG(trace, &parent << "Send doOutput request for " << request); } diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index 4646cd9174..8911896e1f 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -34,6 +34,9 @@ namespace cluster { class Connection; +/** Types of cluster messages. */ +enum EventType { DATA, CONTROL }; + /** first=node-id, second=pid */ struct MemberId : std::pair<uint32_t, uint32_t> { MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {} @@ -51,6 +54,7 @@ struct ConnectionId : public std::pair<MemberId, Connection*> { MemberId getMember() const { return first; } Connection* getConnectionPtr() const { return second; } }; + std::ostream& operator<<(std::ostream&, const ConnectionId&); }} // namespace qpid::cluster diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index c1fc647b52..662b3574ff 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -45,6 +45,13 @@ uint32_t AMQFrame::frameOverhead() { return 12 /*frame header*/; } +uint16_t AMQFrame::DECODE_SIZE_MIN=4; + +uint16_t AMQFrame::decodeSize(char* data) { + Buffer buf(data+2, DECODE_SIZE_MIN); + return buf.getShort(); +} + void AMQFrame::encode(Buffer& buffer) const { //set track first (controls on track 0, everything else on 1): diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h index 5a8e55f9d2..331a57e08d 100644 --- a/cpp/src/qpid/framing/AMQFrame.h +++ b/cpp/src/qpid/framing/AMQFrame.h @@ -104,7 +104,10 @@ class AMQFrame : public AMQDataBlock bool getEos() const { return eos; } void setEos(bool isEos) { eos = isEos; } + static uint16_t DECODE_SIZE_MIN; static uint32_t frameOverhead(); + /** Must point to at least DECODE_SIZE_MIN bytes of data */ + static uint16_t decodeSize(char* data); private: void init() { bof = eof = bos = eos = true; subchannel=0; channel=0; } diff --git a/cpp/src/qpid/framing/Buffer.h b/cpp/src/qpid/framing/Buffer.h index a27b15cac0..4f482dc206 100644 --- a/cpp/src/qpid/framing/Buffer.h +++ b/cpp/src/qpid/framing/Buffer.h @@ -74,6 +74,7 @@ class Buffer uint32_t getSize() { return size; } uint32_t getPosition() { return position; } Iterator getIterator() { return Iterator(*this); } + char* getPointer() { return data; } void putOctet(uint8_t i); void putShort(uint16_t i); diff --git a/cpp/src/qpid/framing/FrameDecoder.cpp b/cpp/src/qpid/framing/FrameDecoder.cpp new file mode 100644 index 0000000000..07e2f7513d --- /dev/null +++ b/cpp/src/qpid/framing/FrameDecoder.cpp @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "FrameDecoder.h" +#include "Buffer.h" +#include "qpid/log/Statement.h" +#include <algorithm> + +namespace qpid { +namespace framing { + +namespace { +/** Move up to n bytes from start of buf to end of bytes. */ +void move(std::vector<char>& bytes, Buffer& buffer, size_t n) { + size_t oldSize = bytes.size(); + n = std::min(n, size_t(buffer.available())); + bytes.resize(oldSize+n); + char* p = &bytes[oldSize]; + buffer.getRawData(reinterpret_cast<uint8_t*>(p), n); +} +} + +bool FrameDecoder::decode(Buffer& buffer) { + if (buffer.available() == 0) return false; + if (fragment.empty()) { + if (frame.decode(buffer)) // Decode from buffer + return true; + else // Store fragment + move(fragment, buffer, buffer.available()); + } + else { // Already have a fragment + // Get enough data to decode the frame size. + if (fragment.size() < AMQFrame::DECODE_SIZE_MIN) { + move(fragment, buffer, AMQFrame::DECODE_SIZE_MIN - fragment.size()); + } + if (fragment.size() >= AMQFrame::DECODE_SIZE_MIN) { + uint16_t size = AMQFrame::decodeSize(&fragment[0]); + assert(size > fragment.size()); + move(fragment, buffer, size-fragment.size()); + Buffer b(&fragment[0], fragment.size()); + if (frame.decode(b)) { + assert(b.available() == 0); + fragment.clear(); + return true; + } + } + } + return false; +} + +}} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/FrameDecoder.h b/cpp/src/qpid/framing/FrameDecoder.h new file mode 100644 index 0000000000..7f974dadc3 --- /dev/null +++ b/cpp/src/qpid/framing/FrameDecoder.h @@ -0,0 +1,44 @@ +#ifndef QPID_FRAMING_FRAMEDECODER_H +#define QPID_FRAMING_FRAMEDECODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AMQFrame.h" + +namespace qpid { +namespace framing { + +/** + * Decode a frame from buffer. If buffer does not contain a complete + * frame, caches the fragment for the next call to decode. + */ +class FrameDecoder +{ + public: + bool decode(Buffer& buffer); + AMQFrame frame; + private: + std::vector<char> fragment; +}; +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_FRAMEDECODER_H*/ diff --git a/cpp/src/tests/.valgrind.supp b/cpp/src/tests/.valgrind.supp index dd8f7536a1..3b1fd2198a 100644 --- a/cpp/src/tests/.valgrind.supp +++ b/cpp/src/tests/.valgrind.supp @@ -193,9 +193,10 @@ } { - CPG related errors - seem benign but should invesgitate. + CPG error - seems benign. Memcheck:Param socketcall.sendmsg(msg.msg_iov[i]) - fun:sendmsg - obj:/usr/lib/openais/libcpg.so.2.0.0 + obj:* + obj:*/libcpg.so.2.0.0 } + diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h index 07e69a0735..cc8714bf23 100644 --- a/cpp/src/tests/ForkedBroker.h +++ b/cpp/src/tests/ForkedBroker.h @@ -85,7 +85,10 @@ class ForkedBroker { ::close(pipeFds[1]); FILE* f = ::fdopen(pipeFds[0], "r"); if (!f) throw ErrnoException("fopen failed"); - if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("ill-formatted port"); + if (::fscanf(f, "%d", &port) != 1) { + if (ferror(f)) throw ErrnoException("Error reading port number from child."); + else throw qpid::Exception("EOF reading port number from child."); + } } else { // child ::close(pipeFds[0]); diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 3f09143fff..9abc1b189e 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -27,6 +27,7 @@ #include "qpid/client/Connection.h" #include "qpid/client/Session.h" #include "qpid/framing/Uuid.h" +#include "qpid/log/Logger.h" #include <boost/bind.hpp> #include <boost/ptr_container/ptr_vector.hpp> @@ -87,7 +88,7 @@ ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) { void ClusterFixture::add() { std::ostringstream os; - os << "broker" << size(); + os << "fork" << size(); std::string prefix = os.str(); const char* argv[] = { @@ -105,6 +106,7 @@ void ClusterFixture::add() { } else { // First broker, run in this process. Broker::Options opts; + qpid::log::Logger::instance().setPrefix("main"); Plugin::addOptions(opts); // Pick up cluster options. opts.parse(argc, argv, "", true); // Allow-unknown for --load-module broker0.reset(new BrokerFixture(opts)); @@ -144,7 +146,8 @@ QPID_AUTO_TEST_CASE(testSingletonCluster) { ClusterFixture cluster(1); Client c(cluster[0]); BOOST_CHECK(c.session.queueQuery("q").getQueue().empty()); - BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty()); + BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty()); + // FIXME aconway 2008-09-01: leaks if aisexec not running, investigate. } QPID_AUTO_TEST_CASE(testWiringReplication) { |