diff options
Diffstat (limited to 'cpp/src/qpid/cluster/SessionManager.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/SessionManager.cpp | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp new file mode 100644 index 0000000000..24f201535d --- /dev/null +++ b/cpp/src/qpid/cluster/SessionManager.cpp @@ -0,0 +1,103 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQFrame.h" +#include "SessionManager.h" +#include "ClassifierHandler.h" + +namespace qpid { +namespace cluster { + +using namespace framing; +using namespace sys; + +/** Wrap plain AMQFrames in SessionFrames */ +struct FrameWrapperHandler : public FrameHandler { + + FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain next_) + : uuid(id), direction(dir), next(next_) { + assert(!uuid.isNull()); + } + + void handle(AMQFrame& frame) { + SessionFrame sf(uuid, frame, direction); + assert(next); + next->handle(sf); + } + + Uuid uuid; + bool direction; + SessionFrameHandler::Chain next; +}; + +SessionManager::SessionManager() {} + +void SessionManager::update(FrameHandler::Chains& chains) +{ + Mutex::ScopedLock l(lock); + // Create a new local session, store local chains. + Uuid uuid(true); + sessions[uuid] = chains; + + // Replace local incoming chain. Build from the back. + // + // TODO aconway 2007-07-05: Currently mcast wiring, bypass + // everythign else. + assert(clusterSend); + FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, clusterSend)); + FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in)); + chains.in = classify; + + // FIXME aconway 2007-07-05: Need to stop bypassed frames + // from overtaking mcast frames. + // + + // Leave outgoing chain unmodified. + // TODO aconway 2007-07-05: Failover will require replication of + // outgoing frames to session replicas. + +} + +void SessionManager::handle(SessionFrame& frame) { + // Incoming from frame. + FrameHandler::Chains chains; + { + Mutex::ScopedLock l(lock); + SessionMap::iterator i = sessions.find(frame.uuid); + if (i == sessions.end()) { + QPID_LOG(trace, "Non-local frame cluster: " << frame.frame); + chains = nonLocal; + } + else { + QPID_LOG(trace, "Local frame from cluster: " << frame.frame); + chains = i->second; + } + } + FrameHandler::Chain chain = + chain = frame.isIncoming ? chains.in : chains.out; + // TODO aconway 2007-07-11: Should this be assert(chain) + if (chain) + chain->handle(frame.frame); + + // TODO aconway 2007-07-05: Here's where we should unblock frame + // dispatch for the channel. +} + +}} // namespace qpid::cluster |