summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-22 20:14:15 +0000
committerGordon Sim <gsim@apache.org>2008-04-22 20:14:15 +0000
commitdc6dc660ab0de77d6df8f154017aecdcee3b0577 (patch)
tree78ed77a62285ffe9dba6c148e6328c7ab24b64f4
parent74cae9fc609b5964d6e49f405dca07b2b1df592b (diff)
downloadqpid-python-dc6dc660ab0de77d6df8f154017aecdcee3b0577.tar.gz
Moved federation to final 0-10 codepath
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@650635 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.cpp20
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionFactory.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp29
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp7
-rwxr-xr-xqpid/cpp/src/tests/federation.py12
11 files changed, 64 insertions, 51 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
index c5315ccf4c..03e553f180 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -27,12 +27,21 @@ namespace amqp_0_10 {
using sys::Mutex;
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id)
- : frameQueueClosed(false), output(o), connection(this, broker, id),
- identifier(id), initialized(false) {}
+Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
+ : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
+ identifier(id), initialized(false), isClient(_isClient) {}
size_t Connection::decode(const char* buffer, size_t size) {
framing::Buffer in(const_cast<char*>(buffer), size);
+ if (isClient && !initialized) {
+ //read in protocol header
+ framing::ProtocolInitiation pi;
+ if (pi.decode(in)) {
+ //TODO: check the version is correct
+ QPID_LOG(trace, "RECV " << identifier << " INIT(" << pi << ")");
+ }
+ initialized = true;
+ }
framing::AMQFrame frame;
while(frame.decode(in)) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
@@ -44,7 +53,7 @@ size_t Connection::decode(const char* buffer, size_t size) {
bool Connection::canEncode() {
if (!frameQueueClosed) connection.doOutput();
Mutex::ScopedLock l(frameQueueLock);
- return !initialized || !frameQueue.empty();
+ return (!isClient && !initialized) || !frameQueue.empty();
}
bool Connection::isClosed() const {
@@ -55,10 +64,11 @@ bool Connection::isClosed() const {
size_t Connection::encode(const char* buffer, size_t size) {
Mutex::ScopedLock l(frameQueueLock);
framing::Buffer out(const_cast<char*>(buffer), size);
- if (!initialized) {
+ if (!isClient && !initialized) {
framing::ProtocolInitiation pi(getVersion());
pi.encode(out);
initialized = true;
+ QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")");
}
while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
frameQueue.front().encode(out);
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
index e4672be722..4369d401bd 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
@@ -43,9 +43,10 @@ class Connection : public sys::ConnectionCodec,
broker::Connection connection; // FIXME aconway 2008-03-18:
std::string identifier;
bool initialized;
+ bool isClient;
public:
- Connection(sys::OutputControl&, broker::Broker&, const std::string& id);
+ Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
bool isClosed() const;
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 32819380de..598f428ad2 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -34,7 +34,7 @@ namespace broker {
Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) :
args(_args), channel(id, &(c.getOutput())), peer(channel),
mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)),
- connection(c), listener(l)
+ connection(c), listener(l), name(Uuid(true).str())
{
management::ManagementAgent::getAgent()->addObject(mgmtObject);
}
@@ -46,24 +46,24 @@ Bridge::~Bridge()
void Bridge::create()
{
- framing::AMQP_ServerProxy::Session session(channel);
- session.open(0);
+ framing::AMQP_ServerProxy::Session010 session(channel);
+ session.attach(name, false);
if (args.i_src_is_local) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
} else {
if (args.i_src_is_queue) {
- peer.getMessage().subscribe(0, args.i_src, args.i_dest, false, 0, 0, false, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer.getMessage010().subscribe(args.i_src, args.i_dest, 0, 0, false, "", 0, FieldTable());
+ peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
string queue = "bridge_queue_";
queue += Uuid(true).str();
- peer.getQueue().declare(0, queue, "", false, false, true, true, FieldTable());
- peer.getQueue().bind(0, queue, args.i_src, args.i_key, FieldTable());
- peer.getMessage().subscribe(0, queue, args.i_dest, false, 0, 0, false, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer.getQueue010().declare(queue, "", false, false, true, true, FieldTable());
+ peer.getExchange010().bind(queue, args.i_src, args.i_key, FieldTable());
+ peer.getMessage010().subscribe(queue, args.i_dest, 0, 0, false, "", 0, FieldTable());
+ peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF);
}
}
@@ -71,8 +71,8 @@ void Bridge::create()
void Bridge::cancel()
{
- peer.getMessage().cancel(args.i_dest);
- peer.getSession().close();
+ peer.getMessage010().cancel(args.i_dest);
+ peer.getSession010().detach(name);
}
management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 1198285c93..943050e244 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -56,6 +56,7 @@ private:
management::Bridge::shared_ptr mgmtObject;
ConnectionState& connection;
CancellationListener listener;
+ std::string name;
};
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index cfa722e130..fca381063e 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -85,9 +85,9 @@ public:
};
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
ConnectionState(out_, broker_),
- adapter(*this),
+ adapter(*this, isLink),
mgmtClosing(false),
mgmtId(mgmtId_)
{
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index a59df26c84..c8e7fb7079 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -54,7 +54,7 @@ class Connection : public sys::ConnectionInputHandler,
public ConnectionState
{
public:
- Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId);
+ Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
~Connection ();
/** Get the SessionHandler for channel. Create if it does not already exist */
diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
index cb831f3023..7e20408388 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -43,11 +43,8 @@ ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std:
sys::ConnectionCodec*
ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
- // FIXME aconway 2008-03-18:
-
- // gsim 2008-03-26 this seems only to be used when creating
- // connections from one broker to another
- return new PreviewConnectionCodec(out, broker, id, true);
+ // used to create connections from one broker to another
+ return new amqp_0_10::Connection(out, broker, id, true);
}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index c017520334..c7738cc4ea 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -65,23 +65,24 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
}
}
-ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {}
+ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) : handler(new Handler(connection, isClient)) {}
-ConnectionHandler::Handler::Handler(Connection& c) :
+ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
client(c.getOutput()), server(c.getOutput()),
- connection(c), serverMode(false)
+ connection(c), serverMode(!isClient)
{
- FieldTable properties;
- Array mechanisms(0x95);
-
- authenticator = SaslAuthenticator::createAuthenticator(c);
- authenticator->getMechanisms(mechanisms);
-
- Array locales(0x95);
- boost::shared_ptr<FieldValue> l(new Str16Value(en_US));
- locales.add(l);
- serverMode = true;
- client.start(properties, mechanisms, locales);
+ if (serverMode) {
+ FieldTable properties;
+ Array mechanisms(0x95);
+
+ authenticator = SaslAuthenticator::createAuthenticator(c);
+ authenticator->getMechanisms(mechanisms);
+
+ Array locales(0x95);
+ boost::shared_ptr<FieldValue> l(new Str16Value(en_US));
+ locales.add(l);
+ client.start(properties, mechanisms, locales);
+ }
}
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
index 37246c7c1e..ea8b84b07c 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
@@ -50,7 +50,7 @@ class ConnectionHandler : public framing::FrameHandler
bool serverMode;
std::auto_ptr<SaslAuthenticator> authenticator;
- Handler(Connection& connection);
+ Handler(Connection& connection, bool isClient);
~Handler();
void startOk(const qpid::framing::FieldTable& clientProperties,
const std::string& mechanism, const std::string& response,
@@ -81,7 +81,7 @@ class ConnectionHandler : public framing::FrameHandler
};
std::auto_ptr<Handler> handler;
public:
- ConnectionHandler(Connection& connection);
+ ConnectionHandler(Connection& connection, bool isClient);
void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
void handle(framing::AMQFrame& frame);
};
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 8310980800..d5caf789c0 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -83,7 +83,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
bool SessionHandler::isValid(AMQMethodBody* m) {
- return session.get() || m->isA<SessionAttachBody>();
+ return session.get() || m->isA<SessionAttachBody>() || m->isA<SessionAttachedBody>();
}
void SessionHandler::handleOut(AMQFrame& f) {
@@ -134,10 +134,13 @@ void SessionHandler::attach(const std::string& _name, bool /*force*/)
peerSession.commandPoint(session->nextOut, 0);
}
-void SessionHandler::attached(const std::string& /*name*/)
+void SessionHandler::attached(const std::string& _name)
{
+ name = _name;//TODO: this should be used in conjunction with
+ //userid for connection as sessions identity
std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0));
session.reset(state.release());
+ peerSession.commandPoint(session->nextOut, 0);
}
void SessionHandler::detach(const std::string& name)
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index dbd244bcda..6a022c81ba 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -94,7 +94,7 @@ class FederationTests(TestBase010):
mgmt.call_method(link, "close")
self.assertEqual(len(mgmt.get_objects("link")), 0)
- def DISABLED_test_pull_from_exchange(self):
+ def test_pull_from_exchange(self):
session = self.session
mgmt = Helper(self)
@@ -122,10 +122,10 @@ class FederationTests(TestBase010):
for i in range(1, 11):
msg = queue.get(timeout=5)
- self.assertEqual("Message %d" % i, msg.content.body)
+ self.assertEqual("Message %d" % i, msg.body)
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected message in queue: " + extra.content.body)
+ self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
@@ -135,7 +135,7 @@ class FederationTests(TestBase010):
mgmt.call_method(link, "close")
self.assertEqual(len(mgmt.get_objects("link")), 0)
- def DISABLED_test_pull_from_queue(self):
+ def test_pull_from_queue(self):
session = self.session
#setup queue on remote broker and add some messages
@@ -168,10 +168,10 @@ class FederationTests(TestBase010):
for i in range(1, 11):
msg = queue.get(timeout=5)
- self.assertEqual("Message %d" % i, msg.content.body)
+ self.assertEqual("Message %d" % i, msg.body)
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected message in queue: " + extra.content.body)
+ self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None