summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/ConnectionCodec.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-24 17:34:08 +0000
committerAlan Conway <aconway@apache.org>2008-09-24 17:34:08 +0000
commita2a56cf9a7483e165fb579d0b519b284d02009e3 (patch)
tree11264fc87ea6e54c54b476e245ad4ee9c83faaeb /cpp/src/qpid/cluster/ConnectionCodec.cpp
parent30be110b6914959a1eaee4803ff8c1c9938db7bb (diff)
downloadqpid-python-a2a56cf9a7483e165fb579d0b519b284d02009e3.tar.gz
Cluster replicates session command sequence state and consumers to newcomers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698666 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/ConnectionCodec.cpp')
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.cpp38
1 files changed, 17 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp
index accf83ebc7..1458a87923 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -23,18 +23,23 @@
#include "Cluster.h"
#include "ProxyInputHandler.h"
#include "qpid/broker/Connection.h"
+#include "qpid/framing/ConnectionCloseBody.h"
+#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include <stdexcept>
+#include <boost/utility/in_place_factory.hpp>
namespace qpid {
namespace cluster {
+using namespace framing;
+
sys::ConnectionCodec*
-ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
- if (v == framing::ProtocolVersion(0, 10))
+ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
+ if (v == ProtocolVersion(0, 10))
return new ConnectionCodec(out, id, cluster, false);
- else if (v == framing::ProtocolVersion(0x80 + 0, 0x80 + 10))
+ else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10))
return new ConnectionCodec(out, id, cluster, true); // Catch-up connection
return 0;
}
@@ -47,7 +52,8 @@ ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id)
ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp)
: codec(out, id, false),
- interceptor(new Connection(cluster, codec, id, cluster.getSelf(), catchUp))
+ interceptor(new Connection(cluster, codec, id, cluster.getSelf(), catchUp)),
+ id(interceptor->getId())
{
std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
codec.setInputHandler(ih);
@@ -56,28 +62,18 @@ ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id,
ConnectionCodec::~ConnectionCodec() {}
-// ConnectionCodec functions delegate to the codecOutput
size_t ConnectionCodec::decode(const char* buffer, size_t size) {
- if (interceptor->isShadow())
- throw Exception(QPID_MSG("Unexpected decode for shadow connection " << *interceptor));
- else if (interceptor->isCatchUp()) {
- size_t ret = codec.decode(buffer, size);
- if (interceptor->isShadow()) {
- // Promoted to shadow, close the codec.
- // FIXME aconway 2008-09-19: can we close cleanly?
- // codec.close();
- throw Exception("Close codec");
- }
- return ret;
- }
- else
- return interceptor->decode(buffer, size);
+ return interceptor->decode(buffer, size);
}
+bool ConnectionCodec::isClosed() const { return codec.isClosed(); }
+
size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); }
+
bool ConnectionCodec::canEncode() { return codec.canEncode(); }
+
void ConnectionCodec::closed() { codec.closed(); }
-bool ConnectionCodec::isClosed() const { return codec.isClosed(); }
-framing::ProtocolVersion ConnectionCodec::getVersion() const { return codec.getVersion(); }
+
+ProtocolVersion ConnectionCodec::getVersion() const { return codec.getVersion(); }
}} // namespace qpid::cluster