diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/broker/SessionManager.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 107 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 2 |
6 files changed, 120 insertions, 59 deletions
diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index aadb2b9004..571d3365db 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -43,12 +43,13 @@ SessionManager::SessionManager(uint32_t a) : ack(a) {} SessionManager::~SessionManager() {} +// FIXME aconway 2008-02-01: pass handler*, allow open unattached. std::auto_ptr<SessionState> SessionManager::open( SessionHandler& h, uint32_t timeout_) { Mutex::ScopedLock l(lock); std::auto_ptr<SessionState> session( - new SessionState(*this, h, timeout_, ack)); + new SessionState(this, &h, timeout_, ack)); active.insert(session->getId()); for_each(observers.begin(), observers.end(), boost::bind(&Observer::opened, _1,boost::ref(*session))); diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index a75b32cbb5..1021cca1b1 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -36,23 +36,17 @@ using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; -void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); } - -void SessionState::handleOut(AMQFrame& f) { - assert(handler); - handler->out.handle(f); -} - SessionState::SessionState( - SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack) + SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) : framing::SessionState(ack, timeout_ > 0), - factory(f), handler(&h), id(true), timeout(timeout_), - broker(h.getConnection().broker), - version(h.getConnection().getVersion()), + factory(f), handler(h), id(true), timeout(timeout_), + broker(h->getConnection().broker), + version(h->getConnection().getVersion()), semanticHandler(new SemanticHandler(*this)) { - // TODO aconway 2007-09-20: SessionManager may add plugin - // handlers to the chain. + in.next = semanticHandler.get(); + out.next = &handler->out; + getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); Manageable* parent = broker.GetVhostObject (); @@ -66,8 +60,8 @@ SessionState::SessionState( mgmtObject = management::Session::shared_ptr (new management::Session (this, parent, id.str ())); mgmtObject->set_attached (1); - mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); - mgmtObject->set_channelId (h.getChannel()); + mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h->getChannel()); mgmtObject->set_detachedLifespan (getTimeout()); agent->addObject (mgmtObject); } @@ -76,12 +70,10 @@ SessionState::SessionState( SessionState::~SessionState() { // Remove ID from active session list. - factory.erase(getId()); - + if (factory) + factory->erase(getId()); if (mgmtObject.get () != 0) - { mgmtObject->resourceDestroy (); - } } SessionHandler* SessionState::getHandler() { @@ -101,7 +93,7 @@ Connection& SessionState::getConnection() { void SessionState::detach() { getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); Mutex::ScopedLock l(lock); - handler = 0; + handler = 0; out.next = 0; if (mgmtObject.get() != 0) { mgmtObject->set_attached (0); @@ -112,6 +104,7 @@ void SessionState::attach(SessionHandler& h) { { Mutex::ScopedLock l(lock); handler = &h; + out.next = &handler->out; if (mgmtObject.get() != 0) { mgmtObject->set_attached (1); diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index c8c32a046d..bc1b974eaa 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -58,7 +58,7 @@ class Connection; * themselves have state. */ class SessionState : public framing::SessionState, - public framing::FrameHandler::InOutHandler, + public framing::FrameHandler::Chains, public sys::OutputControl, public management::Manageable { @@ -90,18 +90,15 @@ class SessionState : public framing::SessionState, management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); - protected: - void handleIn(framing::AMQFrame&); - void handleOut(framing::AMQFrame&); - - private: - // SessionManager creates sessions. - SessionState(SessionManager&, - SessionHandler& out, + // Normally SessionManager creates sessions. + SessionState(SessionManager*, + SessionHandler* out, uint32_t timeout, uint32_t ackInterval); - SessionManager& factory; + + private: + SessionManager* factory; SessionHandler* handler; framing::Uuid id; uint32_t timeout; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 49270bcfef..bca6c49c13 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -17,6 +17,7 @@ */ #include "Cluster.h" +#include "qpid/broker/SessionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" @@ -31,7 +32,70 @@ namespace cluster { using namespace qpid::framing; using namespace qpid::sys; using namespace std; +using broker::SessionState; +namespace { + +// Beginning of inbound chain: send to cluster. +struct ClusterSendHandler : public FrameHandler { + SessionState& session; + Cluster& cluster; + bool busy; + Monitor lock; + + ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} + + void handle(AMQFrame& f) { + Mutex::ScopedLock l(lock); + assert(!busy); + // FIXME aconway 2008-01-29: refcount Sessions. + // session.addRef(); // Keep the session till the message is self delivered. + cluster.send(f, next); // Indirectly send to next via cluster. + + // FIXME aconway 2008-01-29: need to get this blocking out of the loop. + // But cluster needs to agree on order of side-effects on the shared model. + // OK for wiring to block, for messages use queue tokens? + // Both in & out transfers must be orderd per queue. + // May need out-of-order completion. + busy=true; + while (busy) lock.wait(); + } +}; + +// Next in inbound chain, self delivered from cluster. +struct ClusterDeliverHandler : public FrameHandler { + Cluster& cluster; + ClusterSendHandler& sender; + + ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {} + + void handle(AMQFrame& f) { + next->handle(f); + Mutex::ScopedLock l(sender.lock); + sender.busy=false; + sender.lock.notify(); + } +}; + +// FIXME aconway 2008-01-29: IList +void insert(FrameHandler::Chain& c, FrameHandler* h) { + h->next = c.next; + c.next = h; +} + +struct SessionObserver : public broker::SessionManager::Observer { + Cluster& cluster; + SessionObserver(Cluster& c) : cluster(c) {} + + void opened(SessionState& s) { + // FIXME aconway 2008-01-29: IList for memory management. + ClusterSendHandler* sender=new ClusterSendHandler(s, cluster); + ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster); + insert(s.in, deliverer); + insert(s.in, sender); + } +}; +} ostream& operator <<(ostream& out, const Cluster& cluster) { return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]"; @@ -48,10 +112,10 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { } Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) : - FrameHandler(0), // FIXME aconway 2008-01-29: handler. + observer cpg(*this), name(name_), - url(url_) + url(url_), + observer(new SessionObserver(*this)) { QPID_LOG(trace, *this << " Joining cluster: " << name_); cpg.join(name); @@ -77,18 +141,19 @@ Cluster::~Cluster() { } } -void Cluster::handle(AMQFrame& frame) { +void Cluster::send(AMQFrame& frame, FrameHandler* next) { QPID_LOG(trace, *this << " SEND: " << frame); - boost::scoped_array<char> store(new char[frame.size()]); // FIXME aconway 2008-01-29: Better buffer handling. - Buffer buf(store.get()); + char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling. + Buffer buf(data); frame.encode(buf); - iovec iov = { store.get(), frame.size() }; + buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer. + iovec iov = { data, frame.size()+sizeof(next) }; cpg.mcast(name, &iov, 1); } void Cluster::notify() { AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())); - handle(frame); + send(frame, 0); } size_t Cluster::size() const { @@ -112,15 +177,25 @@ void Cluster::deliver( void* msg, int msg_len) { - Id from(nodeid, pid); - Buffer buf(static_cast<char*>(msg), msg_len); - AMQFrame frame; - frame.decode(buf); - QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); - if (frame.getChannel() == 0) - handleClusterFrame(from, frame); - else - next->handle(frame); + try { + Id from(nodeid, pid); + Buffer buf(static_cast<char*>(msg), msg_len); + AMQFrame frame; + frame.decode(buf); + QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); + if (frame.getChannel() == 0) + handleClusterFrame(from, frame); + else if (from == self) { + FrameHandler* next; + buf.getRawData((uint8_t*)&next, sizeof(next)); + next->handle(frame); + } + // FIXME aconway 2008-01-30: apply frames from foreign sessions. + } + catch (const std::exception& e) { + // FIXME aconway 2008-01-30: exception handling. + QPID_LOG(error, "Error handling frame from cluster " << e.what()); + } } bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index e9809f2264..b62b2be5f1 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -21,7 +21,6 @@ #include "Cpg.h" -#include "qpid/framing/FrameHandler.h" #include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" @@ -39,15 +38,10 @@ namespace qpid { namespace cluster { /** - * Connection to the cluster. Maintains cluster membership - * data. - * - * As FrameHandler, handles frames by sending them to the - * cluster. Frames received from the cluster are sent to the next - * FrameHandler in the chain. + * Connection to the cluster. + * Keeps cluster membership data. */ -class Cluster : public framing::FrameHandler, - private sys::Runnable, private Cpg::Handler +class Cluster : private sys::Runnable, private Cpg::Handler { public: /** Details of a cluster member */ @@ -68,7 +62,7 @@ class Cluster : public framing::FrameHandler, virtual ~Cluster(); // FIXME aconway 2008-01-29: - //framing::HandlerUpdater& getHandlerUpdater() { return sessions; } + intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; } /** Get the current cluster membership. */ MemberList getMembers() const; @@ -87,7 +81,7 @@ class Cluster : public framing::FrameHandler, sys::Duration timeout=sys::TIME_INFINITE) const; /** Send frame to the cluster */ - void handle(framing::AMQFrame&); + void send(framing::AMQFrame&, framing::FrameHandler*); private: typedef Cpg::Id Id; @@ -122,6 +116,7 @@ class Cluster : public framing::FrameHandler, MemberMap members; sys::Thread dispatcher; boost::function<void()> callback; + intrusive_ptr<broker::SessionManager::Observer> observer; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index e6b5f1a0bd..ceafa389b0 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -69,7 +69,7 @@ struct ClusterPlugin : public Plugin { cluster = boost::in_place(options.name, options.getUrl(broker->getPort()), boost::ref(*broker)); - // FIXME aconway 2008-02-01: Add observer. + broker->getSessionManager().add(cluster->getObserver()); } } }; |