summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-04 19:52:49 +0000
committerAlan Conway <aconway@apache.org>2008-11-04 19:52:49 +0000
commiteda249ff22edb3726243da81ff48c82e4d88e872 (patch)
tree0939d790e6a1b0d86993c9c3804c1adaa369aeb8 /cpp/src/qpid/cluster/Connection.cpp
parent5d2471636928eff8b8031237c54348db0d5c388d (diff)
downloadqpid-python-eda249ff22edb3726243da81ff48c82e4d88e872.tar.gz
constants.rb: generate type code constants for AMQP types. Useful with Array.
framing/Array: - added some std:::vector like functions & typedefs. - use TypeCode enums, human readable ostream << operator. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711365 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp52
1 files changed, 35 insertions, 17 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 604df9dde6..ada26ab2fb 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -24,6 +24,7 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/TxPublish.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
@@ -35,6 +36,14 @@
#include <boost/current_function.hpp>
+// FIXME aconway 2008-11-03:
+//
+// Disproportionate amount of code here is dedicated to receiving a
+// brain-dump when joining a cluster and building initial
+// state. Should be separated out into its own classes.
+//
+
+
namespace qpid {
namespace cluster {
@@ -180,10 +189,16 @@ void Connection::deliverBuffer(Buffer& buf) {
delivered(mcastDecoder.frame);
}
+broker::SessionState& Connection::sessionState() {
+ return *connection.getChannel(currentChannel).getSession();
+}
+
+broker::SemanticState& Connection::semanticState() {
+ return sessionState().getSemanticState();
+}
+
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) {
- broker::SessionHandler& h = connection.getChannel(currentChannel);
- broker::SessionState* s = h.getSession();
- broker::SemanticState::ConsumerImpl& c = s->getConsumer(name);
+ broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
c.setBlocked(blocked);
if (notifyEnabled) c.enableNotify(); else c.disableNotify();
}
@@ -197,9 +212,7 @@ void Connection::sessionState(
const SequenceSet& unknownCompleted,
const SequenceSet& receivedIncomplete)
{
- broker::SessionHandler& h = connection.getChannel(currentChannel);
- broker::SessionState* s = h.getSession();
- s->setState(
+ sessionState().setState(
replayStart,
sendCommandPoint,
sentIncomplete,
@@ -207,7 +220,7 @@ void Connection::sessionState(
received,
unknownCompleted,
receivedIncomplete);
- QPID_LOG(debug, cluster << " received session state dump for " << s->getId());
+ QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId());
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
@@ -234,6 +247,15 @@ bool Connection::isDumped() const {
return self.first == cluster.getId() && self.second == 0;
}
+broker::QueuedMessage Connection::getDumpMessage() {
+ // Get a message from the DUMP queue.
+ broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
+ if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue"));
+ broker::QueuedMessage m = dumpQueue->get();
+ if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue"));
+ return m;
+}
+
void Connection::deliveryRecord(const string& qname,
const SequenceNumber& position,
const string& tag,
@@ -245,15 +267,14 @@ void Connection::deliveryRecord(const string& qname,
bool ended,
bool windowing)
{
- broker::QueuedMessage m;
broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname);
if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname));
- broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
- if (!dumpQueue) throw Exception(QPID_MSG(cluster << " deliveryRecord missing dump queue"));
-
+ broker::QueuedMessage m;
if (!ended) { // Has a message
- if (acquired) // Message at front of dump queue
+ if (acquired) { // Message at front of dump queue
+ broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
m = dumpQueue->get();
+ }
else // Message at original position in original queue
m = queue->find(position);
if (!m.payload)
@@ -266,10 +287,7 @@ void Connection::deliveryRecord(const string& qname,
if (completed) dr.complete();
if (ended) dr.setEnded(); // Exsitance of message
- broker::SessionHandler& h = connection.getChannel(currentChannel);
- broker::SessionState* s = h.getSession();
- assert(s);
- s->record(dr);
+ semanticState().record(dr);
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -286,7 +304,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
}
-
+
}} // namespace qpid::cluster