diff options
author | Alan Conway <aconway@apache.org> | 2007-11-01 00:38:58 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-11-01 00:38:58 +0000 |
commit | d4838b1db929de6d650b7cdf574c04425c01b38d (patch) | |
tree | d519ff7d639f6f1bca111bc12930abf1da405e67 /cpp/src/qpid | |
parent | af6457122a32f1f5a0224fc54f3d0c24377510e3 (diff) | |
download | qpid-python-d4838b1db929de6d650b7cdf574c04425c01b38d.tar.gz |
Preparation for session thread safety overhaul:
- simplified SessionState, responsibility for protocol states now in Handlers
- qpid::RefCounted, qpid::intrusive_ptr reference counting support.
- build boost unit tests as single exe, speeds up testing.
- fixed leak in AsynchIOAcceptor.cpp
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@590869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/RefCounted.h | 73 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionManager.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SessionState.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SessionState.h | 25 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AtomicCount.h | 2 |
9 files changed, 122 insertions, 16 deletions
diff --git a/cpp/src/qpid/RefCounted.h b/cpp/src/qpid/RefCounted.h new file mode 100644 index 0000000000..790076b463 --- /dev/null +++ b/cpp/src/qpid/RefCounted.h @@ -0,0 +1,73 @@ +#ifndef QPID_REFCOUNTED_H +#define QPID_REFCOUNTED_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/AtomicCount.h" + +#include <boost/intrusive_ptr.hpp> + + +namespace qpid { + +/** Abstract interface for reference counted objects */ +class AbstractRefCounted { + public: + virtual void addRef() const=0; + virtual void release() const=0; + protected: + virtual ~AbstractRefCounted() {} +}; + +/** + * Reference-counted virtual base class. + */ +class RefCounted : public AbstractRefCounted +{ + public: + RefCounted() {} + virtual void addRef() const { ++count; } + virtual void release() const { if (--count==0) released(); } + + protected: + virtual ~RefCounted() {}; + // Copy/assign do not copy refcounts. + RefCounted(const RefCounted&) : AbstractRefCounted() {} + RefCounted& operator=(const RefCounted&) { return *this; } + virtual void released() const { delete this; } + + private: + mutable sys::AtomicCount count; +}; + +using boost::intrusive_ptr; + +} // namespace qpid + +// intrusive_ptr support. +namespace boost { +void intrusive_ptr_add_ref(const qpid::AbstractRefCounted* p) { p->addRef(); } +void intrusive_ptr_release(const qpid::AbstractRefCounted* p) { p->release(); } +} + + +#endif /*!QPID_REFCOUNTED_H*/ diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 0dafcba7bd..f72c52c809 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -38,8 +38,7 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch) connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. - ignoring(false), - resuming(false) {} + ignoring(false) {} SessionHandler::~SessionHandler() {} @@ -117,8 +116,7 @@ void SessionHandler::resume(const Uuid& id) { assertClosed("resume"); session = connection.broker.getSessionManager().resume(id); session->attach(*this); - resuming=true; - SequenceNumber seq = session->sendingAck(); + SequenceNumber seq = session->resuming(); peerSession.attached(session->getId(), session->getTimeout()); proxy.getSession().ack(seq, SequenceNumberSet()); } @@ -171,8 +169,7 @@ void SessionHandler::ack(uint32_t cumulativeSeenMark, const SequenceNumberSet& /*seenFrameSet*/) { assertAttached("ack"); - if (resuming) { - resuming=false; + if (session->getState() == SessionState::RESUMING) { session->receivedAck(cumulativeSeenMark); framing::SessionState::Replay replay=session->replay(); std::for_each(replay.begin(), replay.end(), diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 800b886bbf..08584ecd47 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -94,7 +94,6 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, framing::AMQP_ClientProxy proxy; framing::AMQP_ClientProxy::Session peerSession; bool ignoring; - bool resuming; std::auto_ptr<SessionState> session; sys::Semaphore suspension; }; diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index 5bdc572491..f12ebc6db1 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -56,6 +56,7 @@ std::auto_ptr<SessionState> SessionManager::open( void SessionManager::suspend(std::auto_ptr<SessionState> session) { Mutex::ScopedLock l(lock); active.erase(session->getId()); + session->suspend(); session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); suspended.push_back(session.release()); // In expiry order eraseExpired(); diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 30df574716..f7f0f52dba 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -52,6 +52,7 @@ inline void SessionCore::invariant() const { break; case RESUMING: assert(session); + assert(session->getState() == SessionState::RESUMING); assert(code==REPLY_SUCCESS); assert(connection); assert(channel.get()); @@ -142,6 +143,7 @@ void SessionCore::doSuspend(int code, const std::string& text) { if (state != CLOSED) { invariant(); detach(code, text); + session->suspend(); setState(SUSPENDED); } } @@ -200,7 +202,7 @@ void SessionCore::resume(shared_ptr<ConnectionImpl> c) { if (state==OPEN) doSuspend(REPLY_SUCCESS, OK); check(state==SUSPENDED, COMMAND_INVALID, QPID_MSG("Session cannot be resumed.")); - SequenceNumber sendAck=session->sendingAck(); + SequenceNumber sendAck=session->resuming(); attaching(c); proxy.resume(getId()); waitFor(OPEN); diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp index 7e905bdf63..8056f4a523 100644 --- a/cpp/src/qpid/framing/SessionState.cpp +++ b/cpp/src/qpid/framing/SessionState.cpp @@ -33,6 +33,7 @@ namespace qpid { namespace framing { SessionState::SessionState(uint32_t ack, const Uuid& uuid) : + state(ATTACHED), id(uuid), lastReceived(-1), lastSent(-1), @@ -44,6 +45,7 @@ SessionState::SessionState(uint32_t ack, const Uuid& uuid) : {} SessionState::SessionState(const Uuid& uuid) : + state(ATTACHED), id(uuid), lastReceived(-1), lastSent(-1), @@ -63,6 +65,10 @@ bool isSessionCommand(const AMQFrame& f) { boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) { if (isSessionCommand(f)) return boost::none; + if (state==RESUMING) + throw CommandInvalidException( + QPID_MSG("Invalid frame: Resuming session, expected session-ack")); + assert(state = ATTACHED); ++lastReceived; QPID_LOG(trace, "Recv # "<< lastReceived << " " << id); if (ackInterval && lastReceived == sendAckAt) @@ -79,6 +85,7 @@ bool SessionState::sent(const AMQFrame& f) { ++lastSent; QPID_LOG(trace, "Sent # "<< lastSent << " " << id); return ackInterval && + (state!=RESUMING) && (lastSent == solicitAckAt) && sendingSolicit(); } @@ -90,6 +97,8 @@ SessionState::Replay SessionState::replay() { } void SessionState::receivedAck(SequenceNumber acked) { + if (state==RESUMING) state=ATTACHED; + assert(state==ATTACHED); if (lastSent < acked) throw InvalidArgumentException("Invalid sequence number in ack"); size_t keep = lastSent - acked; @@ -104,10 +113,22 @@ SequenceNumber SessionState::sendingAck() { } bool SessionState::sendingSolicit() { + assert(state == ATTACHED); if (ackSolicited) return false; solicitAckAt = lastSent + ackInterval; return ackInterval != 0; } +SequenceNumber SessionState::resuming() { + if (!resumable) + throw InternalErrorException("Session is not resumable"); + state = RESUMING; + return sendingAck(); +} + +void SessionState::suspend() { + state = SUSPENDED; +} + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h index 5034de4e94..361c960db1 100644 --- a/cpp/src/qpid/framing/SessionState.h +++ b/cpp/src/qpid/framing/SessionState.h @@ -35,10 +35,7 @@ namespace framing { /** * Session state common to client and broker. - * - * Stores data needed to resume a session: replay frames, implements - * session ack/resume protcools. Stores handler chains for the session, - * handlers may themselves store state. + * Stores replay frames, implements session ack/resume protcools. * * A SessionState is always associated with an _open_ session (attached or * suspended) it is destroyed when the session is closed. @@ -49,6 +46,13 @@ class SessionState public: typedef std::vector<AMQFrame> Replay; + /** States of a session. */ + enum State { + SUSPENDED, ///< Suspended, detached from any channel. + RESUMING, ///< Resuming: waiting for initial ack from peer. + ATTACHED ///< Attached to channel and operating normally. + }; + /** *Create a newly opened active session. *@param ackInterval send/solicit an ack whenever N unacked frames @@ -56,8 +60,7 @@ class SessionState * * N=0 disables voluntary send/solict ack. */ - SessionState(uint32_t ackInterval, - const framing::Uuid& id=framing::Uuid(true)); + SessionState(uint32_t ackInterval, const framing::Uuid& id=framing::Uuid(true)); /** * Create a non-resumable session. Does not store session frames, @@ -66,6 +69,7 @@ class SessionState SessionState(const framing::Uuid& id=framing::Uuid(true)); const framing::Uuid& getId() const { return id; } + State getState() const { return state; } /** Received incoming L3 frame. * @return SequenceNumber if an ack should be sent, empty otherwise. @@ -88,6 +92,13 @@ class SessionState */ Replay replay(); + /** Suspend the session. */ + void suspend(); + + /** Start resume protocol for the session. + *@returns sequence number to ack immediately. */ + SequenceNumber resuming(); + /** About to send an unscheduled ack, e.g. to respond to a solicit-ack. * * Note: when received() returns a sequence number this function @@ -104,7 +115,9 @@ class SessionState bool sendingSolicit(); + State state; framing::Uuid id; + Unacked unackedOut; SequenceNumber lastReceived; SequenceNumber lastSent; diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index eccfb1465e..bdf3e3b8d3 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -173,7 +173,7 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { d.run(); // Now wait for n-1 io threads to exit - for (int i=0; i>numIOThreads-1; ++i) { + for (int i=0; i<numIOThreads-1; ++i) { t[i].join(); } } diff --git a/cpp/src/qpid/sys/AtomicCount.h b/cpp/src/qpid/sys/AtomicCount.h index 63670cbf00..b542e37039 100644 --- a/cpp/src/qpid/sys/AtomicCount.h +++ b/cpp/src/qpid/sys/AtomicCount.h @@ -28,7 +28,7 @@ namespace sys { /** * Atomic counter. */ -class AtomicCount : boost::noncopyable { +class AtomicCount { public: typedef ScopedDecrement<AtomicCount> ScopedDecrement; typedef ScopedIncrement<AtomicCount> ScopedIncrement; |