summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-11-01 00:38:58 +0000
committerAlan Conway <aconway@apache.org>2007-11-01 00:38:58 +0000
commitd4838b1db929de6d650b7cdf574c04425c01b38d (patch)
treed519ff7d639f6f1bca111bc12930abf1da405e67 /cpp/src/qpid
parentaf6457122a32f1f5a0224fc54f3d0c24377510e3 (diff)
downloadqpid-python-d4838b1db929de6d650b7cdf574c04425c01b38d.tar.gz
Preparation for session thread safety overhaul:
- simplified SessionState, responsibility for protocol states now in Handlers - qpid::RefCounted, qpid::intrusive_ptr reference counting support. - build boost unit tests as single exe, speeds up testing. - fixed leak in AsynchIOAcceptor.cpp git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@590869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/RefCounted.h73
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp9
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h1
-rw-r--r--cpp/src/qpid/broker/SessionManager.cpp1
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp4
-rw-r--r--cpp/src/qpid/framing/SessionState.cpp21
-rw-r--r--cpp/src/qpid/framing/SessionState.h25
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp2
-rw-r--r--cpp/src/qpid/sys/AtomicCount.h2
9 files changed, 122 insertions, 16 deletions
diff --git a/cpp/src/qpid/RefCounted.h b/cpp/src/qpid/RefCounted.h
new file mode 100644
index 0000000000..790076b463
--- /dev/null
+++ b/cpp/src/qpid/RefCounted.h
@@ -0,0 +1,73 @@
+#ifndef QPID_REFCOUNTED_H
+#define QPID_REFCOUNTED_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AtomicCount.h"
+
+#include <boost/intrusive_ptr.hpp>
+
+
+namespace qpid {
+
+/** Abstract interface for reference counted objects */
+class AbstractRefCounted {
+ public:
+ virtual void addRef() const=0;
+ virtual void release() const=0;
+ protected:
+ virtual ~AbstractRefCounted() {}
+};
+
+/**
+ * Reference-counted virtual base class.
+ */
+class RefCounted : public AbstractRefCounted
+{
+ public:
+ RefCounted() {}
+ virtual void addRef() const { ++count; }
+ virtual void release() const { if (--count==0) released(); }
+
+ protected:
+ virtual ~RefCounted() {};
+ // Copy/assign do not copy refcounts.
+ RefCounted(const RefCounted&) : AbstractRefCounted() {}
+ RefCounted& operator=(const RefCounted&) { return *this; }
+ virtual void released() const { delete this; }
+
+ private:
+ mutable sys::AtomicCount count;
+};
+
+using boost::intrusive_ptr;
+
+} // namespace qpid
+
+// intrusive_ptr support.
+namespace boost {
+void intrusive_ptr_add_ref(const qpid::AbstractRefCounted* p) { p->addRef(); }
+void intrusive_ptr_release(const qpid::AbstractRefCounted* p) { p->release(); }
+}
+
+
+#endif /*!QPID_REFCOUNTED_H*/
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 0dafcba7bd..f72c52c809 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -38,8 +38,7 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch)
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
- ignoring(false),
- resuming(false) {}
+ ignoring(false) {}
SessionHandler::~SessionHandler() {}
@@ -117,8 +116,7 @@ void SessionHandler::resume(const Uuid& id) {
assertClosed("resume");
session = connection.broker.getSessionManager().resume(id);
session->attach(*this);
- resuming=true;
- SequenceNumber seq = session->sendingAck();
+ SequenceNumber seq = session->resuming();
peerSession.attached(session->getId(), session->getTimeout());
proxy.getSession().ack(seq, SequenceNumberSet());
}
@@ -171,8 +169,7 @@ void SessionHandler::ack(uint32_t cumulativeSeenMark,
const SequenceNumberSet& /*seenFrameSet*/)
{
assertAttached("ack");
- if (resuming) {
- resuming=false;
+ if (session->getState() == SessionState::RESUMING) {
session->receivedAck(cumulativeSeenMark);
framing::SessionState::Replay replay=session->replay();
std::for_each(replay.begin(), replay.end(),
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 800b886bbf..08584ecd47 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -94,7 +94,6 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
framing::AMQP_ClientProxy proxy;
framing::AMQP_ClientProxy::Session peerSession;
bool ignoring;
- bool resuming;
std::auto_ptr<SessionState> session;
sys::Semaphore suspension;
};
diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp
index 5bdc572491..f12ebc6db1 100644
--- a/cpp/src/qpid/broker/SessionManager.cpp
+++ b/cpp/src/qpid/broker/SessionManager.cpp
@@ -56,6 +56,7 @@ std::auto_ptr<SessionState> SessionManager::open(
void SessionManager::suspend(std::auto_ptr<SessionState> session) {
Mutex::ScopedLock l(lock);
active.erase(session->getId());
+ session->suspend();
session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
suspended.push_back(session.release()); // In expiry order
eraseExpired();
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 30df574716..f7f0f52dba 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -52,6 +52,7 @@ inline void SessionCore::invariant() const {
break;
case RESUMING:
assert(session);
+ assert(session->getState() == SessionState::RESUMING);
assert(code==REPLY_SUCCESS);
assert(connection);
assert(channel.get());
@@ -142,6 +143,7 @@ void SessionCore::doSuspend(int code, const std::string& text) {
if (state != CLOSED) {
invariant();
detach(code, text);
+ session->suspend();
setState(SUSPENDED);
}
}
@@ -200,7 +202,7 @@ void SessionCore::resume(shared_ptr<ConnectionImpl> c) {
if (state==OPEN)
doSuspend(REPLY_SUCCESS, OK);
check(state==SUSPENDED, COMMAND_INVALID, QPID_MSG("Session cannot be resumed."));
- SequenceNumber sendAck=session->sendingAck();
+ SequenceNumber sendAck=session->resuming();
attaching(c);
proxy.resume(getId());
waitFor(OPEN);
diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp
index 7e905bdf63..8056f4a523 100644
--- a/cpp/src/qpid/framing/SessionState.cpp
+++ b/cpp/src/qpid/framing/SessionState.cpp
@@ -33,6 +33,7 @@ namespace qpid {
namespace framing {
SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
+ state(ATTACHED),
id(uuid),
lastReceived(-1),
lastSent(-1),
@@ -44,6 +45,7 @@ SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
{}
SessionState::SessionState(const Uuid& uuid) :
+ state(ATTACHED),
id(uuid),
lastReceived(-1),
lastSent(-1),
@@ -63,6 +65,10 @@ bool isSessionCommand(const AMQFrame& f) {
boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
if (isSessionCommand(f))
return boost::none;
+ if (state==RESUMING)
+ throw CommandInvalidException(
+ QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
+ assert(state = ATTACHED);
++lastReceived;
QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
if (ackInterval && lastReceived == sendAckAt)
@@ -79,6 +85,7 @@ bool SessionState::sent(const AMQFrame& f) {
++lastSent;
QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
return ackInterval &&
+ (state!=RESUMING) &&
(lastSent == solicitAckAt) &&
sendingSolicit();
}
@@ -90,6 +97,8 @@ SessionState::Replay SessionState::replay() {
}
void SessionState::receivedAck(SequenceNumber acked) {
+ if (state==RESUMING) state=ATTACHED;
+ assert(state==ATTACHED);
if (lastSent < acked)
throw InvalidArgumentException("Invalid sequence number in ack");
size_t keep = lastSent - acked;
@@ -104,10 +113,22 @@ SequenceNumber SessionState::sendingAck() {
}
bool SessionState::sendingSolicit() {
+ assert(state == ATTACHED);
if (ackSolicited)
return false;
solicitAckAt = lastSent + ackInterval;
return ackInterval != 0;
}
+SequenceNumber SessionState::resuming() {
+ if (!resumable)
+ throw InternalErrorException("Session is not resumable");
+ state = RESUMING;
+ return sendingAck();
+}
+
+void SessionState::suspend() {
+ state = SUSPENDED;
+}
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h
index 5034de4e94..361c960db1 100644
--- a/cpp/src/qpid/framing/SessionState.h
+++ b/cpp/src/qpid/framing/SessionState.h
@@ -35,10 +35,7 @@ namespace framing {
/**
* Session state common to client and broker.
- *
- * Stores data needed to resume a session: replay frames, implements
- * session ack/resume protcools. Stores handler chains for the session,
- * handlers may themselves store state.
+ * Stores replay frames, implements session ack/resume protcools.
*
* A SessionState is always associated with an _open_ session (attached or
* suspended) it is destroyed when the session is closed.
@@ -49,6 +46,13 @@ class SessionState
public:
typedef std::vector<AMQFrame> Replay;
+ /** States of a session. */
+ enum State {
+ SUSPENDED, ///< Suspended, detached from any channel.
+ RESUMING, ///< Resuming: waiting for initial ack from peer.
+ ATTACHED ///< Attached to channel and operating normally.
+ };
+
/**
*Create a newly opened active session.
*@param ackInterval send/solicit an ack whenever N unacked frames
@@ -56,8 +60,7 @@ class SessionState
*
* N=0 disables voluntary send/solict ack.
*/
- SessionState(uint32_t ackInterval,
- const framing::Uuid& id=framing::Uuid(true));
+ SessionState(uint32_t ackInterval, const framing::Uuid& id=framing::Uuid(true));
/**
* Create a non-resumable session. Does not store session frames,
@@ -66,6 +69,7 @@ class SessionState
SessionState(const framing::Uuid& id=framing::Uuid(true));
const framing::Uuid& getId() const { return id; }
+ State getState() const { return state; }
/** Received incoming L3 frame.
* @return SequenceNumber if an ack should be sent, empty otherwise.
@@ -88,6 +92,13 @@ class SessionState
*/
Replay replay();
+ /** Suspend the session. */
+ void suspend();
+
+ /** Start resume protocol for the session.
+ *@returns sequence number to ack immediately. */
+ SequenceNumber resuming();
+
/** About to send an unscheduled ack, e.g. to respond to a solicit-ack.
*
* Note: when received() returns a sequence number this function
@@ -104,7 +115,9 @@ class SessionState
bool sendingSolicit();
+ State state;
framing::Uuid id;
+
Unacked unackedOut;
SequenceNumber lastReceived;
SequenceNumber lastSent;
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index eccfb1465e..bdf3e3b8d3 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -173,7 +173,7 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
d.run();
// Now wait for n-1 io threads to exit
- for (int i=0; i>numIOThreads-1; ++i) {
+ for (int i=0; i<numIOThreads-1; ++i) {
t[i].join();
}
}
diff --git a/cpp/src/qpid/sys/AtomicCount.h b/cpp/src/qpid/sys/AtomicCount.h
index 63670cbf00..b542e37039 100644
--- a/cpp/src/qpid/sys/AtomicCount.h
+++ b/cpp/src/qpid/sys/AtomicCount.h
@@ -28,7 +28,7 @@ namespace sys {
/**
* Atomic counter.
*/
-class AtomicCount : boost::noncopyable {
+class AtomicCount {
public:
typedef ScopedDecrement<AtomicCount> ScopedDecrement;
typedef ScopedIncrement<AtomicCount> ScopedIncrement;