summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/cluster.mk28
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp17
-rw-r--r--cpp/src/qpid/cluster/Cluster.h4
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp2
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp11
-rw-r--r--cpp/src/qpid/cluster/Cpg.h6
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp111
-rw-r--r--cpp/src/qpid/cluster/SessionManager.h89
-rw-r--r--cpp/src/qpid/framing/FrameDefaultVisitor.h10
9 files changed, 29 insertions, 249 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 49fe5c2a81..0c3e4af20f 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -3,24 +3,22 @@
#
lib_LTLIBRARIES += libqpidcluster.la
-# if CLUSTER
+if CLUSTER
-# libqpidcluster_la_SOURCES = \
-# qpid/cluster/Cluster.cpp \
-# qpid/cluster/Cluster.h \
-# qpid/cluster/Cpg.cpp \
-# qpid/cluster/Cpg.h \
-# qpid/cluster/Dispatchable.h \
-# qpid/cluster/ClusterPlugin.cpp \
-# qpid/cluster/ClassifierHandler.h \
-# qpid/cluster/ClassifierHandler.cpp \
-# qpid/cluster/SessionManager.h \
-# qpid/cluster/SessionManager.cpp
+libqpidcluster_la_SOURCES = \
+ qpid/cluster/Cluster.cpp \
+ qpid/cluster/Cluster.h \
+ qpid/cluster/Cpg.cpp \
+ qpid/cluster/Cpg.h \
+ qpid/cluster/Dispatchable.h \
+ qpid/cluster/ClusterPlugin.cpp \
+ qpid/cluster/ClassifierHandler.h \
+ qpid/cluster/ClassifierHandler.cpp
-# libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
+libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
-# else
+else
# Empty stub library to satisfy rpm spec file.
libqpidcluster_la_SOURCES =
-#endif
+endif
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 3c73719ef9..ce87d23c0d 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -21,6 +21,7 @@
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
+#include <boost/scoped_array.hpp>
#include <algorithm>
#include <iterator>
#include <map>
@@ -46,13 +47,11 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker& broker) :
- FrameHandler(&sessions),
+Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker&) :
+ FrameHandler(0), // FIXME aconway 2008-01-29: handler. + observer
cpg(*this),
name(name_),
- url(url_),
- self(Id::self(cpg)),
- sessions(broker, *this)
+ url(url_)
{
QPID_LOG(trace, *this << " Joining cluster: " << name_);
cpg.join(name);
@@ -80,10 +79,10 @@ Cluster::~Cluster() {
void Cluster::handle(AMQFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
- Buffer buf(frame.size());
+ boost::scoped_array<char> store(new char[frame.size()]); // FIXME aconway 2008-01-29: Better buffer handling.
+ Buffer buf(store.get());
frame.encode(buf);
- buf.flip();
- iovec iov = { buf.start(), frame.size() };
+ iovec iov = { store.get(), frame.size() };
cpg.mcast(name, &iov, 1);
}
@@ -144,6 +143,8 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
{
Mutex::ScopedLock l(lock);
members[from].url=notifyIn->getUrl();
+ if (!self.id && notifyIn->getUrl() == url)
+ self=from;
lock.notifyAll();
QPID_LOG(trace, *this << ": members joined: " << members);
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 1c53bcb9a7..5aca3faf44 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -19,9 +19,10 @@
*
*/
-#include "SessionManager.h"
#include "Cpg.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/broker/Broker.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
@@ -119,7 +120,6 @@ class Cluster : public framing::FrameHandler,
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
- SessionManager sessions;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 35a07fbc2d..e24c60dc2f 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -17,7 +17,6 @@
*/
#include "qpid/broker/Broker.h"
#include "qpid/cluster/Cluster.h"
-#include "qpid/cluster/SessionManager.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
@@ -43,7 +42,6 @@ struct ClusterPlugin : public Plugin {
ClusterOptions options;
boost::optional<Cluster> cluster;
- boost::optional<SessionManager> sessions;
Options* getOptions() { return &options; }
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 506703d105..01d97d2a17 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -145,13 +145,6 @@ std::string Cpg::cantMcastMsg(const Name& group) {
return "Cannot mcast to CPG group "+group.str();
}
-uint32_t Cpg::getLocalNoideId() const {
- unsigned int nodeid;
- check(cpg_local_get(handle, &nodeid), "Cannot get local node ID");
- assert(nodeid <= std::numeric_limits<uint32_t>::max());
- return nodeid;
-}
-
ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
ostream_iterator<Cpg::Id> i(o, " ");
std::copy(a.first, a.first+a.second, i);
@@ -177,10 +170,6 @@ ostream& operator <<(ostream& out, const cpg_name& name) {
}
-Cpg::Id Cpg::Id::self(Cpg& cpg) {
- return Id(cpg.getLocalNoideId(), getpid());
-}
-
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index 92e2e591b8..09b6996cc0 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -57,12 +57,10 @@ class Cpg : public Dispatchable {
struct Id {
uint64_t id;
- Id() : id(0) {}
+ Id(uint64_t n=0) : id(n) {}
Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; }
Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {}
- static Id self(Cpg& cpg);
-
operator uint64_t() const { return id; }
uint32_t nodeId() const { return id >> 32; }
pid_t pid() const { return id & 0xFFFF; }
@@ -132,8 +130,6 @@ class Cpg : public Dispatchable {
cpg_handle_t getHandle() const { return handle; }
- uint32_t getLocalNoideId() const;
-
private:
class Handles;
struct ClearHandleOnExit;
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
deleted file mode 100644
index 68e0223a40..0000000000
--- a/cpp/src/qpid/cluster/SessionManager.cpp
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "SessionManager.h"
-#include "ClassifierHandler.h"
-
-#include "qpid/log/Statement.h"
-#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/broker/BrokerAdapter.h"
-#include "qpid/broker/Connection.h"
-
-#include <boost/utility/in_place_factory.hpp>
-
-namespace qpid {
-namespace cluster {
-
-using namespace framing;
-using namespace sys;
-using namespace broker;
-
-/** Handler to send frames direct to local broker (bypass correlation etc.) */
-struct SessionManager::BrokerHandler : public FrameHandler
-{
- Connection connection;
- SessionHandler sessionAdapter;
- broker::Session session;
- BrokerAdapter adapter;
-
- // TODO aconway 2007-07-23: Lots of needless flab here (Channel,
- // Connection, ChannelAdapter) As these classes are untangled the
- // flab can be reduced. The real requirements are:
- // - Dispatch methods direct to broker bypassing all the correlation muck
- // - Efficiently suppress responses
- // For the latter we are now using a ChannelAdapter with noop send()
- // A more efficient solution would be a no-op proxy.
- //
- BrokerHandler(Broker& broker) :
- connection(0, broker),
- sessionAdapter(connection, 0),
- session(sessionAdapter, 1),
- adapter(session, 0) {} // FIXME aconway 2008-01-29:
-
- void handle(AMQFrame& frame) {
- AMQMethodBody* body=dynamic_cast<AMQMethodBody*>(frame.getBody());
- assert(body);
- body->invoke(adapter);
- }
-
- // Dummy methods.
- virtual void handleHeader(AMQHeaderBody*){}
- virtual void handleContent(AMQContentBody*){}
- virtual void handleHeartbeat(AMQHeartbeatBody*){}
- virtual bool isOpen() const{ return true; }
- virtual void handleMethod(AMQMethodBody*){}
- // No-op send.
- virtual void send(const AMQBody&) {}
-
- //delivery adapter methods, also no-ops:
- virtual DeliveryId deliver(intrusive_ptr<Message>&, DeliveryToken::shared_ptr) { return 0; }
- virtual void redeliver(intrusive_ptr<Message>&, DeliveryToken::shared_ptr, DeliveryId) {}
-};
-
-SessionManager::~SessionManager(){}
-
-SessionManager::SessionManager(Broker& b, FrameHandler& c)
- : cluster(c), localBroker(new BrokerHandler(b)) {}
-
-void SessionManager::update(ChannelId channel, FrameHandler::Chains& chains) {
- Mutex::ScopedLock l(lock);
- // Create a new local session, store local chains.
- assert(!sessions[channel]);
- boost::optional<Session>& session=sessions[channel];
- session = boost::in_place(boost::ref(cluster), boost::ref(chains.in));
- chains.in = &session->classifier;
-}
-
-void SessionManager::handle(AMQFrame& frame) {
- // Incoming from cluster.
- {
- Mutex::ScopedLock l(lock);
- SessionMap::iterator i=sessions.find(frame.getChannel());
- if (i == sessions.end()) {
- // Non-local wiring method frame, invoke locally.
- (*localBroker)(frame);
- }
- else {
- // Local frame continuing on local chain
- assert(i->second);
- i->second->cont(frame);
- }
- }
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h
deleted file mode 100644
index c0e0cb5736..0000000000
--- a/cpp/src/qpid/cluster/SessionManager.h
+++ /dev/null
@@ -1,89 +0,0 @@
-#ifndef QPID_CLUSTER_SESSIONMANAGER_H
-#define QPID_CLUSTER_SESSIONMANAGER_H
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "ClassifierHandler.h"
-
-//FIXME aconway 2008-01-29: #include "qpid/framing/HandlerUpdater.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/sys/Mutex.h"
-
-#include <boost/noncopyable.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/optional.hpp>
-
-#include <map>
-
-namespace qpid {
-
-namespace broker {
-class Broker;
-}
-
-namespace framing {
-class Uuid;
-}
-
-namespace cluster {
-
-/**
- * Manage the clusters session map.
- * // FIXME aconway 2008-01-29: HandlerUpdater
- */
-class SessionManager : public framing::FrameHandler, private boost::noncopyable
-{
- public:
- SessionManager(broker::Broker& broker, framing::FrameHandler& cluster);
- ~SessionManager();
-
- /** ChannelUpdater: add cluster handlers to session. */
- void update(framing::ChannelId, framing::FrameHandler::Chains&);
-
- /** FrameHandler: map frames from the cluster to sessions. */
- void handle(framing::AMQFrame&);
-
- /** Get ChannelID for UUID. Return 0 if no mapping */
- framing::ChannelId getChannelId(const framing::Uuid&) const;
-
- private:
- class SessionOperations;
- class BrokerHandler;
-
- struct Session {
- Session(framing::FrameHandler& cluster, framing::FrameHandler& cont_)
- : cont(cont_), classifier(cluster,cont_) {}
- framing::FrameHandler& cont; // Continue local dispatch
- ClassifierHandler classifier;
- };
-
- typedef std::map<framing::ChannelId,boost::optional<Session> > SessionMap;
-
- sys::Mutex lock;
- framing::FrameHandler& cluster;
- boost::scoped_ptr<BrokerHandler> localBroker;
- SessionMap sessions;
-};
-
-
-}} // namespace qpid::cluster
-
-
-
-#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/
diff --git a/cpp/src/qpid/framing/FrameDefaultVisitor.h b/cpp/src/qpid/framing/FrameDefaultVisitor.h
index f695414977..07e1d6d997 100644
--- a/cpp/src/qpid/framing/FrameDefaultVisitor.h
+++ b/cpp/src/qpid/framing/FrameDefaultVisitor.h
@@ -24,14 +24,12 @@
#include "qpid/framing/MethodBodyDefaultVisitor.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeartbeatBody.h"
namespace qpid {
namespace framing {
-
-class AMQHeaderBody;
-class AMQContentBody;
-class AMQHeartbeatBody;
-
/**
* Visitor for all concrete frame body types, combines
* AMQBodyConstVisitor and MethodBodyDefaultVisitor.
@@ -45,12 +43,12 @@ struct FrameDefaultVisitor : public AMQBodyConstVisitor,
protected MethodBodyDefaultVisitor
{
virtual void defaultVisit(const AMQBody&) = 0;
+ void defaultVisit(const AMQMethodBody& method) { defaultVisit(static_cast<const AMQBody&>(method)); }
void visit(const AMQHeaderBody& b) { defaultVisit(b); }
void visit(const AMQContentBody& b) { defaultVisit(b); }
void visit(const AMQHeartbeatBody& b) { defaultVisit(b); }
void visit(const AMQMethodBody& b) { b.accept(static_cast<MethodBodyDefaultVisitor&>(*this)); }
- void defaultVisit(const AMQMethodBody& method) { defaultVisit(static_cast<const AMQBody&>(method)); }
using AMQBodyConstVisitor::visit;
using MethodBodyDefaultVisitor::visit;