diff options
author | Alan Conway <aconway@apache.org> | 2007-07-27 22:08:51 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-07-27 22:08:51 +0000 |
commit | 956a72f00b64928a601ea2891789a53271fc7571 (patch) | |
tree | c4469485822787d4742b06d35e23df007f399ed9 | |
parent | ac669123004b6e78468cc4fcea3ffb4b9d7b7bd3 (diff) | |
download | qpid-python-956a72f00b64928a601ea2891789a53271fc7571.tar.gz |
* src/tests/ais_check, cluster.mk: Run AIS tests only if:
- CLUSTER makefile conditional set by configure.
- Effective gid == ais
- aisexec is running
Otherwise print a warning.
* src/tests/EventChannelConnectionTest.cpp
* src/qpid/cluster/doxygen_overview.h
Removed unused files.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560404 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/cluster.mk | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 13 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionFrame.cpp | 51 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionFrame.h | 71 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionManager.cpp | 55 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionManager.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/framing/HandlerUpdater.h | 4 |
11 files changed, 46 insertions, 195 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index dcc70ca897..39df373d2e 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -14,8 +14,6 @@ libqpidcluster_la_SOURCES = \ qpid/cluster/ClusterPlugin.cpp \ qpid/cluster/ClassifierHandler.h \ qpid/cluster/ClassifierHandler.cpp \ - qpid/cluster/SessionFrame.h \ - qpid/cluster/SessionFrame.cpp \ qpid/cluster/SessionManager.h \ qpid/cluster/SessionManager.cpp diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 26ec55ac44..fa183979e1 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -47,6 +47,7 @@ using qpid::sys::Acceptor; using qpid::framing::HandlerUpdater; using qpid::framing::FrameHandler; +using qpid::framing::ChannelId; namespace qpid { namespace broker { @@ -162,9 +163,10 @@ void Broker::add(const shared_ptr<HandlerUpdater>& updater) { handlerUpdaters.push_back(updater); } -void Broker::update(FrameHandler::Chains& chains) { +void Broker::update(ChannelId channel, FrameHandler::Chains& chains) { for_each(handlerUpdaters.begin(), handlerUpdaters.end(), - boost::bind(&HandlerUpdater::update, _1, boost::ref(chains))); + boost::bind(&HandlerUpdater::update, _1, + channel, boost::ref(chains))); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 9f57a45e0c..1ccc3564f5 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -33,7 +33,6 @@ #include "qpid/Plugin.h" #include "qpid/Url.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/HandlerUpdater.h" #include "qpid/framing/OutputHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Acceptor.h" @@ -96,7 +95,7 @@ class Broker : public sys::Runnable, public Plugin::Target void add(const shared_ptr<framing::HandlerUpdater>&); /** Apply all handler updaters to a handler chain pair. */ - void update(framing::FrameHandler::Chains&); + void update(framing::ChannelId, framing::FrameHandler::Chains&); MessageStore& getStore() { return *store; } QueueRegistry& getQueues() { return queues; } diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 5b22167323..6bd846e2a3 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -108,7 +108,7 @@ FrameHandler::Chains& Connection::getChannel(ChannelId id) { FrameHandler::Chains chains( new SemanticHandler(id, *this), new OutputHandlerFrameHandler(*out)); - broker.update(chains); + broker.update(id, chains); i = channels.insert(ChannelMap::value_type(id, chains)).first; } return i->second; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index b59bfe878d..52d8691f33 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -19,7 +19,6 @@ #include "Cluster.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" -#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <algorithm> @@ -52,9 +51,9 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { Cluster::Cluster( const std::string& name_, const std::string& url_, - const SessionFrameHandler::Chain& next + const FrameHandler::Chain& next ) : - SessionFrameHandler(next), + FrameHandler(next), cpg(new Cpg(*this)), name(name_), url(url_), @@ -85,7 +84,7 @@ Cluster::~Cluster() { } } -void Cluster::handle(SessionFrame& frame) { +void Cluster::handle(AMQFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -95,9 +94,9 @@ void Cluster::handle(SessionFrame& frame) { } void Cluster::notify() { - SessionFrame sf; - sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url))); - handle(sf); + AMQFrame frame(ProtocolVersion(), 0, + new ClusterNotifyBody(ProtocolVersion(), url)); + handle(frame); } size_t Cluster::size() const { @@ -125,11 +124,11 @@ void Cluster::deliver( assert(name == *group); Id from(nodeid, pid); Buffer buf(static_cast<char*>(msg), msg_len); - SessionFrame frame; + AMQFrame frame; frame.decode(buf); QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); - if (frame.uuid.isNull()) - handleClusterFrame(from, frame.frame); + if (frame.getChannel() == 0) + handleClusterFrame(from, frame); else next->handle(frame); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index f6afe14c62..b9cd3b73d1 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -20,7 +20,6 @@ */ #include "qpid/cluster/Cpg.h" -#include "qpid/cluster/SessionFrame.h" #include "qpid/framing/FrameHandler.h" #include "qpid/shared_ptr.h" #include "qpid/sys/Monitor.h" @@ -40,13 +39,13 @@ namespace qpid { namespace cluster { * Connection to the cluster. Maintains cluster membership * data. * - * As SessionFrameHandler, handles frames by sending them to the - * cluster, sends frames received from the cluster to the next - * SessionFrameHandler. + * As FrameHandler, handles frames by sending them to the + * cluster. Frames received from the cluster are sent to the next + * FrameHandler in the chain. * * */ -class Cluster : public SessionFrameHandler, +class Cluster : public framing::FrameHandler, private sys::Runnable, private Cpg::Handler { public: @@ -66,7 +65,7 @@ class Cluster : public SessionFrameHandler, * @param handler for frames received from the cluster. */ Cluster(const std::string& name, const std::string& url, - const SessionFrameHandler::Chain& next); + const framing::FrameHandler::Chain& next); virtual ~Cluster(); @@ -87,7 +86,7 @@ class Cluster : public SessionFrameHandler, sys::Duration timeout=sys::TIME_INFINITE) const; /** Send frame to the cluster */ - void handle(SessionFrame&); + void handle(framing::AMQFrame&); private: typedef Cpg::Id Id; diff --git a/cpp/src/qpid/cluster/SessionFrame.cpp b/cpp/src/qpid/cluster/SessionFrame.cpp deleted file mode 100644 index 1a20a5eddc..0000000000 --- a/cpp/src/qpid/cluster/SessionFrame.cpp +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "SessionFrame.h" - -#include "qpid/QpidError.h" - -namespace qpid { -namespace cluster { - -void SessionFrame::encode(framing::Buffer& buf) { - uuid.encode(buf); - frame.encode(buf); - buf.putOctet(isIncoming); -} - -void SessionFrame::decode(framing::Buffer& buf) { - uuid.decode(buf); - if (!frame.decode(buf)) - THROW_QPID_ERROR(FRAMING_ERROR, "Incomplete frame"); - isIncoming = buf.getOctet(); -} - -size_t SessionFrame::size() const { - return uuid.size() + frame.size() + 1 /*isIncoming*/; -} - -std::ostream& operator<<(std::ostream& out, const SessionFrame& frame) { - return out << "[session=" << frame.uuid - << (frame.isIncoming ? ",in: ":",out: ") - << frame.frame << "]"; -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionFrame.h b/cpp/src/qpid/cluster/SessionFrame.h deleted file mode 100644 index 12885da7e1..0000000000 --- a/cpp/src/qpid/cluster/SessionFrame.h +++ /dev/null @@ -1,71 +0,0 @@ -#ifndef QPID_CLUSTER_SESSIONFRAME_H -#define QPID_CLUSTER_SESSIONFRAME_H - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/Handler.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/Uuid.h" - -#include <ostream> - -namespace qpid { - -namespace framing { -class AMQFrame; -class Buffer; -} - -namespace cluster { - -/** - * An AMQFrame with a UUID and direction. - */ -struct SessionFrame -{ - SessionFrame() : isIncoming(false) {} - - SessionFrame(const framing::Uuid& id, const framing::AMQFrame& f, bool incoming) - : uuid(id), frame(f), isIncoming(incoming) {} - - void encode(framing::Buffer&); - - void decode(framing::Buffer&); - - size_t size() const; - - static const bool IN = true; - static const bool OUT = false; - - framing::Uuid uuid; - framing::AMQFrame frame; - bool isIncoming; -}; - -typedef framing::Handler<SessionFrame&> SessionFrameHandler; - -std::ostream& operator<<(std::ostream&, const SessionFrame&); - -}} // namespace qpid::cluster - - - -#endif /*!QPID_CLUSTER_SESSIONFRAME_H*/ diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp index 88ddfe843f..c9e79b4bbc 100644 --- a/cpp/src/qpid/cluster/SessionManager.cpp +++ b/cpp/src/qpid/cluster/SessionManager.cpp @@ -74,61 +74,38 @@ using namespace broker; virtual void redeliver(Message::shared_ptr&, DeliveryToken::shared_ptr, DeliveryId) {} }; -/** Wrap plain AMQFrames in SessionFrames */ -struct FrameWrapperHandler : public FrameHandler { - - FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain next_) - : uuid(id), direction(dir), next(next_) { - assert(!uuid.isNull()); - } - - void handle(AMQFrame& frame) { - SessionFrame sf(uuid, frame, direction); - assert(next); - next->handle(sf); - } - - Uuid uuid; - bool direction; - SessionFrameHandler::Chain next; -}; - SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {} -void SessionManager::update(FrameHandler::Chains& chains) { +void SessionManager::update(ChannelId channel, FrameHandler::Chains& chains) { Mutex::ScopedLock l(lock); // Create a new local session, store local chains. - Uuid uuid(true); - sessions[uuid] = chains; + sessions[channel] = chains; - // Replace local in chain. Build from the back. - // TODO aconway 2007-07-05: Currently mcast wiring, bypass - // everythign else. + // Replace local "in" chain to mcast wiring and process other frames + // as normal. assert(clusterSend); - FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, clusterSend)); - FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in)); - chains.in = classify; - - // Leave out chain unmodified. - // TODO aconway 2007-07-05: Failover will require replication of - // outgoing frames to session replicas. + chains.in = make_shared_ptr( + new ClassifierHandler(clusterSend, chains.in)); } -void SessionManager::handle(SessionFrame& frame) { +void SessionManager::handle(AMQFrame& frame) { // Incoming from cluster. { Mutex::ScopedLock l(lock); - assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming? - SessionMap::iterator i = sessions.find(frame.uuid); + SessionMap::iterator i = sessions.find(frame.getChannel()); if (i == sessions.end()) { - // Non local method frame, invoke. - localBroker->handle(frame.frame); + // Non-local wiring method frame, invoke locally. + localBroker->handle(frame); } else { - // Local frame, continue on local chain - i->second.in->handle(frame.frame); + // Local frame continuing on local chain + i->second.in->handle(frame); } } } +void SessionManager::setClusterSend(const FrameHandler::Chain& send) { + clusterSend=send; +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h index 77fc71733b..fc43d6e653 100644 --- a/cpp/src/qpid/cluster/SessionManager.h +++ b/cpp/src/qpid/cluster/SessionManager.h @@ -19,7 +19,6 @@ * */ -#include "qpid/cluster/SessionFrame.h" #include "qpid/framing/HandlerUpdater.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/Uuid.h" @@ -41,30 +40,30 @@ namespace cluster { * Manage sessions and handler chains for the cluster. * */ -class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler, +class SessionManager : public framing::HandlerUpdater, public framing::FrameHandler, private boost::noncopyable { public: SessionManager(broker::Broker& broker); /** Set the handler to send to the cluster */ - void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; } + void setClusterSend(const framing::FrameHandler::Chain& send); /** As ChannelUpdater update the handler chains. */ - void update(framing::FrameHandler::Chains& chains); + void update(framing::ChannelId, framing::FrameHandler::Chains&); - /** As SessionFrameHandler handle frames received from the cluster */ - void handle(SessionFrame&); + /** As FrameHandler frames received from the cluster */ + void handle(framing::AMQFrame&); /** Get ChannelID for UUID. Return 0 if no mapping */ framing::ChannelId getChannelId(const framing::Uuid&) const; private: class SessionOperations; - typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap; + typedef std::map<framing::ChannelId,framing::FrameHandler::Chains> SessionMap; sys::Mutex lock; - SessionFrameHandler::Chain clusterSend; + framing::FrameHandler::Chain clusterSend; framing::FrameHandler::Chain localBroker; SessionMap sessions; }; diff --git a/cpp/src/qpid/framing/HandlerUpdater.h b/cpp/src/qpid/framing/HandlerUpdater.h index b9497e4f12..fb71c04fd6 100644 --- a/cpp/src/qpid/framing/HandlerUpdater.h +++ b/cpp/src/qpid/framing/HandlerUpdater.h @@ -31,10 +31,10 @@ struct HandlerUpdater { virtual ~HandlerUpdater() {} /** Update the handler chains. - *@param id Unique identifier for channel or session. + *@param channel Id of associated channel. *@param chains Handler chains to be updated. */ - virtual void update(FrameHandler::Chains& chains) = 0; + virtual void update(ChannelId channel, FrameHandler::Chains& chains) = 0; }; }} // namespace qpid::framing |