diff options
author | Alan Conway <aconway@apache.org> | 2009-06-16 21:21:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-06-16 21:21:09 +0000 |
commit | 80d65b38008d9b7f31c825508819f9600d63b63c (patch) | |
tree | 316862bff35f1cae6f0d1152dcf4a6e3b0f967ed /cpp/src/qpid/cluster/Connection.cpp | |
parent | f5e98a6dfb8c4defe22755340f440e6f16c2559a (diff) | |
download | qpid-python-80d65b38008d9b7f31c825508819f9600d63b63c.tar.gz |
Performance improvements in AggregateOutput and SemanticState.
Replaced AggregateOutput hierarchy with a flat list per connection
holding only the OutputTasks that are potentially active. Tasks are
droped from the list as soon as they return false, and added back when
they may have output.
Inlined frequently-used SequenceNumber functions.
Replace std::list in QueueListeners with std::vector.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@785408 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 16 |
1 files changed, 15 insertions, 1 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index afecbd50e5..e7dac82159 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -245,10 +245,13 @@ broker::SemanticState& Connection::semanticState() { return sessionState().getSemanticState(); } -void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) { +void Connection::consumerState( + const string& name, bool blocked, bool notifyEnabled, bool isInListener) +{ broker::SemanticState::ConsumerImpl& c = semanticState().find(name); c.setBlocked(blocked); if (notifyEnabled) c.enableNotify(); else c.disableNotify(); + if (isInListener) c.getQueue()->getListeners().addListener(c.shared_from_this()); } void Connection::sessionState( @@ -270,6 +273,17 @@ void Connection::sessionState( unknownCompleted, receivedIncomplete); QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); + // The output tasks will be added later in the update process. + connection.getOutputTasks().removeAll(); +} + +void Connection::outputTask(uint16_t channel, const std::string& name) { + broker::SessionState* session = connection.getChannel(channel).getSession(); + if (!session) + throw Exception(QPID_MSG(cluster << " channel not attached " << *this + << "[" << channel << "] ")); + OutputTask* task = &session->getSemanticState().find(name); + connection.getOutputTasks().addOutputTask(task); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) { |