diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/cluster.mk | 28 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionManager.cpp | 111 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionManager.h | 89 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FrameDefaultVisitor.h | 10 |
9 files changed, 29 insertions, 249 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 49fe5c2a81..0c3e4af20f 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -3,24 +3,22 @@ # lib_LTLIBRARIES += libqpidcluster.la -# if CLUSTER +if CLUSTER -# libqpidcluster_la_SOURCES = \ -# qpid/cluster/Cluster.cpp \ -# qpid/cluster/Cluster.h \ -# qpid/cluster/Cpg.cpp \ -# qpid/cluster/Cpg.h \ -# qpid/cluster/Dispatchable.h \ -# qpid/cluster/ClusterPlugin.cpp \ -# qpid/cluster/ClassifierHandler.h \ -# qpid/cluster/ClassifierHandler.cpp \ -# qpid/cluster/SessionManager.h \ -# qpid/cluster/SessionManager.cpp +libqpidcluster_la_SOURCES = \ + qpid/cluster/Cluster.cpp \ + qpid/cluster/Cluster.h \ + qpid/cluster/Cpg.cpp \ + qpid/cluster/Cpg.h \ + qpid/cluster/Dispatchable.h \ + qpid/cluster/ClusterPlugin.cpp \ + qpid/cluster/ClassifierHandler.h \ + qpid/cluster/ClassifierHandler.cpp -# libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la +libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la -# else +else # Empty stub library to satisfy rpm spec file. libqpidcluster_la_SOURCES = -#endif +endif diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 3c73719ef9..ce87d23c0d 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -21,6 +21,7 @@ #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> +#include <boost/scoped_array.hpp> #include <algorithm> #include <iterator> #include <map> @@ -46,13 +47,11 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker& broker) : - FrameHandler(&sessions), +Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker&) : + FrameHandler(0), // FIXME aconway 2008-01-29: handler. + observer cpg(*this), name(name_), - url(url_), - self(Id::self(cpg)), - sessions(broker, *this) + url(url_) { QPID_LOG(trace, *this << " Joining cluster: " << name_); cpg.join(name); @@ -80,10 +79,10 @@ Cluster::~Cluster() { void Cluster::handle(AMQFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); - Buffer buf(frame.size()); + boost::scoped_array<char> store(new char[frame.size()]); // FIXME aconway 2008-01-29: Better buffer handling. + Buffer buf(store.get()); frame.encode(buf); - buf.flip(); - iovec iov = { buf.start(), frame.size() }; + iovec iov = { store.get(), frame.size() }; cpg.mcast(name, &iov, 1); } @@ -144,6 +143,8 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) { { Mutex::ScopedLock l(lock); members[from].url=notifyIn->getUrl(); + if (!self.id && notifyIn->getUrl() == url) + self=from; lock.notifyAll(); QPID_LOG(trace, *this << ": members joined: " << members); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 1c53bcb9a7..5aca3faf44 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -19,9 +19,10 @@ * */ -#include "SessionManager.h" #include "Cpg.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" @@ -119,7 +120,6 @@ class Cluster : public framing::FrameHandler, MemberMap members; sys::Thread dispatcher; boost::function<void()> callback; - SessionManager sessions; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 35a07fbc2d..e24c60dc2f 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -17,7 +17,6 @@ */ #include "qpid/broker/Broker.h" #include "qpid/cluster/Cluster.h" -#include "qpid/cluster/SessionManager.h" #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/shared_ptr.h" @@ -43,7 +42,6 @@ struct ClusterPlugin : public Plugin { ClusterOptions options; boost::optional<Cluster> cluster; - boost::optional<SessionManager> sessions; Options* getOptions() { return &options; } diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 506703d105..01d97d2a17 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -145,13 +145,6 @@ std::string Cpg::cantMcastMsg(const Name& group) { return "Cannot mcast to CPG group "+group.str(); } -uint32_t Cpg::getLocalNoideId() const { - unsigned int nodeid; - check(cpg_local_get(handle, &nodeid), "Cannot get local node ID"); - assert(nodeid <= std::numeric_limits<uint32_t>::max()); - return nodeid; -} - ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) { ostream_iterator<Cpg::Id> i(o, " "); std::copy(a.first, a.first+a.second, i); @@ -177,10 +170,6 @@ ostream& operator <<(ostream& out, const cpg_name& name) { } -Cpg::Id Cpg::Id::self(Cpg& cpg) { - return Id(cpg.getLocalNoideId(), getpid()); -} - }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index 92e2e591b8..09b6996cc0 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -57,12 +57,10 @@ class Cpg : public Dispatchable { struct Id { uint64_t id; - Id() : id(0) {} + Id(uint64_t n=0) : id(n) {} Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; } Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {} - static Id self(Cpg& cpg); - operator uint64_t() const { return id; } uint32_t nodeId() const { return id >> 32; } pid_t pid() const { return id & 0xFFFF; } @@ -132,8 +130,6 @@ class Cpg : public Dispatchable { cpg_handle_t getHandle() const { return handle; } - uint32_t getLocalNoideId() const; - private: class Handles; struct ClearHandleOnExit; diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp deleted file mode 100644 index 68e0223a40..0000000000 --- a/cpp/src/qpid/cluster/SessionManager.cpp +++ /dev/null @@ -1,111 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "SessionManager.h" -#include "ClassifierHandler.h" - -#include "qpid/log/Statement.h" -#include "qpid/framing/amqp_types.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AMQP_ServerOperations.h" -#include "qpid/broker/BrokerAdapter.h" -#include "qpid/broker/Connection.h" - -#include <boost/utility/in_place_factory.hpp> - -namespace qpid { -namespace cluster { - -using namespace framing; -using namespace sys; -using namespace broker; - -/** Handler to send frames direct to local broker (bypass correlation etc.) */ -struct SessionManager::BrokerHandler : public FrameHandler -{ - Connection connection; - SessionHandler sessionAdapter; - broker::Session session; - BrokerAdapter adapter; - - // TODO aconway 2007-07-23: Lots of needless flab here (Channel, - // Connection, ChannelAdapter) As these classes are untangled the - // flab can be reduced. The real requirements are: - // - Dispatch methods direct to broker bypassing all the correlation muck - // - Efficiently suppress responses - // For the latter we are now using a ChannelAdapter with noop send() - // A more efficient solution would be a no-op proxy. - // - BrokerHandler(Broker& broker) : - connection(0, broker), - sessionAdapter(connection, 0), - session(sessionAdapter, 1), - adapter(session, 0) {} // FIXME aconway 2008-01-29: - - void handle(AMQFrame& frame) { - AMQMethodBody* body=dynamic_cast<AMQMethodBody*>(frame.getBody()); - assert(body); - body->invoke(adapter); - } - - // Dummy methods. - virtual void handleHeader(AMQHeaderBody*){} - virtual void handleContent(AMQContentBody*){} - virtual void handleHeartbeat(AMQHeartbeatBody*){} - virtual bool isOpen() const{ return true; } - virtual void handleMethod(AMQMethodBody*){} - // No-op send. - virtual void send(const AMQBody&) {} - - //delivery adapter methods, also no-ops: - virtual DeliveryId deliver(intrusive_ptr<Message>&, DeliveryToken::shared_ptr) { return 0; } - virtual void redeliver(intrusive_ptr<Message>&, DeliveryToken::shared_ptr, DeliveryId) {} -}; - -SessionManager::~SessionManager(){} - -SessionManager::SessionManager(Broker& b, FrameHandler& c) - : cluster(c), localBroker(new BrokerHandler(b)) {} - -void SessionManager::update(ChannelId channel, FrameHandler::Chains& chains) { - Mutex::ScopedLock l(lock); - // Create a new local session, store local chains. - assert(!sessions[channel]); - boost::optional<Session>& session=sessions[channel]; - session = boost::in_place(boost::ref(cluster), boost::ref(chains.in)); - chains.in = &session->classifier; -} - -void SessionManager::handle(AMQFrame& frame) { - // Incoming from cluster. - { - Mutex::ScopedLock l(lock); - SessionMap::iterator i=sessions.find(frame.getChannel()); - if (i == sessions.end()) { - // Non-local wiring method frame, invoke locally. - (*localBroker)(frame); - } - else { - // Local frame continuing on local chain - assert(i->second); - i->second->cont(frame); - } - } -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h deleted file mode 100644 index c0e0cb5736..0000000000 --- a/cpp/src/qpid/cluster/SessionManager.h +++ /dev/null @@ -1,89 +0,0 @@ -#ifndef QPID_CLUSTER_SESSIONMANAGER_H -#define QPID_CLUSTER_SESSIONMANAGER_H - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "ClassifierHandler.h" - -//FIXME aconway 2008-01-29: #include "qpid/framing/HandlerUpdater.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/sys/Mutex.h" - -#include <boost/noncopyable.hpp> -#include <boost/scoped_ptr.hpp> -#include <boost/optional.hpp> - -#include <map> - -namespace qpid { - -namespace broker { -class Broker; -} - -namespace framing { -class Uuid; -} - -namespace cluster { - -/** - * Manage the clusters session map. - * // FIXME aconway 2008-01-29: HandlerUpdater - */ -class SessionManager : public framing::FrameHandler, private boost::noncopyable -{ - public: - SessionManager(broker::Broker& broker, framing::FrameHandler& cluster); - ~SessionManager(); - - /** ChannelUpdater: add cluster handlers to session. */ - void update(framing::ChannelId, framing::FrameHandler::Chains&); - - /** FrameHandler: map frames from the cluster to sessions. */ - void handle(framing::AMQFrame&); - - /** Get ChannelID for UUID. Return 0 if no mapping */ - framing::ChannelId getChannelId(const framing::Uuid&) const; - - private: - class SessionOperations; - class BrokerHandler; - - struct Session { - Session(framing::FrameHandler& cluster, framing::FrameHandler& cont_) - : cont(cont_), classifier(cluster,cont_) {} - framing::FrameHandler& cont; // Continue local dispatch - ClassifierHandler classifier; - }; - - typedef std::map<framing::ChannelId,boost::optional<Session> > SessionMap; - - sys::Mutex lock; - framing::FrameHandler& cluster; - boost::scoped_ptr<BrokerHandler> localBroker; - SessionMap sessions; -}; - - -}} // namespace qpid::cluster - - - -#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/ diff --git a/cpp/src/qpid/framing/FrameDefaultVisitor.h b/cpp/src/qpid/framing/FrameDefaultVisitor.h index f695414977..07e1d6d997 100644 --- a/cpp/src/qpid/framing/FrameDefaultVisitor.h +++ b/cpp/src/qpid/framing/FrameDefaultVisitor.h @@ -24,14 +24,12 @@ #include "qpid/framing/MethodBodyDefaultVisitor.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeartbeatBody.h" namespace qpid { namespace framing { - -class AMQHeaderBody; -class AMQContentBody; -class AMQHeartbeatBody; - /** * Visitor for all concrete frame body types, combines * AMQBodyConstVisitor and MethodBodyDefaultVisitor. @@ -45,12 +43,12 @@ struct FrameDefaultVisitor : public AMQBodyConstVisitor, protected MethodBodyDefaultVisitor { virtual void defaultVisit(const AMQBody&) = 0; + void defaultVisit(const AMQMethodBody& method) { defaultVisit(static_cast<const AMQBody&>(method)); } void visit(const AMQHeaderBody& b) { defaultVisit(b); } void visit(const AMQContentBody& b) { defaultVisit(b); } void visit(const AMQHeartbeatBody& b) { defaultVisit(b); } void visit(const AMQMethodBody& b) { b.accept(static_cast<MethodBodyDefaultVisitor&>(*this)); } - void defaultVisit(const AMQMethodBody& method) { defaultVisit(static_cast<const AMQBody&>(method)); } using AMQBodyConstVisitor::visit; using MethodBodyDefaultVisitor::visit; |