diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionManager.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionManager.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 9 | ||||
-rwxr-xr-x | qpid/python/commands/qpid-tool | 9 | ||||
-rw-r--r-- | qpid/python/qpid/management.py | 18 | ||||
-rw-r--r-- | qpid/python/qpid/managementdata.py | 13 |
9 files changed, 54 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index f5fa22060f..e59a79f711 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -117,7 +117,7 @@ void SessionHandler::attach(const std::string& _name, bool /*force*/) //TODO: need to revise session manager to support resume as well assertClosed("attach"); - session.reset(new SessionState(0, this, 0, 0)); + session.reset(new SessionState(0, this, 0, 0, name)); peerSession.attached(name); peerSession.commandPoint(session->nextOut, 0); } @@ -126,7 +126,7 @@ void SessionHandler::attached(const std::string& _name) { name = _name;//TODO: this should be used in conjunction with //userid for connection as sessions identity - session.reset(new SessionState(0, this, 0, 0)); + session.reset(new SessionState(0, this, 0, 0, name)); peerSession.commandPoint(session->nextOut, 0); } diff --git a/qpid/cpp/src/qpid/broker/SessionManager.cpp b/qpid/cpp/src/qpid/broker/SessionManager.cpp index 6e235e32c3..d7bae737fc 100644 --- a/qpid/cpp/src/qpid/broker/SessionManager.cpp +++ b/qpid/cpp/src/qpid/broker/SessionManager.cpp @@ -46,11 +46,11 @@ SessionManager::~SessionManager() {} // FIXME aconway 2008-02-01: pass handler*, allow open unattached. std::auto_ptr<SessionState> SessionManager::open( - SessionHandler& h, uint32_t timeout_) + SessionHandler& h, uint32_t timeout_, std::string _name) { Mutex::ScopedLock l(lock); std::auto_ptr<SessionState> session( - new SessionState(this, &h, timeout_, ack)); + new SessionState(this, &h, timeout_, ack, _name)); active.insert(session->getId()); for_each(observers.begin(), observers.end(), boost::bind(&Observer::opened, _1,boost::ref(*session))); diff --git a/qpid/cpp/src/qpid/broker/SessionManager.h b/qpid/cpp/src/qpid/broker/SessionManager.h index cc2190c2d1..ad064c69bb 100644 --- a/qpid/cpp/src/qpid/broker/SessionManager.h +++ b/qpid/cpp/src/qpid/broker/SessionManager.h @@ -58,7 +58,7 @@ class SessionManager : private boost::noncopyable { ~SessionManager(); /** Open a new active session, caller takes ownership */ - std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_); + std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_, std::string name); /** Suspend a session, start it's timeout counter. * The factory takes ownership. diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 50938de8ac..2ef1ed2de4 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -45,12 +45,12 @@ using qpid::management::Manageable; using qpid::management::Args; SessionState::SessionState( - SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) + SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack, string& _name) : framing::SessionState(ack, timeout_ > 0), nextOut(0), factory(f), handler(h), id(true), timeout(timeout_), broker(h->getConnection().broker), version(h->getConnection().getVersion()), - ignoring(false), + ignoring(false), name(_name), semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), @@ -68,7 +68,7 @@ SessionState::SessionState( if (agent.get () != 0) { mgmtObject = management::Session::shared_ptr - (new management::Session (this, parent, id.str ())); + (new management::Session (this, parent, name)); mgmtObject->set_attached (1); mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId()); mgmtObject->set_channelId (h->getChannel()); diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 2ec68260a1..ae860e84c9 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -111,7 +111,8 @@ class SessionState : public framing::SessionState, SessionState(SessionManager*, SessionHandler* out, uint32_t timeout, - uint32_t ackInterval); + uint32_t ackInterval, + std::string& name); framing::SequenceSet completed; @@ -131,6 +132,7 @@ class SessionState : public framing::SessionState, framing::ProtocolVersion version; sys::Mutex lock; bool ignoring; + std::string name; SemanticState semanticState; SessionAdapter adapter; diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index 25654fe1c7..33da19b1b8 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -142,6 +142,9 @@ class FederationTests(TestBase010): mgmt.call_method(bridge, "close") mgmt.call_method(link, "close") + sleep(6) + self.assertEqual(len(mgmt.get_objects("bridge")), 0) + self.assertEqual(len(mgmt.get_objects("link")), 0) mgmt.shutdown() @@ -191,6 +194,9 @@ class FederationTests(TestBase010): mgmt.call_method(bridge, "close") mgmt.call_method(link, "close") + sleep(6) + self.assertEqual(len(mgmt.get_objects("bridge")), 0) + self.assertEqual(len(mgmt.get_objects("link")), 0) mgmt.shutdown () @@ -239,6 +245,9 @@ class FederationTests(TestBase010): mgmt.call_method(bridge, "close") mgmt.call_method(link, "close") + sleep(6) + self.assertEqual(len(mgmt.get_objects("bridge")), 0) + self.assertEqual(len(mgmt.get_objects("link")), 0) mgmt.shutdown () diff --git a/qpid/python/commands/qpid-tool b/qpid/python/commands/qpid-tool index 1aee3a1b7f..9977db3518 100755 --- a/qpid/python/commands/qpid-tool +++ b/qpid/python/commands/qpid-tool @@ -31,16 +31,23 @@ from qpid.peer import Closed class Mcli (Cmd): """ Management Command Interpreter """ - prompt = "qpid: " def __init__ (self, dataObject, dispObject): Cmd.__init__ (self) self.dataObject = dataObject self.dispObject = dispObject + self.dataObject.setCli (self) + self.prompt = "qpid: " def emptyline (self): pass + def setPromptMessage (self, p): + if p == None: + self.prompt = "qpid: " + else: + self.prompt = "qpid[%s]: " % p + def do_help (self, data): print "Management Tool for QPID" print diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py index d8f09d14ab..0e7233fad2 100644 --- a/qpid/python/qpid/management.py +++ b/qpid/python/qpid/management.py @@ -81,7 +81,7 @@ class methodResult: class managementChannel: """ This class represents a connection to an AMQP broker. """ - def __init__ (self, ssn, topicCb, replyCb, cbContext, _detlife=0): + def __init__ (self, ssn, topicCb, replyCb, exceptionCb, cbContext, _detlife=0): """ Given a channel on an established AMQP broker connection, this method opens a session and performs all of the declarations and bindings needed to participate in the management protocol. """ @@ -93,6 +93,7 @@ class managementChannel: self.qpidChannel = ssn self.tcb = topicCb self.rcb = replyCb + self.ecb = exceptionCb self.context = cbContext self.reqsOutstanding = 0 @@ -104,7 +105,7 @@ class managementChannel: ssn.message_subscribe (queue=self.topicName, destination="tdest") ssn.message_subscribe (queue=self.replyName, destination="rdest") - ssn.incoming ("tdest").listen (self.topicCb) + ssn.incoming ("tdest").listen (self.topicCb, self.exceptionCb) ssn.incoming ("rdest").listen (self.replyCb) ssn.message_set_flow_mode (destination="tdest", flow_mode=1) @@ -130,6 +131,10 @@ class managementChannel: if self.enabled: self.rcb (self, msg) + def exceptionCb (self, data): + if self.ecb != None: + self.ecb (data) + def send (self, exchange, msg): if self.enabled: self.qpidChannel.message_transfer (destination=exchange, message=msg) @@ -160,12 +165,13 @@ class managementClient: #======================================================== # User API - interacts with the class's user #======================================================== - def __init__ (self, amqpSpec, ctrlCb=None, configCb=None, instCb=None, methodCb=None): + def __init__ (self, amqpSpec, ctrlCb=None, configCb=None, instCb=None, methodCb=None, closeCb=None): self.spec = amqpSpec self.ctrlCb = ctrlCb self.configCb = configCb self.instCb = instCb self.methodCb = methodCb + self.closeCb = closeCb self.schemaCb = None self.eventCb = None self.channels = [] @@ -189,7 +195,7 @@ class managementClient: def addChannel (self, channel, cbContext=None): """ Register a new channel. """ - mch = managementChannel (channel, self.topicCb, self.replyCb, cbContext) + mch = managementChannel (channel, self.topicCb, self.replyCb, self.exceptCb, cbContext) self.channels.append (mch) self.incOutstanding (mch) @@ -312,6 +318,10 @@ class managementClient: self.parse (ch, codec, hdr[0], hdr[1]) ch.accept(msg) + def exceptCb (self, data): + if self.closeCb != None: + self.closeCb (data) + #======================================================== # Internal Functions #======================================================== diff --git a/qpid/python/qpid/managementdata.py b/qpid/python/qpid/managementdata.py index 1d99cc11bc..bdc299767d 100644 --- a/qpid/python/qpid/managementdata.py +++ b/qpid/python/qpid/managementdata.py @@ -160,11 +160,20 @@ class ManagementData: finally: self.lock.release () + def closeHandler (self, reason): + print "Connection to broker lost:", reason + self.operational = False + if self.cli != None: + self.cli.setPromptMessage ("Broker Disconnected") + def schemaHandler (self, context, className, configs, insts, methods, events): """ Callback for schema updates """ if className not in self.schema: self.schema[className] = (configs, insts, methods, events) + def setCli (self, cliobj): + self.cli = cliobj + def __init__ (self, disp, host, username="guest", password="guest", specfile="../../specs/amqp.0-10.xml"): self.spec = qpid.spec.load (specfile) @@ -184,9 +193,11 @@ class ManagementData: self.conn.start () self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler, - self.instHandler, self.methodReply) + self.instHandler, self.methodReply, self.closeHandler) self.mclient.schemaListener (self.schemaHandler) self.mch = self.mclient.addChannel (self.conn.session(self.sessionId)) + self.operational = True + self.cli = None def close (self): pass |