diff options
author | Alan Conway <aconway@apache.org> | 2007-06-26 02:11:55 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-06-26 02:11:55 +0000 |
commit | e6566439f627e375f12f77044819bbb37b585348 (patch) | |
tree | 18c52172d536b53df57e82a274a31bcfabc35f7b /cpp/src/qpid/cluster/Cluster.cpp | |
parent | 87c376ebc8fe6af86dc8aef8dcec03510ff5dcc0 (diff) | |
download | qpid-python-e6566439f627e375f12f77044819bbb37b585348.tar.gz |
2007-06-25 <aconway@redhat.com>
Cluster class implementing cluster membership map.
* src/qpid/cluster/Cluster.cpp: Cluster membership implementation.
* src/qpid/cluster/Cpg.cpp: Support for boost::function callbacks.
* src/tests/Url.cpp: Implements AMQP-95 URL format.
* xml/cluster.xml: Cluster join method.
Build/packaging
* README: Remove mention of openais till clustering is functional.
For now it is optional and we depend on an unpackaged version.
* configure.ac: Check openais has cpg_local_get().
* Makefile.am: Added cluster.xml to EXTRA_DIST.
* src/generate.sh: add cluster.xml to codegen.
* src/tests/Makefile.am:
- Generate individual "sudo -u ais" wrappers for openais tests.
- Drop "unit" directory, all unit tests in "tests" directory
Minor changes:
* src/qpid/sys/posix/Socket.cpp:
* src/qpid/sys/posix/PosixAcceptor.cpp:
* src/qpid/sys/posix/EventChannelAcceptor.cpp:
* src/qpid/sys/apr/APRAcceptor.cpp:
* src/qpid/sys/Acceptor.h (getHost): Added getHost()
* src/tests/.valgrind.supp-default: Suppress benign valgrind
warning in libcpg.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@550658 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp new file mode 100644 index 0000000000..30073c4551 --- /dev/null +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -0,0 +1,159 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Cluster.h" +#include "Cpg.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/ClusterNotifyBody.h" +#include "qpid/log/Statement.h" +#include <boost/bind.hpp> +#include <algorithm> +#include <iterator> + +namespace qpid { +namespace cluster { +using namespace qpid::framing; +using namespace qpid::sys; +using namespace std; + +ostream& operator <<(ostream& out, const Cluster& cluster) { + return out << cluster.name.str() << "(" << cluster.self << ")"; +} + +void Cluster::notify() { + // TODO aconway 2007-06-25: Use proxy here. + AMQFrame frame(version, 0, + make_shared_ptr(new ClusterNotifyBody(version, url))); + handle(frame); +} + +Cluster::Cluster( + const std::string& name_, const std::string& url_, FrameHandler& next_, + ProtocolVersion ver) + : name(name_), url(url_), version(ver), + cpg(new Cpg(boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), + boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), + next(next_) +{ + self=Id(cpg->getLocalNoideId(), getpid()); + QPID_LOG(trace, *this << " Joining cluster."); + cpg->join(name); + notify(); + dispatcher=Thread(*this); +} + +Cluster::~Cluster() { + try { + QPID_LOG(trace, *this << " Leaving cluster."); + cpg->leave(name); + cpg.reset(); + dispatcher.join(); + } catch (const std::exception& e) { + QPID_LOG(error, "Exception leaving cluster " << e.what()); + } +} + +void Cluster::handle(AMQFrame& frame) { + QPID_LOG(trace, *this << " SEND: " << frame); + Buffer buf(frame.size()); + frame.encode(buf); + buf.flip(); + iovec iov = { buf.start(), frame.size() }; + cpg->mcast(name, &iov, 1); +} + +size_t Cluster::size() const { + Mutex::ScopedLock l(lock); + return members.size(); +} + +Cluster::MemberList Cluster::getMembers() const { + Mutex::ScopedLock l(lock); + MemberList result(members.size()); + std::transform(members.begin(), members.end(), result.begin(), + boost::bind(&MemberMap::value_type::second, _1)); + return result; +} + +void Cluster::cpgDeliver( + cpg_handle_t /*handle*/, + struct cpg_name* /* group */, + uint32_t nodeid, + uint32_t pid, + void* msg, + int msg_len) +{ + Id from(nodeid, pid); + Buffer buf(static_cast<char*>(msg), msg_len); + AMQFrame frame; + frame.decode(buf); + QPID_LOG(trace, *this << " RECV: " << frame); + // TODO aconway 2007-06-20: use visitor pattern. + ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); + if (notifyIn) { + Mutex::ScopedLock l(lock); + members[from].reset(new Member(notifyIn->getUrl())); + lock.notifyAll(); + } + else + next.handle(frame); +} + +void Cluster::cpgConfigChange( + cpg_handle_t /*handle*/, + struct cpg_name */*group*/, + struct cpg_address *ccMembers, int nMembers, + struct cpg_address *left, int nLeft, + struct cpg_address *joined, int nJoined +) +{ + QPID_LOG( + trace, + *this << " Configuration change. " << endl + << " Joined: " << make_pair(joined, nJoined) << endl + << " Left: " << make_pair(left, nLeft) << endl + << " Current: " << make_pair(ccMembers, nMembers)); + + { + Mutex::ScopedLock l(lock); + // Erase members that left. + for (int i = 0; i < nLeft; ++i) + members.erase(Id(left[i])); + lock.notifyAll(); + } + + // If there are new members (other than myself) then notify. + for (int i=0; i< nJoined; ++i) { + if (Id(joined[i]) != self) { + notify(); + break; + } + } + + // Note: New members are be added to my map when cpgDeliver + // gets a cluster.notify frame. +} + +void Cluster::run() { + cpg->dispatchBlocking(); +} + +}} // namespace qpid::cluster + + + |