diff options
author | Alan Conway <aconway@apache.org> | 2007-07-19 21:52:24 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-07-19 21:52:24 +0000 |
commit | cb566519d58ded6704507fa5530bf901e620edf6 (patch) | |
tree | ab4b29ddd0ad2b5e9015647e379bede84163b13e /cpp/src/qpid/cluster/SessionManager.cpp | |
parent | 3f900af77d5f781431dc25e307974e0fc27aa561 (diff) | |
download | qpid-python-cb566519d58ded6704507fa5530bf901e620edf6.tar.gz |
* Summary:
- Connect cluster handlers into broker handler chains.
- Progress on wiring replication.
* src/tests/cluster.mk: Temporarily disabled Cluster test.
* src/tests/Cluster.h, cpp, Cluster_child.cpp: Updated to use UUIDs.
* src/qpidd.cpp:
- Load optional libs (cluster)
- Include plugin config in options.parse.
* src/qpid/cluster/SessionManager.h:
- Create sessions, update handler chains (as HandlerUpdater)
- Handle frames from cluster.
* src/qpid/cluster/ClusterPlugin.h, .cpp:
- renamed from ClusterPluginProvider
- Create and connect Cluster and SessionManager.
- Register SessionManager as HandlerUpdater.
* src/qpid/cluster/Cluster.h, .cpp: Refactor as SessionFrameHandler.
* src/qpid/broker/Connection.cpp: Apply HandlerUpdaters.
* src/qpid/broker/Broker.h, .cpp:
- Initialize plugins
- Apply HandlerUpdaters
* src/qpid/Plugin.h, .cpp: Simplified plugin framework.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/SessionManager.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/SessionManager.cpp | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp new file mode 100644 index 0000000000..24f201535d --- /dev/null +++ b/cpp/src/qpid/cluster/SessionManager.cpp @@ -0,0 +1,103 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQFrame.h" +#include "SessionManager.h" +#include "ClassifierHandler.h" + +namespace qpid { +namespace cluster { + +using namespace framing; +using namespace sys; + +/** Wrap plain AMQFrames in SessionFrames */ +struct FrameWrapperHandler : public FrameHandler { + + FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain next_) + : uuid(id), direction(dir), next(next_) { + assert(!uuid.isNull()); + } + + void handle(AMQFrame& frame) { + SessionFrame sf(uuid, frame, direction); + assert(next); + next->handle(sf); + } + + Uuid uuid; + bool direction; + SessionFrameHandler::Chain next; +}; + +SessionManager::SessionManager() {} + +void SessionManager::update(FrameHandler::Chains& chains) +{ + Mutex::ScopedLock l(lock); + // Create a new local session, store local chains. + Uuid uuid(true); + sessions[uuid] = chains; + + // Replace local incoming chain. Build from the back. + // + // TODO aconway 2007-07-05: Currently mcast wiring, bypass + // everythign else. + assert(clusterSend); + FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, clusterSend)); + FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in)); + chains.in = classify; + + // FIXME aconway 2007-07-05: Need to stop bypassed frames + // from overtaking mcast frames. + // + + // Leave outgoing chain unmodified. + // TODO aconway 2007-07-05: Failover will require replication of + // outgoing frames to session replicas. + +} + +void SessionManager::handle(SessionFrame& frame) { + // Incoming from frame. + FrameHandler::Chains chains; + { + Mutex::ScopedLock l(lock); + SessionMap::iterator i = sessions.find(frame.uuid); + if (i == sessions.end()) { + QPID_LOG(trace, "Non-local frame cluster: " << frame.frame); + chains = nonLocal; + } + else { + QPID_LOG(trace, "Local frame from cluster: " << frame.frame); + chains = i->second; + } + } + FrameHandler::Chain chain = + chain = frame.isIncoming ? chains.in : chains.out; + // TODO aconway 2007-07-11: Should this be assert(chain) + if (chain) + chain->handle(frame.frame); + + // TODO aconway 2007-07-05: Here's where we should unblock frame + // dispatch for the channel. +} + +}} // namespace qpid::cluster |