diff options
author | Gordon Sim <gsim@apache.org> | 2008-04-22 20:14:15 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-04-22 20:14:15 +0000 |
commit | dc6dc660ab0de77d6df8f154017aecdcee3b0577 (patch) | |
tree | 78ed77a62285ffe9dba6c148e6328c7ab24b64f4 | |
parent | 74cae9fc609b5964d6e49f405dca07b2b1df592b (diff) | |
download | qpid-python-dc6dc660ab0de77d6df8f154017aecdcee3b0577.tar.gz |
Moved federation to final 0-10 codepath
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@650635 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/Connection.cpp | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/Connection.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 26 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionFactory.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.cpp | 7 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 12 |
11 files changed, 64 insertions, 51 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp index c5315ccf4c..03e553f180 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -27,12 +27,21 @@ namespace amqp_0_10 { using sys::Mutex; -Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id) - : frameQueueClosed(false), output(o), connection(this, broker, id), - identifier(id), initialized(false) {} +Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) + : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient), + identifier(id), initialized(false), isClient(_isClient) {} size_t Connection::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast<char*>(buffer), size); + if (isClient && !initialized) { + //read in protocol header + framing::ProtocolInitiation pi; + if (pi.decode(in)) { + //TODO: check the version is correct + QPID_LOG(trace, "RECV " << identifier << " INIT(" << pi << ")"); + } + initialized = true; + } framing::AMQFrame frame; while(frame.decode(in)) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); @@ -44,7 +53,7 @@ size_t Connection::decode(const char* buffer, size_t size) { bool Connection::canEncode() { if (!frameQueueClosed) connection.doOutput(); Mutex::ScopedLock l(frameQueueLock); - return !initialized || !frameQueue.empty(); + return (!isClient && !initialized) || !frameQueue.empty(); } bool Connection::isClosed() const { @@ -55,10 +64,11 @@ bool Connection::isClosed() const { size_t Connection::encode(const char* buffer, size_t size) { Mutex::ScopedLock l(frameQueueLock); framing::Buffer out(const_cast<char*>(buffer), size); - if (!initialized) { + if (!isClient && !initialized) { framing::ProtocolInitiation pi(getVersion()); pi.encode(out); initialized = true; + QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")"); } while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) { frameQueue.front().encode(out); diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h index e4672be722..4369d401bd 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h @@ -43,9 +43,10 @@ class Connection : public sys::ConnectionCodec, broker::Connection connection; // FIXME aconway 2008-03-18: std::string identifier; bool initialized; + bool isClient; public: - Connection(sys::OutputControl&, broker::Broker&, const std::string& id); + Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false); size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); bool isClosed() const; diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 32819380de..598f428ad2 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -34,7 +34,7 @@ namespace broker { Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) : args(_args), channel(id, &(c.getOutput())), peer(channel), mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)), - connection(c), listener(l) + connection(c), listener(l), name(Uuid(true).str()) { management::ManagementAgent::getAgent()->addObject(mgmtObject); } @@ -46,24 +46,24 @@ Bridge::~Bridge() void Bridge::create() { - framing::AMQP_ServerProxy::Session session(channel); - session.open(0); + framing::AMQP_ServerProxy::Session010 session(channel); + session.attach(name, false); if (args.i_src_is_local) { //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() } else { if (args.i_src_is_queue) { - peer.getMessage().subscribe(0, args.i_src, args.i_dest, false, 0, 0, false, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + peer.getMessage010().subscribe(args.i_src, args.i_dest, 0, 0, false, "", 0, FieldTable()); + peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF); } else { string queue = "bridge_queue_"; queue += Uuid(true).str(); - peer.getQueue().declare(0, queue, "", false, false, true, true, FieldTable()); - peer.getQueue().bind(0, queue, args.i_src, args.i_key, FieldTable()); - peer.getMessage().subscribe(0, queue, args.i_dest, false, 0, 0, false, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + peer.getQueue010().declare(queue, "", false, false, true, true, FieldTable()); + peer.getExchange010().bind(queue, args.i_src, args.i_key, FieldTable()); + peer.getMessage010().subscribe(queue, args.i_dest, 0, 0, false, "", 0, FieldTable()); + peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF); } } @@ -71,8 +71,8 @@ void Bridge::create() void Bridge::cancel() { - peer.getMessage().cancel(args.i_dest); - peer.getSession().close(); + peer.getMessage010().cancel(args.i_dest); + peer.getSession010().detach(name); } management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 1198285c93..943050e244 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -56,6 +56,7 @@ private: management::Bridge::shared_ptr mgmtObject; ConnectionState& connection; CancellationListener listener; + std::string name; }; diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index cfa722e130..fca381063e 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -85,9 +85,9 @@ public: }; -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) : +Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) : ConnectionState(out_, broker_), - adapter(*this), + adapter(*this, isLink), mgmtClosing(false), mgmtId(mgmtId_) { diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index a59df26c84..c8e7fb7079 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -54,7 +54,7 @@ class Connection : public sys::ConnectionInputHandler, public ConnectionState { public: - Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId); + Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false); ~Connection (); /** Get the SessionHandler for channel. Create if it does not already exist */ diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp index cb831f3023..7e20408388 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -43,11 +43,8 @@ ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std: sys::ConnectionCodec* ConnectionFactory::create(sys::OutputControl& out, const std::string& id) { - // FIXME aconway 2008-03-18: - - // gsim 2008-03-26 this seems only to be used when creating - // connections from one broker to another - return new PreviewConnectionCodec(out, broker, id, true); + // used to create connections from one broker to another + return new amqp_0_10::Connection(out, broker, id, true); } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index c017520334..c7738cc4ea 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -65,23 +65,24 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) } } -ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {} +ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) : handler(new Handler(connection, isClient)) {} -ConnectionHandler::Handler::Handler(Connection& c) : +ConnectionHandler::Handler::Handler(Connection& c, bool isClient) : client(c.getOutput()), server(c.getOutput()), - connection(c), serverMode(false) + connection(c), serverMode(!isClient) { - FieldTable properties; - Array mechanisms(0x95); - - authenticator = SaslAuthenticator::createAuthenticator(c); - authenticator->getMechanisms(mechanisms); - - Array locales(0x95); - boost::shared_ptr<FieldValue> l(new Str16Value(en_US)); - locales.add(l); - serverMode = true; - client.start(properties, mechanisms, locales); + if (serverMode) { + FieldTable properties; + Array mechanisms(0x95); + + authenticator = SaslAuthenticator::createAuthenticator(c); + authenticator->getMechanisms(mechanisms); + + Array locales(0x95); + boost::shared_ptr<FieldValue> l(new Str16Value(en_US)); + locales.add(l); + client.start(properties, mechanisms, locales); + } } diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h index 37246c7c1e..ea8b84b07c 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h @@ -50,7 +50,7 @@ class ConnectionHandler : public framing::FrameHandler bool serverMode; std::auto_ptr<SaslAuthenticator> authenticator; - Handler(Connection& connection); + Handler(Connection& connection, bool isClient); ~Handler(); void startOk(const qpid::framing::FieldTable& clientProperties, const std::string& mechanism, const std::string& response, @@ -81,7 +81,7 @@ class ConnectionHandler : public framing::FrameHandler }; std::auto_ptr<Handler> handler; public: - ConnectionHandler(Connection& connection); + ConnectionHandler(Connection& connection, bool isClient); void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId); void handle(framing::AMQFrame& frame); }; diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 8310980800..d5caf789c0 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -83,7 +83,7 @@ void SessionHandler::handleIn(AMQFrame& f) { } bool SessionHandler::isValid(AMQMethodBody* m) { - return session.get() || m->isA<SessionAttachBody>(); + return session.get() || m->isA<SessionAttachBody>() || m->isA<SessionAttachedBody>(); } void SessionHandler::handleOut(AMQFrame& f) { @@ -134,10 +134,13 @@ void SessionHandler::attach(const std::string& _name, bool /*force*/) peerSession.commandPoint(session->nextOut, 0); } -void SessionHandler::attached(const std::string& /*name*/) +void SessionHandler::attached(const std::string& _name) { + name = _name;//TODO: this should be used in conjunction with + //userid for connection as sessions identity std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0)); session.reset(state.release()); + peerSession.commandPoint(session->nextOut, 0); } void SessionHandler::detach(const std::string& name) diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index dbd244bcda..6a022c81ba 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -94,7 +94,7 @@ class FederationTests(TestBase010): mgmt.call_method(link, "close") self.assertEqual(len(mgmt.get_objects("link")), 0) - def DISABLED_test_pull_from_exchange(self): + def test_pull_from_exchange(self): session = self.session mgmt = Helper(self) @@ -122,10 +122,10 @@ class FederationTests(TestBase010): for i in range(1, 11): msg = queue.get(timeout=5) - self.assertEqual("Message %d" % i, msg.content.body) + self.assertEqual("Message %d" % i, msg.body) try: extra = queue.get(timeout=1) - self.fail("Got unexpected message in queue: " + extra.content.body) + self.fail("Got unexpected message in queue: " + extra.body) except Empty: None @@ -135,7 +135,7 @@ class FederationTests(TestBase010): mgmt.call_method(link, "close") self.assertEqual(len(mgmt.get_objects("link")), 0) - def DISABLED_test_pull_from_queue(self): + def test_pull_from_queue(self): session = self.session #setup queue on remote broker and add some messages @@ -168,10 +168,10 @@ class FederationTests(TestBase010): for i in range(1, 11): msg = queue.get(timeout=5) - self.assertEqual("Message %d" % i, msg.content.body) + self.assertEqual("Message %d" % i, msg.body) try: extra = queue.get(timeout=1) - self.fail("Got unexpected message in queue: " + extra.content.body) + self.fail("Got unexpected message in queue: " + extra.body) except Empty: None |