diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/cluster.mk | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 54 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 13 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionFrame.cpp | 51 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionFrame.h | 71 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQFrame.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQFrame.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Buffer.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Uuid.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Uuid.h | 26 | ||||
-rw-r--r-- | cpp/src/tests/Cluster.cpp | 22 | ||||
-rw-r--r-- | cpp/src/tests/Cluster.h | 21 | ||||
-rw-r--r-- | cpp/src/tests/Cluster_child.cpp | 13 | ||||
-rw-r--r-- | cpp/src/tests/Uuid.cpp | 6 |
14 files changed, 217 insertions, 85 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 173592dc38..bfcc4fd850 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -13,7 +13,9 @@ libqpidcluster_la_SOURCES = \ qpid/cluster/Dispatchable.h \ qpid/cluster/ClusterPluginProvider.cpp \ qpid/cluster/ClassifierHandler.h \ - qpid/cluster/ClassifierHandler.cpp + qpid/cluster/ClassifierHandler.cpp \ + qpid/cluster/SessionFrame.h \ + qpid/cluster/SessionFrame.cpp libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e691ad357d..f2d1b75f3f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -19,6 +19,7 @@ #include "Cluster.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" +#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <algorithm> @@ -45,24 +46,11 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -namespace { - -/** We mark the high bit of a frame's channel number to know if it's - * an incoming or outgoing frame when frames arrive via multicast. - */ -bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; } -bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); } -void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; } -void markIncoming(AMQFrame&) { /*noop*/ } -void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; } - -} - struct Cluster::IncomingHandler : public FrameHandler { IncomingHandler(Cluster& c) : cluster(c) {} void handle(AMQFrame& frame) { - markIncoming(frame); - cluster.mcast(frame); + SessionFrame sf(Uuid(true), frame, SessionFrame::IN); + cluster.mcast(sf); } Cluster& cluster; }; @@ -70,18 +58,18 @@ struct Cluster::IncomingHandler : public FrameHandler { struct Cluster::OutgoingHandler : public FrameHandler { OutgoingHandler(Cluster& c) : cluster(c) {} void handle(AMQFrame& frame) { - markOutgoing(frame); - cluster.mcast(frame); + SessionFrame sf(Uuid(true), frame, SessionFrame::OUT); + cluster.mcast(sf); } Cluster& cluster; }; - // TODO aconway 2007-06-28: Right now everything is backed up via // multicast. When we have point-to-point backups the // Incoming/Outgoing handlers must determine where each frame should // be sent: to multicast or only to specific backup(s) via AMQP. + Cluster::Cluster(const std::string& name_, const std::string& url_) : cpg(new Cpg(*this)), name(name_), @@ -114,7 +102,7 @@ Cluster::~Cluster() { } } -void Cluster::mcast(AMQFrame& frame) { +void Cluster::mcast(SessionFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -124,11 +112,9 @@ void Cluster::mcast(AMQFrame& frame) { } void Cluster::notify() { - // TODO aconway 2007-06-25: Use proxy here. - ProtocolVersion version; - AMQFrame frame(version, 0, - make_shared_ptr(new ClusterNotifyBody(version, url))); - mcast(frame); + SessionFrame sf; + sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url))); + mcast(sf); } size_t Cluster::size() const { @@ -136,12 +122,13 @@ size_t Cluster::size() const { return members.size(); } -void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) { +void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) { Mutex::ScopedLock l(lock); - fromChains = chains; + receivedChain = chain; } Cluster::MemberList Cluster::getMembers() const { + // TODO aconway 2007-07-04: use read/write lock? Mutex::ScopedLock l(lock); MemberList result(members.size()); std::transform(members.begin(), members.end(), result.begin(), @@ -159,15 +146,13 @@ void Cluster::deliver( { Id from(nodeid, pid); Buffer buf(static_cast<char*>(msg), msg_len); - AMQFrame frame; + SessionFrame frame; frame.decode(buf); QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); - if (!handleClusterFrame(from, frame)) { - FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : fromChains.out; - unMark(frame); - if (chain) - chain->handle(frame); - } + if (frame.uuid.isNull()) + handleClusterFrame(from, frame.frame); + else + receivedChain->handle(frame); } bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, @@ -179,7 +164,8 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, ; return (predicate(*this)); } - + +// Handle cluster control frame from the null session. bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) { // TODO aconway 2007-06-20: use visitor pattern here. ClusterNotifyBody* notifyIn= diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 199a93a7c5..6ab4cb58df 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -20,6 +20,7 @@ */ #include "qpid/cluster/Cpg.h" +#include "qpid/cluster/SessionFrame.h" #include "qpid/framing/FrameHandler.h" #include "qpid/shared_ptr.h" #include "qpid/sys/Monitor.h" @@ -69,13 +70,13 @@ class Cluster : private sys::Runnable, private Cpg::Handler bool empty() const { return size() == 0; } - /** Get handler chains to send frames to the cluster */ - framing::FrameHandler::Chains getToChains() { + /** Get handler chains to send incoming/outgoing frames to the cluster */ + framing::FrameHandler::Chains getSendChains() { return toChains; } - /** Set handler chains for frames received from the cluster */ - void setFromChains(const framing::FrameHandler::Chains& chains); + /** Set handler for frames received from the cluster */ + void setReceivedChain(const SessionFrameHandler::Chain& chain); /** Wait for predicate(*this) to be true, up to timeout. *@return True if predicate became true, false if timed out. @@ -91,7 +92,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler typedef std::map< framing::ChannelId, framing::FrameHandler::Chains> ChannelMap; - void mcast(framing::AMQFrame&); ///< send frame by multicast. + void mcast(SessionFrame&); ///< send frame by multicast. void notify(); ///< Notify cluster of my details. void deliver( @@ -123,7 +124,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler sys::Thread dispatcher; boost::function<void()> callback; framing::FrameHandler::Chains toChains; - framing::FrameHandler::Chains fromChains; + SessionFrameHandler::Chain receivedChain; struct IncomingHandler; struct OutgoingHandler; diff --git a/cpp/src/qpid/cluster/SessionFrame.cpp b/cpp/src/qpid/cluster/SessionFrame.cpp new file mode 100644 index 0000000000..1a20a5eddc --- /dev/null +++ b/cpp/src/qpid/cluster/SessionFrame.cpp @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SessionFrame.h" + +#include "qpid/QpidError.h" + +namespace qpid { +namespace cluster { + +void SessionFrame::encode(framing::Buffer& buf) { + uuid.encode(buf); + frame.encode(buf); + buf.putOctet(isIncoming); +} + +void SessionFrame::decode(framing::Buffer& buf) { + uuid.decode(buf); + if (!frame.decode(buf)) + THROW_QPID_ERROR(FRAMING_ERROR, "Incomplete frame"); + isIncoming = buf.getOctet(); +} + +size_t SessionFrame::size() const { + return uuid.size() + frame.size() + 1 /*isIncoming*/; +} + +std::ostream& operator<<(std::ostream& out, const SessionFrame& frame) { + return out << "[session=" << frame.uuid + << (frame.isIncoming ? ",in: ":",out: ") + << frame.frame << "]"; +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionFrame.h b/cpp/src/qpid/cluster/SessionFrame.h new file mode 100644 index 0000000000..12885da7e1 --- /dev/null +++ b/cpp/src/qpid/cluster/SessionFrame.h @@ -0,0 +1,71 @@ +#ifndef QPID_CLUSTER_SESSIONFRAME_H +#define QPID_CLUSTER_SESSIONFRAME_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Handler.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/Uuid.h" + +#include <ostream> + +namespace qpid { + +namespace framing { +class AMQFrame; +class Buffer; +} + +namespace cluster { + +/** + * An AMQFrame with a UUID and direction. + */ +struct SessionFrame +{ + SessionFrame() : isIncoming(false) {} + + SessionFrame(const framing::Uuid& id, const framing::AMQFrame& f, bool incoming) + : uuid(id), frame(f), isIncoming(incoming) {} + + void encode(framing::Buffer&); + + void decode(framing::Buffer&); + + size_t size() const; + + static const bool IN = true; + static const bool OUT = false; + + framing::Uuid uuid; + framing::AMQFrame frame; + bool isIncoming; +}; + +typedef framing::Handler<SessionFrame&> SessionFrameHandler; + +std::ostream& operator<<(std::ostream&, const SessionFrame&); + +}} // namespace qpid::cluster + + + +#endif /*!QPID_CLUSTER_SESSIONFRAME_H*/ diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index a528913fd9..13f1d3cece 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -47,10 +47,6 @@ AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, const AMQBody::s AMQFrame::~AMQFrame() {} -AMQBody::shared_ptr AMQFrame::getBody(){ - return body; -} - void AMQFrame::encode(Buffer& buffer) { buffer.putOctet(body->type()); diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h index 1a7b203ad7..16c1427802 100644 --- a/cpp/src/qpid/framing/AMQFrame.h +++ b/cpp/src/qpid/framing/AMQFrame.h @@ -32,7 +32,8 @@ #include "AMQHeartbeatBody.h" #include "qpid/framing/AMQP_MethodVersionMap.h" #include "qpid/framing/AMQP_HighestVersion.h" -#include "Buffer.h" +#include "qpid/framing/Buffer.h" +#include "qpid/shared_ptr.h" namespace qpid { namespace framing { @@ -49,7 +50,9 @@ class AMQFrame : public AMQDataBlock virtual bool decode(Buffer& buffer); virtual uint32_t size() const; uint16_t getChannel() const { return channel; } - AMQBody::shared_ptr getBody(); + + shared_ptr<AMQBody> getBody() { return body; } + void setBody(const shared_ptr<AMQBody>& b) { body = b; } /** Convenience template to cast the body to an expected type */ template <class T> boost::shared_ptr<T> castBody() { diff --git a/cpp/src/qpid/framing/Buffer.h b/cpp/src/qpid/framing/Buffer.h index d35935ad19..04acb65e91 100644 --- a/cpp/src/qpid/framing/Buffer.h +++ b/cpp/src/qpid/framing/Buffer.h @@ -81,6 +81,8 @@ public: void putRawData(const uint8_t* data, size_t size); void getRawData(uint8_t* data, size_t size); + template <class T> void put(const T& data) { data.encode(*this); } + template <class T> void get(T& data) { data.decode(*this); } }; }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp index b1523f0d61..3a83430d56 100644 --- a/cpp/src/qpid/framing/Uuid.cpp +++ b/cpp/src/qpid/framing/Uuid.cpp @@ -21,20 +21,14 @@ #include "qpid/QpidError.h" #include "qpid/framing/Buffer.h" -#include <uuid/uuid.h> - namespace qpid { namespace framing { using namespace std; -Uuid::Uuid() { uuid_generate(c_array()); } - -Uuid::Uuid(uint8_t* uu) { uuid_copy(c_array(),uu); } - static const size_t UNPARSED_SIZE=36; -void Uuid::encode(Buffer& buf) { +void Uuid::encode(Buffer& buf) const { buf.putRawData(data(), size()); } diff --git a/cpp/src/qpid/framing/Uuid.h b/cpp/src/qpid/framing/Uuid.h index a2f415b118..19ae79db6a 100644 --- a/cpp/src/qpid/framing/Uuid.h +++ b/cpp/src/qpid/framing/Uuid.h @@ -20,9 +20,12 @@ */ #include <boost/array.hpp> + #include <ostream> #include <istream> +#include <uuid/uuid.h> + namespace qpid { namespace framing { @@ -35,16 +38,29 @@ class Buffer; * boost::array so Uuid can be the key type in a map etc. */ struct Uuid : public boost::array<uint8_t, 16> { - /** Geneate universally unique identifier */ - Uuid(); + /** If unique is true, generate a unique ID else a null ID. */ + Uuid(bool unique=false) { if (unique) generate(); else clear(); } + + /** Copy from 16 bytes of data */ + Uuid(const uint8_t* data) { assign(data); } + + /** Copy from 16 bytes of data */ + void assign(const uint8_t* data) { uuid_copy(c_array(), data); } + + /** Set to a new unique identifier */ + void generate() { uuid_generate(c_array()); } - /** Initialize from 16 bytes of data */ - Uuid(uint8_t* data); + /** Set to all zeros */ + void clear() { uuid_clear(c_array()); } + + /** Test for null (all zeros) */ + bool isNull() const { return uuid_is_null(data()); } // Default op= and copy ctor are fine. // boost::array gives us ==, < etc. - void encode(framing::Buffer& buf); + void encode(framing::Buffer& buf) const; + void decode(framing::Buffer& buf); }; diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp index ccc1b52249..2e6d9ecfff 100644 --- a/cpp/src/tests/Cluster.cpp +++ b/cpp/src/tests/Cluster.cpp @@ -36,10 +36,13 @@ using namespace qpid::log; BOOST_AUTO_TEST_CASE(testClusterOne) { TestCluster cluster("clusterOne", "amqp:one:1"); AMQFrame frame(VER, 1, new ChannelPingBody(VER)); - cluster.getToChains().in->handle(frame); - BOOST_REQUIRE(cluster.in.waitFor(1)); + cluster.getSendChains().in->handle(frame); + BOOST_REQUIRE(cluster.received.waitFor(1)); - BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody()); + SessionFrame& sf=cluster.received[0]; + BOOST_CHECK(sf.isIncoming); + BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody()); + BOOST_CHECK_EQUAL(1u, cluster.size()); Cluster::MemberList members = cluster.getMembers(); BOOST_CHECK_EQUAL(1u, members.size()); @@ -57,11 +60,13 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) { // Exchange frames with child. AMQFrame frame(VER, 1, new ChannelPingBody(VER)); - cluster.getToChains().in->handle(frame); - BOOST_REQUIRE(cluster.in.waitFor(1)); - BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody()); - BOOST_REQUIRE(cluster.out.waitFor(1)); - BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.out[0].getBody()); + cluster.getSendChains().in->handle(frame); + BOOST_REQUIRE(cluster.received.waitFor(1)); + SessionFrame& sf=cluster.received[0]; + BOOST_CHECK(sf.isIncoming); + BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody()); + BOOST_REQUIRE(cluster.received.waitFor(2)); + BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody()); // Wait for child to exit. int status; @@ -99,3 +104,4 @@ BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) { BOOST_CHECK_EQUAL(1u, other->count); } + diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h index f37c87a9ad..8fddd1d1f7 100644 --- a/cpp/src/tests/Cluster.h +++ b/cpp/src/tests/Cluster.h @@ -44,10 +44,10 @@ using namespace boost; void null_deleter(void*) {} -struct TestFrameHandler : - public FrameHandler, public vector<AMQFrame>, public Monitor +template <class T> +struct TestHandler : public Handler<T&>, public vector<T>, public Monitor { - void handle(AMQFrame& frame) { + void handle(T& frame) { Mutex::ScopedLock l(*this); push_back(frame); notifyAll(); @@ -56,23 +56,22 @@ struct TestFrameHandler : bool waitFor(size_t n) { Mutex::ScopedLock l(*this); AbsTime deadline(now(), 5*TIME_SEC); - while (size() != n && wait(deadline)) + while (vector<T>::size() != n && wait(deadline)) ; - return size() == n; + return vector<T>::size() == n; } }; +typedef TestHandler<AMQFrame> TestFrameHandler; +typedef TestHandler<SessionFrame> TestSessionFrameHandler; + void nullDeleter(void*) {} struct TestCluster : public Cluster { TestCluster(string name, string url) : Cluster(name, url) { - setFromChains( - FrameHandler::Chains( - make_shared_ptr(&in, nullDeleter), - make_shared_ptr(&out, nullDeleter) - )); + setReceivedChain(make_shared_ptr(&received, nullDeleter)); } /** Wait for cluster to be of size n. */ @@ -80,7 +79,7 @@ struct TestCluster : public Cluster return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), n)); } - TestFrameHandler in, out; + TestSessionFrameHandler received; }; diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp index d73d2bdbc7..216afc7bca 100644 --- a/cpp/src/tests/Cluster_child.cpp +++ b/cpp/src/tests/Cluster_child.cpp @@ -33,13 +33,16 @@ static const ProtocolVersion VER; /** Chlid part of Cluster::clusterTwo test */ void clusterTwo() { TestCluster cluster("clusterTwo", "amqp::2"); - BOOST_REQUIRE(cluster.in.waitFor(1)); // Frame from parent. - BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody()); + BOOST_REQUIRE(cluster.received.waitFor(1)); // Frame from parent. + BOOST_CHECK(cluster.received[0].isIncoming); + BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.received[0].frame.getBody()); BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent + AMQFrame frame(VER, 1, new ChannelOkBody(VER)); - cluster.getToChains().out->handle(frame); - BOOST_REQUIRE(cluster.out.waitFor(1)); - BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.out[0].getBody()); + cluster.getSendChains().out->handle(frame); + BOOST_REQUIRE(cluster.received.waitFor(2)); + BOOST_CHECK(!cluster.received[1].isIncoming); + BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody()); } int test_main(int, char**) { diff --git a/cpp/src/tests/Uuid.cpp b/cpp/src/tests/Uuid.cpp index 43d1cbcbba..da8c94aeae 100644 --- a/cpp/src/tests/Uuid.cpp +++ b/cpp/src/tests/Uuid.cpp @@ -37,6 +37,7 @@ struct UniqueSet : public std::set<Uuid> { BOOST_AUTO_TEST_CASE(testUuidCtor) { // Uniqueness boost::array<Uuid,1000> uuids; + for_each(uuids.begin(), uuids.end(), mem_fun_ref(&Uuid::generate)); UniqueSet unique; for_each(uuids.begin(), uuids.end(), unique); } @@ -62,10 +63,11 @@ BOOST_AUTO_TEST_CASE(testUuidOstream) { BOOST_AUTO_TEST_CASE(testUuidEncodeDecode) { Buffer buf(Uuid::size()); - Uuid uuid; + Uuid uuid(sample.c_array()); uuid.encode(buf); buf.flip(); Uuid decoded; decoded.decode(buf); - BOOST_CHECK(uuid==decoded); + BOOST_CHECK_EQUAL(string(sample.begin(), sample.end()), + string(decoded.begin(), decoded.end())); } |