summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-27 22:08:51 +0000
committerAlan Conway <aconway@apache.org>2007-07-27 22:08:51 +0000
commit956a72f00b64928a601ea2891789a53271fc7571 (patch)
treec4469485822787d4742b06d35e23df007f399ed9 /cpp/src
parentac669123004b6e78468cc4fcea3ffb4b9d7b7bd3 (diff)
downloadqpid-python-956a72f00b64928a601ea2891789a53271fc7571.tar.gz
* src/tests/ais_check, cluster.mk: Run AIS tests only if:
- CLUSTER makefile conditional set by configure. - Effective gid == ais - aisexec is running Otherwise print a warning. * src/tests/EventChannelConnectionTest.cpp * src/qpid/cluster/doxygen_overview.h Removed unused files. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560404 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/cluster.mk2
-rw-r--r--cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--cpp/src/qpid/broker/Broker.h3
-rw-r--r--cpp/src/qpid/broker/Connection.cpp2
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp19
-rw-r--r--cpp/src/qpid/cluster/Cluster.h13
-rw-r--r--cpp/src/qpid/cluster/SessionFrame.cpp51
-rw-r--r--cpp/src/qpid/cluster/SessionFrame.h71
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp55
-rw-r--r--cpp/src/qpid/cluster/SessionManager.h15
-rw-r--r--cpp/src/qpid/framing/HandlerUpdater.h4
11 files changed, 46 insertions, 195 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index dcc70ca897..39df373d2e 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -14,8 +14,6 @@ libqpidcluster_la_SOURCES = \
qpid/cluster/ClusterPlugin.cpp \
qpid/cluster/ClassifierHandler.h \
qpid/cluster/ClassifierHandler.cpp \
- qpid/cluster/SessionFrame.h \
- qpid/cluster/SessionFrame.cpp \
qpid/cluster/SessionManager.h \
qpid/cluster/SessionManager.cpp
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 26ec55ac44..fa183979e1 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -47,6 +47,7 @@
using qpid::sys::Acceptor;
using qpid::framing::HandlerUpdater;
using qpid::framing::FrameHandler;
+using qpid::framing::ChannelId;
namespace qpid {
namespace broker {
@@ -162,9 +163,10 @@ void Broker::add(const shared_ptr<HandlerUpdater>& updater) {
handlerUpdaters.push_back(updater);
}
-void Broker::update(FrameHandler::Chains& chains) {
+void Broker::update(ChannelId channel, FrameHandler::Chains& chains) {
for_each(handlerUpdaters.begin(), handlerUpdaters.end(),
- boost::bind(&HandlerUpdater::update, _1, boost::ref(chains)));
+ boost::bind(&HandlerUpdater::update, _1,
+ channel, boost::ref(chains)));
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 9f57a45e0c..1ccc3564f5 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -33,7 +33,6 @@
#include "qpid/Plugin.h"
#include "qpid/Url.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/HandlerUpdater.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Acceptor.h"
@@ -96,7 +95,7 @@ class Broker : public sys::Runnable, public Plugin::Target
void add(const shared_ptr<framing::HandlerUpdater>&);
/** Apply all handler updaters to a handler chain pair. */
- void update(framing::FrameHandler::Chains&);
+ void update(framing::ChannelId, framing::FrameHandler::Chains&);
MessageStore& getStore() { return *store; }
QueueRegistry& getQueues() { return queues; }
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 5b22167323..6bd846e2a3 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -108,7 +108,7 @@ FrameHandler::Chains& Connection::getChannel(ChannelId id) {
FrameHandler::Chains chains(
new SemanticHandler(id, *this),
new OutputHandlerFrameHandler(*out));
- broker.update(chains);
+ broker.update(id, chains);
i = channels.insert(ChannelMap::value_type(id, chains)).first;
}
return i->second;
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index b59bfe878d..52d8691f33 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -19,7 +19,6 @@
#include "Cluster.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <algorithm>
@@ -52,9 +51,9 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
Cluster::Cluster(
const std::string& name_, const std::string& url_,
- const SessionFrameHandler::Chain& next
+ const FrameHandler::Chain& next
) :
- SessionFrameHandler(next),
+ FrameHandler(next),
cpg(new Cpg(*this)),
name(name_),
url(url_),
@@ -85,7 +84,7 @@ Cluster::~Cluster() {
}
}
-void Cluster::handle(SessionFrame& frame) {
+void Cluster::handle(AMQFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -95,9 +94,9 @@ void Cluster::handle(SessionFrame& frame) {
}
void Cluster::notify() {
- SessionFrame sf;
- sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url)));
- handle(sf);
+ AMQFrame frame(ProtocolVersion(), 0,
+ new ClusterNotifyBody(ProtocolVersion(), url));
+ handle(frame);
}
size_t Cluster::size() const {
@@ -125,11 +124,11 @@ void Cluster::deliver(
assert(name == *group);
Id from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
- SessionFrame frame;
+ AMQFrame frame;
frame.decode(buf);
QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
- if (frame.uuid.isNull())
- handleClusterFrame(from, frame.frame);
+ if (frame.getChannel() == 0)
+ handleClusterFrame(from, frame);
else
next->handle(frame);
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index f6afe14c62..b9cd3b73d1 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -20,7 +20,6 @@
*/
#include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/SessionFrame.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/shared_ptr.h"
#include "qpid/sys/Monitor.h"
@@ -40,13 +39,13 @@ namespace qpid { namespace cluster {
* Connection to the cluster. Maintains cluster membership
* data.
*
- * As SessionFrameHandler, handles frames by sending them to the
- * cluster, sends frames received from the cluster to the next
- * SessionFrameHandler.
+ * As FrameHandler, handles frames by sending them to the
+ * cluster. Frames received from the cluster are sent to the next
+ * FrameHandler in the chain.
*
*
*/
-class Cluster : public SessionFrameHandler,
+class Cluster : public framing::FrameHandler,
private sys::Runnable, private Cpg::Handler
{
public:
@@ -66,7 +65,7 @@ class Cluster : public SessionFrameHandler,
* @param handler for frames received from the cluster.
*/
Cluster(const std::string& name, const std::string& url,
- const SessionFrameHandler::Chain& next);
+ const framing::FrameHandler::Chain& next);
virtual ~Cluster();
@@ -87,7 +86,7 @@ class Cluster : public SessionFrameHandler,
sys::Duration timeout=sys::TIME_INFINITE) const;
/** Send frame to the cluster */
- void handle(SessionFrame&);
+ void handle(framing::AMQFrame&);
private:
typedef Cpg::Id Id;
diff --git a/cpp/src/qpid/cluster/SessionFrame.cpp b/cpp/src/qpid/cluster/SessionFrame.cpp
deleted file mode 100644
index 1a20a5eddc..0000000000
--- a/cpp/src/qpid/cluster/SessionFrame.cpp
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "SessionFrame.h"
-
-#include "qpid/QpidError.h"
-
-namespace qpid {
-namespace cluster {
-
-void SessionFrame::encode(framing::Buffer& buf) {
- uuid.encode(buf);
- frame.encode(buf);
- buf.putOctet(isIncoming);
-}
-
-void SessionFrame::decode(framing::Buffer& buf) {
- uuid.decode(buf);
- if (!frame.decode(buf))
- THROW_QPID_ERROR(FRAMING_ERROR, "Incomplete frame");
- isIncoming = buf.getOctet();
-}
-
-size_t SessionFrame::size() const {
- return uuid.size() + frame.size() + 1 /*isIncoming*/;
-}
-
-std::ostream& operator<<(std::ostream& out, const SessionFrame& frame) {
- return out << "[session=" << frame.uuid
- << (frame.isIncoming ? ",in: ":",out: ")
- << frame.frame << "]";
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionFrame.h b/cpp/src/qpid/cluster/SessionFrame.h
deleted file mode 100644
index 12885da7e1..0000000000
--- a/cpp/src/qpid/cluster/SessionFrame.h
+++ /dev/null
@@ -1,71 +0,0 @@
-#ifndef QPID_CLUSTER_SESSIONFRAME_H
-#define QPID_CLUSTER_SESSIONFRAME_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/Handler.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/Uuid.h"
-
-#include <ostream>
-
-namespace qpid {
-
-namespace framing {
-class AMQFrame;
-class Buffer;
-}
-
-namespace cluster {
-
-/**
- * An AMQFrame with a UUID and direction.
- */
-struct SessionFrame
-{
- SessionFrame() : isIncoming(false) {}
-
- SessionFrame(const framing::Uuid& id, const framing::AMQFrame& f, bool incoming)
- : uuid(id), frame(f), isIncoming(incoming) {}
-
- void encode(framing::Buffer&);
-
- void decode(framing::Buffer&);
-
- size_t size() const;
-
- static const bool IN = true;
- static const bool OUT = false;
-
- framing::Uuid uuid;
- framing::AMQFrame frame;
- bool isIncoming;
-};
-
-typedef framing::Handler<SessionFrame&> SessionFrameHandler;
-
-std::ostream& operator<<(std::ostream&, const SessionFrame&);
-
-}} // namespace qpid::cluster
-
-
-
-#endif /*!QPID_CLUSTER_SESSIONFRAME_H*/
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
index 88ddfe843f..c9e79b4bbc 100644
--- a/cpp/src/qpid/cluster/SessionManager.cpp
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -74,61 +74,38 @@ using namespace broker;
virtual void redeliver(Message::shared_ptr&, DeliveryToken::shared_ptr, DeliveryId) {}
};
-/** Wrap plain AMQFrames in SessionFrames */
-struct FrameWrapperHandler : public FrameHandler {
-
- FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain next_)
- : uuid(id), direction(dir), next(next_) {
- assert(!uuid.isNull());
- }
-
- void handle(AMQFrame& frame) {
- SessionFrame sf(uuid, frame, direction);
- assert(next);
- next->handle(sf);
- }
-
- Uuid uuid;
- bool direction;
- SessionFrameHandler::Chain next;
-};
-
SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {}
-void SessionManager::update(FrameHandler::Chains& chains) {
+void SessionManager::update(ChannelId channel, FrameHandler::Chains& chains) {
Mutex::ScopedLock l(lock);
// Create a new local session, store local chains.
- Uuid uuid(true);
- sessions[uuid] = chains;
+ sessions[channel] = chains;
- // Replace local in chain. Build from the back.
- // TODO aconway 2007-07-05: Currently mcast wiring, bypass
- // everythign else.
+ // Replace local "in" chain to mcast wiring and process other frames
+ // as normal.
assert(clusterSend);
- FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, clusterSend));
- FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in));
- chains.in = classify;
-
- // Leave out chain unmodified.
- // TODO aconway 2007-07-05: Failover will require replication of
- // outgoing frames to session replicas.
+ chains.in = make_shared_ptr(
+ new ClassifierHandler(clusterSend, chains.in));
}
-void SessionManager::handle(SessionFrame& frame) {
+void SessionManager::handle(AMQFrame& frame) {
// Incoming from cluster.
{
Mutex::ScopedLock l(lock);
- assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming?
- SessionMap::iterator i = sessions.find(frame.uuid);
+ SessionMap::iterator i = sessions.find(frame.getChannel());
if (i == sessions.end()) {
- // Non local method frame, invoke.
- localBroker->handle(frame.frame);
+ // Non-local wiring method frame, invoke locally.
+ localBroker->handle(frame);
}
else {
- // Local frame, continue on local chain
- i->second.in->handle(frame.frame);
+ // Local frame continuing on local chain
+ i->second.in->handle(frame);
}
}
}
+void SessionManager::setClusterSend(const FrameHandler::Chain& send) {
+ clusterSend=send;
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h
index 77fc71733b..fc43d6e653 100644
--- a/cpp/src/qpid/cluster/SessionManager.h
+++ b/cpp/src/qpid/cluster/SessionManager.h
@@ -19,7 +19,6 @@
*
*/
-#include "qpid/cluster/SessionFrame.h"
#include "qpid/framing/HandlerUpdater.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/Uuid.h"
@@ -41,30 +40,30 @@ namespace cluster {
* Manage sessions and handler chains for the cluster.
*
*/
-class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler,
+class SessionManager : public framing::HandlerUpdater, public framing::FrameHandler,
private boost::noncopyable
{
public:
SessionManager(broker::Broker& broker);
/** Set the handler to send to the cluster */
- void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; }
+ void setClusterSend(const framing::FrameHandler::Chain& send);
/** As ChannelUpdater update the handler chains. */
- void update(framing::FrameHandler::Chains& chains);
+ void update(framing::ChannelId, framing::FrameHandler::Chains&);
- /** As SessionFrameHandler handle frames received from the cluster */
- void handle(SessionFrame&);
+ /** As FrameHandler frames received from the cluster */
+ void handle(framing::AMQFrame&);
/** Get ChannelID for UUID. Return 0 if no mapping */
framing::ChannelId getChannelId(const framing::Uuid&) const;
private:
class SessionOperations;
- typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap;
+ typedef std::map<framing::ChannelId,framing::FrameHandler::Chains> SessionMap;
sys::Mutex lock;
- SessionFrameHandler::Chain clusterSend;
+ framing::FrameHandler::Chain clusterSend;
framing::FrameHandler::Chain localBroker;
SessionMap sessions;
};
diff --git a/cpp/src/qpid/framing/HandlerUpdater.h b/cpp/src/qpid/framing/HandlerUpdater.h
index b9497e4f12..fb71c04fd6 100644
--- a/cpp/src/qpid/framing/HandlerUpdater.h
+++ b/cpp/src/qpid/framing/HandlerUpdater.h
@@ -31,10 +31,10 @@ struct HandlerUpdater {
virtual ~HandlerUpdater() {}
/** Update the handler chains.
- *@param id Unique identifier for channel or session.
+ *@param channel Id of associated channel.
*@param chains Handler chains to be updated.
*/
- virtual void update(FrameHandler::Chains& chains) = 0;
+ virtual void update(ChannelId channel, FrameHandler::Chains& chains) = 0;
};
}} // namespace qpid::framing