diff options
author | Alan Conway <aconway@apache.org> | 2008-06-18 17:53:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-06-18 17:53:30 +0000 |
commit | 9bf82c2c8c45a5228643a285f8db0b1061a69ad9 (patch) | |
tree | d38be99fcb793712c2a2b5fb56dcbbb8294ff818 /cpp/src/qpid | |
parent | 02757b560356e0ddb090fbe103e0b65db6dbd3b3 (diff) | |
download | qpid-python-9bf82c2c8c45a5228643a285f8db0b1061a69ad9.tar.gz |
Bring cluster code up to date.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@669236 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 63 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 35 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClassifierHandler.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Handler.h | 27 | ||||
-rw-r--r-- | cpp/src/qpid/framing/HeaderProperties.h | 2 |
11 files changed, 86 insertions, 75 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 7fc2b6c4f3..b058978ccf 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -33,9 +33,6 @@ using namespace qpid::broker; using boost::dynamic_pointer_cast; using boost::intrusive_ptr; -static const uint8_t BASIC = 1; -static const uint8_t MESSAGE = 2; - RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, DtxManager& _dtxMgr, uint64_t _stagingThreshold) : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index bdd8edac87..a4a40a03e8 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -288,7 +288,6 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg) bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) { - //TODO: remove the now redundant checks (channel.flow & basic|message.qos removed): blocked = !(filter(msg) && checkCredit(msg) && parent->flowActive && (!ackExpected || parent->checkPrefetch(msg))); return !blocked; } diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index bf3a7756b5..1310e6c51a 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -54,8 +54,7 @@ class SessionContext; * SemanticState holds the L3 and L4 state of an open session, whether * attached to a channel or suspended. */ -class SemanticState : public framing::FrameHandler::Chains, - public sys::OutputTask, +class SemanticState : public sys::OutputTask, private boost::noncopyable { class ConsumerImpl : public Consumer, public sys::OutputTask diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 2f09c6b5ac..c752f6315b 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -71,7 +71,7 @@ void SessionHandler::setState(const std::string& name, bool force) { session = connection.broker.getSessionManager().attach(*this, id, force); } -FrameHandler* SessionHandler::getInHandler() { return session.get(); } +FrameHandler* SessionHandler::getInHandler() { return session.get() ? &session->in : 0; } qpid::SessionState* SessionHandler::getState() { return session.get(); } void SessionHandler::readyToSend() { diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index dada7567f9..d7089424a5 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -53,7 +53,11 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)) + enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), + inLastHandler(*this), + outLastHandler(*this), + inChain(inLastHandler), + outChain(outLastHandler) { Manageable* parent = broker.GetVhostObject (); if (parent != 0) { @@ -102,20 +106,20 @@ void SessionState::detach() { handler = 0; if (mgmtObject.get() != 0) mgmtObject->set_attached (0); - } +} void SessionState::attach(SessionHandler& h) { // activateOutput can be called in a different thread, lock to protect attached status - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); QPID_LOG(debug, getId() << ": attached on broker."); - handler = &h; - if (mgmtObject.get() != 0) - { - mgmtObject->set_attached (1); - mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); - mgmtObject->set_channelId (h.getChannel()); - } + handler = &h; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (1); + mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h.getChannel()); } +} void SessionState::activateOutput() { // activateOutput can be called in a different thread, lock to protect attached status @@ -137,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, switch (methodId) { - case management::Session::METHOD_DETACH : + case management::Session::METHOD_DETACH : if (handler != 0) { handler->sendDetach(); @@ -145,18 +149,18 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, status = Manageable::STATUS_OK; break; - case management::Session::METHOD_CLOSE : + case management::Session::METHOD_CLOSE : /* - if (handler != 0) - { - handler->getConnection().closeChannel(handler->getChannel()); - } - status = Manageable::STATUS_OK; - break; + if (handler != 0) + { + handler->getConnection().closeChannel(handler->getChannel()); + } + status = Manageable::STATUS_OK; + break; */ - case management::Session::METHOD_SOLICITACK : - case management::Session::METHOD_RESETLIFESPAN : + case management::Session::METHOD_SOLICITACK : + case management::Session::METHOD_RESETLIFESPAN : status = Manageable::STATUS_NOT_IMPLEMENTED; break; } @@ -218,10 +222,12 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg) receiverCompleted(msg->getCommandId()); if (msg->requiresAccept()) getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); - } +} -void SessionState::handle(AMQFrame& frame) -{ +void SessionState::handleIn(AMQFrame& f) { inChain.handle(f); } +void SessionState::handleOut(AMQFrame& f) { outChain.handle(f); } + +void SessionState::handleInLast(AMQFrame& frame) { SequenceNumber commandId = receiverGetCurrent(); try { //TODO: make command handling more uniform, regardless of whether @@ -252,6 +258,11 @@ void SessionState::handle(AMQFrame& frame) } } +void SessionState::handleOutLast(AMQFrame& frame) { + assert(handler); + handler->out(frame); +} + DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { uint32_t maxFrameSize = getConnection().getFrameMax(); @@ -267,7 +278,7 @@ void SessionState::sendCompletion() { handler->sendCompletion(); } void SessionState::senderCompleted(const SequenceSet& commands) { qpid::SessionState::senderCompleted(commands); for (SequenceSet::RangeIterator i = commands.rangesBegin(); i != commands.rangesEnd(); i++) - semanticState.completed(i->first(), i->last()); + semanticState.completed(i->first(), i->last()); } void SessionState::readyToSend() { @@ -280,4 +291,8 @@ void SessionState::readyToSend() { Broker& SessionState::getBroker() { return broker; } +framing::FrameHandler::Chain& SessionState::getInChain() { return inChain; } + +framing::FrameHandler::Chain& SessionState::getOutChain() { return outChain; } + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 7b70789161..f2774dadd3 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -62,10 +62,10 @@ class SessionManager; * themselves have state. */ class SessionState : public qpid::SessionState, - public SessionContext, - public DeliveryAdapter, + public SessionContext, + public DeliveryAdapter, public management::Manageable, - public framing::FrameHandler + public framing::FrameHandler::InOutHandler { public: SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&); @@ -87,8 +87,6 @@ class SessionState : public qpid::SessionState, /** OutputControl **/ void activateOutput(); - void handle(framing::AMQFrame& frame); - void senderCompleted(const framing::SequenceSet& ranges); void sendCompletion(); @@ -99,32 +97,43 @@ class SessionState : public qpid::SessionState, // Manageable entry points management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable::status_t - ManagementMethod (uint32_t methodId, management::Args& args); + ManagementMethod (uint32_t methodId, management::Args& args); void readyToSend(); + framing::FrameHandler::Chain& getInChain(); + framing::FrameHandler::Chain& getOutChain(); + private: + void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); + void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); + void enqueued(boost::intrusive_ptr<Message> msg); + + void handleIn(framing::AMQFrame& frame); + void handleOut(framing::AMQFrame& frame); + + // End of the input & output chains. + void handleInLast(framing::AMQFrame& frame); + void handleOutLast(framing::AMQFrame& frame); + Broker& broker; SessionHandler* handler; sys::AbsTime expiry; // Used by SessionManager. sys::Mutex lock; bool ignoring; std::string name; - SemanticState semanticState; SessionAdapter adapter; MessageBuilder msgBuilder; IncompleteMessageList incomplete; - IncompleteMessageList::CompletionListener enqueuedOp; - management::Session::shared_ptr mgmtObject; - void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); - void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); - void enqueued(boost::intrusive_ptr<Message> msg); + framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleInLast> inLastHandler; + framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleOutLast> outLastHandler; + framing::FrameHandler::Chain inChain, outChain; - friend class SessionManager; + friend class SessionManager; }; diff --git a/cpp/src/qpid/cluster/ClassifierHandler.cpp b/cpp/src/qpid/cluster/ClassifierHandler.cpp index b241ee8e36..b78f795d20 100644 --- a/cpp/src/qpid/cluster/ClassifierHandler.cpp +++ b/cpp/src/qpid/cluster/ClassifierHandler.cpp @@ -41,11 +41,11 @@ struct ClassifierHandler::Visitor : public FrameDefaultVisitor { using framing::FrameDefaultVisitor::visit; using framing::FrameDefaultVisitor::defaultVisit; - FrameHandler::Chain chosen; + FrameHandler* chosen; AMQFrame& frame; ClassifierHandler& classifier; }; -void ClassifierHandler::handle(AMQFrame& f) { Visitor(f, *this).chosen(f); } +void ClassifierHandler::handle(AMQFrame& f) { Visitor(f, *this).chosen->handle(f); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 59353d7637..3007e9b1ab 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -71,28 +71,23 @@ struct ClusterDeliverHandler : public FrameHandler { void handle(AMQFrame& f) { next->handle(f); - Mutex::ScopedLock l(senderLock); - senderBusy=false; - senderLock.notify(); + // FIXME aconway 2008-06-16: solve overtaking problem - async completion of commands. + // Mutex::ScopedLock l(lock); + // senderBusy=false; + // senderLock.notify(); } }; -// FIXME aconway 2008-01-29: IList -void insert(FrameHandler::Chain& c, FrameHandler* h) { - h->next = c.next; - c.next = h; -} - struct SessionObserver : public broker::SessionManager::Observer { Cluster& cluster; SessionObserver(Cluster& c) : cluster(c) {} void opened(SessionState& s) { - // FIXME aconway 2008-01-29: IList for memory management. + // FIXME aconway 2008-06-16: clean up chaining and observers. ClusterSendHandler* sender=new ClusterSendHandler(s, cluster); ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster); - insert(s.in, deliverer); - insert(s.in, sender); + s.getInChain().insert(deliverer); + s.getOutChain().insert(sender); } }; } diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 0ea3953175..ceafa389b0 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -69,7 +69,7 @@ struct ClusterPlugin : public Plugin { cluster = boost::in_place(options.name, options.getUrl(broker->getPort()), boost::ref(*broker)); - broker->getPreviewSessionManager().add(cluster->getObserver()); + broker->getSessionManager().add(cluster->getObserver()); } } }; diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index b93869be85..5e3d48ac88 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -46,22 +46,21 @@ struct Handler { /** Pointer to next handler in a linked list. */ Handler<T>* next; - /** A Chain is a handler that forwards to a modifiable - * linked list of handlers. + /** A Chain is a handler holding a linked list of sub-handlers. + * Chain::next is invoked after the full, it is not itself part of the chain. + * Handlers inserted into the chain are deleted by the Chain dtor. */ - struct Chain : public Handler<T> { - Chain(Handler<T>* first=0) : Handler(first) {} - void operator=(Handler<T>* h) { next = h; } - void handle(T t) { next->handle(t); } - // TODO aconway 2007-08-29: chain modifier ops here. - }; + class Chain : public Handler<T> { + public: + Chain(Handler<T>& next_) : Handler(&next_), first(&next_) {} + ~Chain() { while (first != next) pop(); } + void handle(T t) { first->handle(t); } + void insert(Handler<T>* h) { h->next = first; first = h; } + bool empty() { return first == next; } - /** In/out pair of handler chains. */ - struct Chains { - Chains(Handler<T>* in_=0, Handler<T>* out_=0) : in(in_), out(out_) {} - void reset(Handler<T>* in_=0, Handler<T>* out_=0) { in = in_; out = out_; } - Chain in; - Chain out; + private: + void pop() { Handler<T>* p=first; first=first->next; delete p; } + Handler<T>* first; }; /** Adapt any void(T) functor as a Handler. diff --git a/cpp/src/qpid/framing/HeaderProperties.h b/cpp/src/qpid/framing/HeaderProperties.h index 0c805922e8..d66c1d00d6 100644 --- a/cpp/src/qpid/framing/HeaderProperties.h +++ b/cpp/src/qpid/framing/HeaderProperties.h @@ -27,8 +27,6 @@ namespace qpid { namespace framing { - enum header_classes{BASIC = 60}; - class HeaderProperties { |