summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-01-02 15:56:20 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-01-02 15:56:20 +0000
commite336559c82f22ecd0a013b8ea787bb4946ab2fdc (patch)
tree7ffb2bc4ff702ac50e872529b6a07e041a88df0a
parent4e1fabd16161480d352d3813a6e41c5a97ab8c57 (diff)
downloadqpid-python-e336559c82f22ecd0a013b8ea787bb4946ab2fdc.tar.gz
patch-715 (tross)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@608135 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/managementgen/main.py7
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp66
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h16
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionFactory.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionFactory.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionManager.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp75
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h11
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp85
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h8
-rw-r--r--qpid/cpp/src/qpid/management/ManagementExchange.h2
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.h16
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h3
-rwxr-xr-xqpid/python/mgmt-cli/main.py1
-rw-r--r--qpid/python/mgmt-cli/managementdata.py141
-rw-r--r--qpid/python/qpid/management.py65
-rw-r--r--qpid/specs/management-schema.xml36
-rw-r--r--qpid/specs/management-types.xml18
22 files changed, 446 insertions, 128 deletions
diff --git a/qpid/cpp/managementgen/main.py b/qpid/cpp/managementgen/main.py
index 2f70639482..ddf18ef873 100755
--- a/qpid/cpp/managementgen/main.py
+++ b/qpid/cpp/managementgen/main.py
@@ -41,8 +41,7 @@ parser.add_option ("-m", "--makefile", dest="makefile", metavar="FILE",
if opts.outdir == None or \
opts.typefile == None or \
opts.schemafile == None or \
- opts.templatedir == None or \
- opts.makefile == None:
+ opts.templatedir == None:
parser.error ("Incorrect options, see --help for help")
gen = Generator (opts.outdir, opts.templatedir)
@@ -51,4 +50,6 @@ schema = PackageSchema (opts.typefile, opts.schemafile)
gen.makeClassFiles ("Class.h", schema)
gen.makeClassFiles ("Class.cpp", schema)
gen.makeMethodFiles ("Args.h", schema)
-gen.makeMakeFile (opts.makefile)
+
+if opts.makefile != None:
+ gen.makeMakeFile (opts.makefile)
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index d83eabee06..5607d22498 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -231,6 +231,7 @@ void Broker::run() {
void Broker::shutdown() {
if (acceptor)
acceptor->shutdown();
+ ManagementAgent::shutdown ();
}
Broker::~Broker() {
@@ -270,6 +271,11 @@ ManagementObject::shared_ptr Broker::GetManagementObject(void) const
return dynamic_pointer_cast<ManagementObject> (mgmtObject);
}
+Manageable* Broker::GetVhostObject(void) const
+{
+ return vhostObject.get();
+}
+
Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
Args& /*_args*/)
{
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 1c1c303be8..aaa0be72bb 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -119,6 +119,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
SessionManager& getSessionManager() { return sessionManager; }
management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable* GetVhostObject (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index f2cd1c11e4..fbe018e8ae 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -26,6 +26,7 @@
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/management/ManagementAgent.h"
#include <boost/bind.hpp>
@@ -38,11 +39,15 @@ using namespace qpid::sys;
using namespace qpid::framing;
using namespace qpid::sys;
using namespace qpid::ptr_map;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
namespace qpid {
namespace broker {
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
+ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const Socket& s) :
broker(broker_),
outputTasks(*out_),
out(out_),
@@ -50,15 +55,45 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
heartbeat(0),
client(0),
stagingThreshold(broker.getStagingThreshold()),
- adapter(*this)
-{}
+ adapter(*this),
+ mgmtClosing(0)
+{
+ Manageable* parent = broker.GetVhostObject ();
+
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Client::shared_ptr
+ (new management::Client (this, parent, s.getPeerAddress ()));
+ agent->addObject (mgmtObject);
+ }
+ }
+}
+
+Connection::~Connection ()
+{
+ if (mgmtObject.get () != 0)
+ mgmtObject->resourceDestroy ();
+}
void Connection::received(framing::AMQFrame& frame){
+ if (mgmtClosing)
+ close (403, "Closed by Management Request", 0, 0);
+
if (frame.getChannel() == 0) {
adapter.handle(frame);
} else {
getChannel(frame.getChannel()).in(frame);
}
+
+ if (mgmtObject.get () != 0)
+ {
+ mgmtObject->inc_framesFromClient ();
+ mgmtObject->inc_bytesFromClient (frame.size ());
+ }
}
void Connection::close(
@@ -122,5 +157,30 @@ SessionHandler& Connection::getChannel(ChannelId id) {
return *get_pointer(i);
}
+ManagementObject::shared_ptr Connection::GetManagementObject (void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
+ Args& /*args*/)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ QPID_LOG (debug, "Connection::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case management::Client::METHOD_CLOSE :
+ mgmtClosing = 1;
+ mgmtObject->set_closing (1);
+ status = Manageable::STATUS_OK;
+ break;
+ }
+
+ return status;
+}
+
+
}}
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 395aa7b0bd..cbe7addec2 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -35,9 +35,12 @@
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/framing/ProtocolVersion.h"
#include "Broker.h"
+#include "qpid/sys/Socket.h"
#include "qpid/Exception.h"
#include "ConnectionHandler.h"
#include "SessionHandler.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Client.h"
#include <boost/ptr_container/ptr_map.hpp>
@@ -45,10 +48,12 @@ namespace qpid {
namespace broker {
class Connection : public sys::ConnectionInputHandler,
- public ConnectionToken
+ public ConnectionToken,
+ public management::Manageable
{
public:
- Connection(sys::ConnectionOutputHandler* out, Broker& broker);
+ Connection(sys::ConnectionOutputHandler* out, Broker& broker, const sys::Socket& s);
+ ~Connection ();
/** Get the SessionHandler for channel. Create if it does not already exist */
SessionHandler& getChannel(framing::ChannelId channel);
@@ -85,6 +90,11 @@ class Connection : public sys::ConnectionInputHandler,
void closeChannel(framing::ChannelId channel);
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args);
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -97,6 +107,8 @@ class Connection : public sys::ConnectionInputHandler,
framing::AMQP_ClientProxy::Connection* client;
uint64_t stagingThreshold;
ConnectionHandler adapter;
+ management::Client::shared_ptr mgmtObject;
+ bool mgmtClosing;
};
}}
diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
index 4429a2583c..c925c4be2f 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -35,9 +35,10 @@ ConnectionFactory::~ConnectionFactory()
}
qpid::sys::ConnectionInputHandler*
-ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out)
+ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out,
+ const qpid::sys::Socket& s)
{
- return new Connection(out, broker);
+ return new Connection(out, broker, s);
}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.h b/qpid/cpp/src/qpid/broker/ConnectionFactory.h
index 6af3dda7a6..23fba5c1ab 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionFactory.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.h
@@ -32,8 +32,8 @@ class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory
public:
ConnectionFactory(Broker& b);
- virtual qpid::sys::ConnectionInputHandler* create(
- qpid::sys::ConnectionOutputHandler* ctxt);
+ virtual qpid::sys::ConnectionInputHandler* create
+ (qpid::sys::ConnectionOutputHandler* ctxt, const sys::Socket& s);
virtual ~ConnectionFactory();
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index b68a7db8b0..f753e7ef75 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -616,10 +616,6 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId,
purge ();
status = Manageable::STATUS_OK;
break;
-
- case management::Queue::METHOD_INCREASEJOURNALSIZE :
- status = Manageable::STATUS_NOT_IMPLEMENTED;
- break;
}
return status;
diff --git a/qpid/cpp/src/qpid/broker/SessionManager.cpp b/qpid/cpp/src/qpid/broker/SessionManager.cpp
index f12ebc6db1..1d5f9ebada 100644
--- a/qpid/cpp/src/qpid/broker/SessionManager.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionManager.cpp
@@ -58,6 +58,8 @@ void SessionManager::suspend(std::auto_ptr<SessionState> session) {
active.erase(session->getId());
session->suspend();
session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
+ if (session->mgmtObject.get() != 0)
+ session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry));
suspended.push_back(session.release()); // In expiry order
eraseExpired();
}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index bea1eaedcf..a75b32cbb5 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -31,6 +31,10 @@ namespace broker {
using namespace framing;
using sys::Mutex;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
@@ -50,11 +54,34 @@ SessionState::SessionState(
// TODO aconway 2007-09-20: SessionManager may add plugin
// handlers to the chain.
getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+
+ Manageable* parent = broker.GetVhostObject ();
+
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Session::shared_ptr
+ (new management::Session (this, parent, id.str ()));
+ mgmtObject->set_attached (1);
+ mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h.getChannel());
+ mgmtObject->set_detachedLifespan (getTimeout());
+ agent->addObject (mgmtObject);
+ }
+ }
}
SessionState::~SessionState() {
// Remove ID from active session list.
factory.erase(getId());
+
+ if (mgmtObject.get () != 0)
+ {
+ mgmtObject->resourceDestroy ();
+ }
}
SessionHandler* SessionState::getHandler() {
@@ -75,12 +102,22 @@ void SessionState::detach() {
getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
Mutex::ScopedLock l(lock);
handler = 0;
+ if (mgmtObject.get() != 0)
+ {
+ mgmtObject->set_attached (0);
+ }
}
void SessionState::attach(SessionHandler& h) {
{
Mutex::ScopedLock l(lock);
handler = &h;
+ if (mgmtObject.get() != 0)
+ {
+ mgmtObject->set_attached (1);
+ mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h.getChannel());
+ }
}
h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
}
@@ -96,4 +133,42 @@ void SessionState::activateOutput()
//if not attached, it can simply ignore the callback, else pass it
//on to the connection
+ManagementObject::shared_ptr SessionState::GetManagementObject (void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
+ Args& /*args*/)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ switch (methodId)
+ {
+ case management::Session::METHOD_DETACH :
+ if (handler != 0)
+ {
+ handler->localSuspend ();
+ }
+ status = Manageable::STATUS_OK;
+ break;
+
+ case management::Session::METHOD_CLOSE :
+ if (handler != 0)
+ {
+ handler->getConnection().closeChannel(handler->getChannel());
+ }
+ status = Manageable::STATUS_OK;
+ break;
+
+ case management::Session::METHOD_SOLICITACK :
+ case management::Session::METHOD_RESETLIFESPAN :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+
+ return status;
+}
+
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index ac2a33442a..c8c32a046d 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -29,6 +29,8 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/sys/Time.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Session.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
@@ -57,7 +59,8 @@ class Connection;
*/
class SessionState : public framing::SessionState,
public framing::FrameHandler::InOutHandler,
- public sys::OutputControl
+ public sys::OutputControl,
+ public management::Manageable
{
public:
~SessionState();
@@ -82,6 +85,11 @@ class SessionState : public framing::SessionState,
/** OutputControl **/
void activateOutput();
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args);
+
protected:
void handleIn(framing::AMQFrame&);
void handleOut(framing::AMQFrame&);
@@ -102,6 +110,7 @@ class SessionState : public framing::SessionState,
framing::ProtocolVersion version;
sys::Mutex lock;
boost::scoped_ptr<SemanticHandler> semanticHandler;
+ management::Session::shared_ptr mgmtObject;
friend class SessionManager;
};
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 90da74404b..d3c5d7c266 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -41,6 +41,8 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
}
+ManagementAgent::~ManagementAgent () {}
+
void ManagementAgent::enableManagement (void)
{
enabled = 1;
@@ -54,6 +56,16 @@ ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
return agent;
}
+void ManagementAgent::shutdown (void)
+{
+ if (agent.get () != 0)
+ {
+ agent->mExchange.reset ();
+ agent->dExchange.reset ();
+ agent.reset ();
+ }
+}
+
void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange,
broker::Exchange::shared_ptr _dexchange)
{
@@ -73,6 +85,8 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object)
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
: TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {}
+ManagementAgent::Periodic::~Periodic () {}
+
void ManagementAgent::Periodic::fire ()
{
agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval)));
@@ -93,12 +107,27 @@ void ManagementAgent::clientAdded (void)
}
}
-void ManagementAgent::EncodeHeader (Buffer& buf)
+void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint8_t cls)
{
buf.putOctet ('A');
buf.putOctet ('M');
buf.putOctet ('0');
buf.putOctet ('1');
+ buf.putOctet (opcode);
+ buf.putOctet (cls);
+}
+
+bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint8_t *cls)
+{
+ uint8_t h1 = buf.getOctet ();
+ uint8_t h2 = buf.getOctet ();
+ uint8_t h3 = buf.getOctet ();
+ uint8_t h4 = buf.getOctet ();
+
+ *opcode = buf.getOctet ();
+ *cls = buf.getOctet ();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '0' && h4 == '1';
}
void ManagementAgent::SendBuffer (Buffer& buf,
@@ -112,9 +141,6 @@ void ManagementAgent::SendBuffer (Buffer& buf,
AMQFrame header (in_place<AMQHeaderBody>());
AMQFrame content(in_place<AMQContentBody>());
- QPID_LOG (debug, "ManagementAgent::SendBuffer - key="
- << routingKey << " len=" << length);
-
content.castBody<AMQContentBody>()->decode(buf, length);
method.setEof (false);
@@ -156,9 +182,7 @@ void ManagementAgent::PeriodicProcessing (void)
if (object->getSchemaNeeded ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer);
- msgBuffer.putOctet ('S'); // opcode = Schema Record
- msgBuffer.putOctet (0); // content-class = N/A
+ EncodeHeader (msgBuffer, 'S');
object->writeSchema (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
@@ -170,9 +194,7 @@ void ManagementAgent::PeriodicProcessing (void)
if (object->getConfigChanged () || object->isDeleted ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer);
- msgBuffer.putOctet ('C'); // opcode = Content Record
- msgBuffer.putOctet ('C'); // content-class = Configuration
+ EncodeHeader (msgBuffer, 'C', 'C');
object->writeConfig (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
@@ -184,9 +206,7 @@ void ManagementAgent::PeriodicProcessing (void)
if (object->getInstChanged ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer);
- msgBuffer.putOctet ('C'); // opcode = Content Record
- msgBuffer.putOctet ('I'); // content-class = Instrumentation
+ EncodeHeader (msgBuffer, 'C', 'I');
object->writeInstrumentation (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
@@ -251,9 +271,6 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
start = pos + 1;
string methodName = routingKey.substr (start, routingKey.length () - start);
- QPID_LOG (debug, "Dispatch package: " << packageName << ", class: "
- << className << ", method: " << methodName);
-
contentSize = msg.encodedContentSize ();
if (contentSize < 8 || contentSize > 65536)
return;
@@ -263,19 +280,41 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
Buffer inBuffer (inMem, contentSize);
Buffer outBuffer (outMem, 4096);
uint32_t outLen;
+ uint8_t opcode, unused;
msg.encodeContent (inBuffer);
inBuffer.reset ();
- uint32_t methodId = inBuffer.getLong ();
- uint64_t objId = inBuffer.getLongLong ();
- string replyTo;
+ if (!CheckHeader (inBuffer, &opcode, &unused))
+ {
+ QPID_LOG (debug, " Invalid content header");
+ return;
+ }
+
+ if (opcode != 'M')
+ {
+ QPID_LOG (debug, " Unexpected opcode " << opcode);
+ return;
+ }
- inBuffer.getShortString (replyTo);
+ uint32_t methodId = inBuffer.getLong ();
+ uint64_t objId = inBuffer.getLongLong ();
+ string replyToKey;
- QPID_LOG (debug, " len = " << contentSize << ", methodId = " <<
- methodId << ", objId = " << objId);
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ if (p && p->hasReplyTo())
+ {
+ const framing::ReplyTo& rt = p->getReplyTo ();
+ replyToKey = rt.getRoutingKey ();
+ }
+ else
+ {
+ QPID_LOG (debug, " Reply-to missing");
+ return;
+ }
+ EncodeHeader (outBuffer, 'R');
outBuffer.putLong (methodId);
ManagementObjectMap::iterator iter = managementObjects.find (objId);
@@ -291,7 +330,7 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
outLen = 4096 - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyTo);
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
free (inMem);
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index a4f10632da..36ba1f0542 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -40,10 +40,13 @@ class ManagementAgent
public:
+ virtual ~ManagementAgent ();
+
typedef boost::shared_ptr<ManagementAgent> shared_ptr;
static void enableManagement (void);
static shared_ptr getAgent (void);
+ static void shutdown (void);
void setInterval (uint16_t _interval) { interval = _interval; }
void setExchange (broker::Exchange::shared_ptr mgmtExchange,
@@ -61,7 +64,7 @@ class ManagementAgent
ManagementAgent& agent;
Periodic (ManagementAgent& agent, uint32_t seconds);
- ~Periodic () {}
+ virtual ~Periodic ();
void fire ();
};
@@ -77,7 +80,8 @@ class ManagementAgent
uint64_t nextObjectId;
void PeriodicProcessing (void);
- void EncodeHeader (qpid::framing::Buffer& buf);
+ void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0);
+ bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint8_t *cls);
void SendBuffer (qpid::framing::Buffer& buf,
uint32_t length,
broker::Exchange::shared_ptr exchange,
diff --git a/qpid/cpp/src/qpid/management/ManagementExchange.h b/qpid/cpp/src/qpid/management/ManagementExchange.h
index 0fcd65b092..1a79482c9d 100644
--- a/qpid/cpp/src/qpid/management/ManagementExchange.h
+++ b/qpid/cpp/src/qpid/management/ManagementExchange.h
@@ -40,7 +40,7 @@ class ManagementExchange : public virtual TopicExchange
const qpid::framing::FieldTable& _args,
Manageable* _parent = 0);
- virtual std::string getType() const { return typeName; }
+ virtual std::string getType() const { return typeName; }
virtual bool bind (Queue::shared_ptr queue,
const string& routingKey,
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h
index ff136c397d..a32055721d 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.h
+++ b/qpid/cpp/src/qpid/management/ManagementObject.h
@@ -46,12 +46,16 @@ class ManagementObject
Manageable* coreObject;
std::string className;
- static const uint8_t TYPE_U8 = 1;
- static const uint8_t TYPE_U16 = 2;
- static const uint8_t TYPE_U32 = 3;
- static const uint8_t TYPE_U64 = 4;
- static const uint8_t TYPE_SSTR = 6;
- static const uint8_t TYPE_LSTR = 7;
+ static const uint8_t TYPE_U8 = 1;
+ static const uint8_t TYPE_U16 = 2;
+ static const uint8_t TYPE_U32 = 3;
+ static const uint8_t TYPE_U64 = 4;
+ static const uint8_t TYPE_SSTR = 6;
+ static const uint8_t TYPE_LSTR = 7;
+ static const uint8_t TYPE_ABSTIME = 8;
+ static const uint8_t TYPE_DELTATIME = 9;
+ static const uint8_t TYPE_REF = 10;
+ static const uint8_t TYPE_BOOL = 11;
static const uint8_t ACCESS_RC = 1;
static const uint8_t ACCESS_RW = 2;
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 51ec7f718a..9ca083354b 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -131,7 +131,7 @@ public:
void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
AsynchIOHandler* async = new AsynchIOHandler;
- ConnectionInputHandler* handler = f->create(async);
+ ConnectionInputHandler* handler = f->create(async, s);
AsynchIO* aio = new AsynchIO(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
diff --git a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
index af7d411928..4a90f8b736 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
@@ -22,6 +22,7 @@
#define _ConnectionInputHandlerFactory_
#include <boost/noncopyable.hpp>
+#include "qpid/sys/Socket.h"
namespace qpid {
namespace sys {
@@ -36,7 +37,7 @@ class ConnectionInputHandler;
class ConnectionInputHandlerFactory : private boost::noncopyable
{
public:
- virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt) = 0;
+ virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt, const Socket& s) = 0;
virtual ~ConnectionInputHandlerFactory(){}
};
diff --git a/qpid/python/mgmt-cli/main.py b/qpid/python/mgmt-cli/main.py
index 4d7c03d1f2..76e1f25c14 100755
--- a/qpid/python/mgmt-cli/main.py
+++ b/qpid/python/mgmt-cli/main.py
@@ -49,6 +49,7 @@ class Mcli (Cmd):
print " list <className> - Print list of objects of the specified class"
print " list <className> all - Print contents of all objects of specified class"
print " list <className> active - Print contents of all non-deleted objects of specified class"
+ print " list <list-of-IDs> - Print contents of one or more objects (infer className)"
print " list <className> <list-of-IDs> - Print contents of one or more objects"
print " list is space-separated, ranges may be specified (i.e. 1004-1010)"
print " call <ID> <methodName> [<args>] - Invoke a method on an object"
diff --git a/qpid/python/mgmt-cli/managementdata.py b/qpid/python/mgmt-cli/managementdata.py
index 2adb962b54..419cbd13c0 100644
--- a/qpid/python/mgmt-cli/managementdata.py
+++ b/qpid/python/mgmt-cli/managementdata.py
@@ -118,6 +118,7 @@ class ManagementData:
self.schema = {}
self.baseId = 0
self.disp = disp
+ self.lastUnit = None
self.methodSeq = 1
self.methodsPending = {}
self.broker.start ()
@@ -125,6 +126,44 @@ class ManagementData:
def close (self):
self.broker.stop ()
+ def refName (self, oid):
+ if oid == 0:
+ return "NULL"
+ return str (oid - self.baseId)
+
+ def valueDisplay (self, className, key, value):
+ for kind in range (2):
+ schema = self.schema[className][kind]
+ for item in schema:
+ if item[0] == key:
+ typecode = item[1]
+ unit = item[2]
+ if typecode >= 1 and typecode <= 5: # numerics
+ if unit == None or unit == self.lastUnit:
+ return str (value)
+ else:
+ self.lastUnit = unit
+ suffix = ""
+ if value != 1:
+ suffix = "s"
+ return str (value) + " " + unit + suffix
+ elif typecode == 6 or typecode == 7: # strings
+ return value
+ elif typecode == 8:
+ if value == 0:
+ return "--"
+ return self.disp.timestamp (value)
+ elif typecode == 9:
+ return str (value)
+ elif typecode == 10:
+ return self.refName (value)
+ elif typecode == 11:
+ if value == 0:
+ return "False"
+ else:
+ return "True"
+ return "*type-error*"
+
def getObjIndex (self, className, config):
""" Concatenate the values from index columns to form a unique object name """
result = ""
@@ -135,9 +174,7 @@ class ManagementData:
result = result + "."
for key,val in config:
if key == item[0]:
- if key.find ("Ref") != -1:
- val = val - self.baseId
- result = result + str (val)
+ result = result + self.valueDisplay (className, key, val)
return result
def classCompletions (self, prefix):
@@ -168,6 +205,14 @@ class ManagementData:
return "short-string"
elif typecode == 7:
return "long-string"
+ elif typecode == 8:
+ return "abs-time"
+ elif typecode == 9:
+ return "delta-time"
+ elif typecode == 10:
+ return "reference"
+ elif typecode == 11:
+ return "boolean"
else:
raise ValueError ("Invalid type code: %d" % typecode)
@@ -180,7 +225,7 @@ class ManagementData:
elif code == 3:
return "ReadOnly"
else:
- raise ValueErrir ("Invalid access code: %d" %code)
+ raise ValueError ("Invalid access code: %d" %code)
def notNone (self, text):
if text == None:
@@ -188,6 +233,12 @@ class ManagementData:
else:
return text
+ def isOid (self, id):
+ for char in str (id):
+ if not char.isdigit () and not char == '-':
+ return False
+ return True
+
def listOfIds (self, className, tokens):
""" Generate a tuple of object ids for a classname based on command tokens. """
list = []
@@ -202,13 +253,14 @@ class ManagementData:
else:
for token in tokens:
- if token.find ("-") != -1:
- ids = token.split("-", 2)
- for id in range (int (ids[0]), int (ids[1]) + 1):
- if self.getClassForId (long (id) + self.baseId) == className:
- list.append (id)
- else:
- list.append (token)
+ if self.isOid (token):
+ if token.find ("-") != -1:
+ ids = token.split("-", 2)
+ for id in range (int (ids[0]), int (ids[1]) + 1):
+ if self.getClassForId (long (id) + self.baseId) == className:
+ list.append (id)
+ else:
+ list.append (token)
list.sort ()
result = ()
@@ -258,7 +310,7 @@ class ManagementData:
if ts[2] > 0:
destroyTime = self.disp.timestamp (ts[2])
objIndex = self.getObjIndex (className, config)
- row = (objId - self.baseId, createTime, destroyTime, objIndex)
+ row = (self.refName (objId), createTime, destroyTime, objIndex)
rows.append (row)
self.disp.table ("Objects of type %s" % className,
("ID", "Created", "Destroyed", "Index"),
@@ -270,12 +322,26 @@ class ManagementData:
""" Generate a display of object data for a particular class """
self.lock.acquire ()
try:
- className = tokens[0]
- if className not in self.tables:
- print "Class not known: %s" % className
- raise ValueError ()
-
- userIds = self.listOfIds (className, tokens[1:])
+ self.lastUnit = None
+ if self.isOid (tokens[0]):
+ if tokens[0].find ("-") != -1:
+ rootId = int (tokens[0][0:tokens[0].find ("-")])
+ else:
+ rootId = int (tokens[0])
+
+ className = self.getClassForId (rootId + self.baseId)
+ remaining = tokens
+ if className == None:
+ print "Id not known: %d" % int (tokens[0])
+ raise ValueError ()
+ else:
+ className = tokens[0]
+ remaining = tokens[1:]
+ if className not in self.tables:
+ print "Class not known: %s" % className
+ raise ValueError ()
+
+ userIds = self.listOfIds (className, remaining)
if len (userIds) == 0:
print "No object IDs supplied"
raise ValueError ()
@@ -286,36 +352,37 @@ class ManagementData:
ids.append (long (id) + self.baseId)
rows = []
+ timestamp = None
config = self.tables[className][ids[0]][1]
for eIdx in range (len (config)):
key = config[eIdx][0]
if key != "id":
- isRef = key.find ("Ref") == len (key) - 3
row = ("config", key)
for id in ids:
- value = self.tables[className][id][1][eIdx][1]
- if isRef:
- value = value - self.baseId
- row = row + (value,)
+ if timestamp == None or \
+ timestamp < self.tables[className][id][0][0]:
+ timestamp = self.tables[className][id][0][0]
+ (key, value) = self.tables[className][id][1][eIdx]
+ row = row + (self.valueDisplay (className, key, value),)
rows.append (row)
inst = self.tables[className][ids[0]][2]
for eIdx in range (len (inst)):
key = inst[eIdx][0]
if key != "id":
- isRef = key.find ("Ref") == len (key) - 3
row = ("inst", key)
for id in ids:
- value = self.tables[className][id][2][eIdx][1]
- if isRef:
- value = value - self.baseId
- row = row + (value,)
+ (key, value) = self.tables[className][id][2][eIdx]
+ row = row + (self.valueDisplay (className, key, value),)
rows.append (row)
titleRow = ("Type", "Element")
for id in ids:
- titleRow = titleRow + (str (id - self.baseId),)
- self.disp.table ("Object of type %s:" % className, titleRow, rows)
+ titleRow = titleRow + (self.refName (id),)
+ caption = "Object of type %s:" % className
+ if timestamp != None:
+ caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")"
+ self.disp.table (caption, titleRow, rows)
except:
pass
@@ -418,6 +485,10 @@ class ManagementData:
if className == None:
raise ValueError ()
+ if methodName not in self.schema[className][2]:
+ print "Method '%s' not valid for class '%s'" % (methodName, className)
+ raise ValueError ()
+
schemaMethod = self.schema[className][2][methodName]
if len (args) != len (schemaMethod[1]):
print "Wrong number of method args: Need %d, Got %d" % (len (schemaMethod[1]), len (args))
@@ -431,17 +502,19 @@ class ManagementData:
self.methodsPending[self.methodSeq] = methodName
except:
methodOk = False
- print "Error in call syntax"
self.lock.release ()
if methodOk:
- self.broker.method (self.methodSeq, userOid + self.baseId, className,
- methodName, namedArgs)
+# try:
+ self.broker.method (self.methodSeq, userOid + self.baseId, className,
+ methodName, namedArgs)
+# except ValueError, e:
+# print "Error invoking method:", e
def do_list (self, data):
tokens = data.split ()
if len (tokens) == 0:
self.listClasses ()
- elif len (tokens) == 1:
+ elif len (tokens) == 1 and not self.isOid (tokens[0]):
self.listObjects (data)
else:
self.showObjects (tokens)
diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py
index 1c8b3cd840..40de2a5298 100644
--- a/qpid/python/qpid/management.py
+++ b/qpid/python/qpid/management.py
@@ -77,6 +77,14 @@ class ManagementMetadata:
codec.encode_shortstr (value)
elif typecode == 7:
codec.encode_longstr (value)
+ elif typecode == 8: # ABSTIME
+ codec.encode_longlong (long (value))
+ elif typecode == 9: # DELTATIME
+ codec.encode_longlong (long (value))
+ elif typecode == 10: # REF
+ codec.encode_longlong (long (value))
+ elif typecode == 11: # BOOL
+ codec.encode_octet (int (value))
else:
raise ValueError ("Invalid type code: %d" % typecode)
@@ -95,6 +103,14 @@ class ManagementMetadata:
data = codec.decode_shortstr ()
elif typecode == 7:
data = codec.decode_longstr ()
+ elif typecode == 8: # ABSTIME
+ data = codec.decode_longlong ()
+ elif typecode == 9: # DELTATIME
+ data = codec.decode_longlong ()
+ elif typecode == 10: # REF
+ data = codec.decode_longlong ()
+ elif typecode == 11: # BOOL
+ data = codec.decode_octet ()
else:
raise ValueError ("Invalid type code: %d" % typecode)
return data
@@ -236,10 +252,7 @@ class ManagementMetadata:
elif cls == 'I':
self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps)
- def parse (self, codec):
- opcode = chr (codec.decode_octet ())
- cls = chr (codec.decode_octet ())
-
+ def parse (self, codec, opcode, cls):
if opcode == 'S':
self.parseSchema (cls, codec)
@@ -261,34 +274,53 @@ class ManagedBroker:
mExchange = "qpid.management"
dExchange = "amq.direct"
+ def setHeader (self, codec, opcode, cls = 0):
+ codec.encode_octet (ord ('A'))
+ codec.encode_octet (ord ('M'))
+ codec.encode_octet (ord ('0'))
+ codec.encode_octet (ord ('1'))
+ codec.encode_octet (opcode)
+ codec.encode_octet (cls)
+
def checkHeader (self, codec):
octet = chr (codec.decode_octet ())
if octet != 'A':
- return 0
+ return None
octet = chr (codec.decode_octet ())
if octet != 'M':
- return 0
+ return None
octet = chr (codec.decode_octet ())
if octet != '0':
- return 0
+ return None
octet = chr (codec.decode_octet ())
if octet != '1':
- return 0
- return 1
+ return None
+ opcode = chr (codec.decode_octet ())
+ cls = chr (codec.decode_octet ())
+ return (opcode, cls)
def publish_cb (self, msg):
codec = Codec (StringIO (msg.content.body), self.spec)
- if self.checkHeader (codec) == 0:
+ hdr = self.checkHeader (codec)
+ if hdr == None:
raise ValueError ("outer header invalid");
- self.metadata.parse (codec)
+ self.metadata.parse (codec, hdr[0], hdr[1])
msg.complete ()
def reply_cb (self, msg):
codec = Codec (StringIO (msg.content.body), self.spec)
- sequence = codec.decode_long ()
- status = codec.decode_long ()
+ hdr = self.checkHeader (codec)
+ if hdr == None:
+ msg.complete ()
+ return
+ if hdr[0] != 'R':
+ msg.complete ()
+ return
+
+ sequence = codec.decode_long ()
+ status = codec.decode_long ()
sText = codec.decode_shortstr ()
data = self.sequenceManager.release (sequence)
@@ -369,9 +401,10 @@ class ManagedBroker:
methodName, args=None, packageName="qpid"):
codec = Codec (StringIO (), self.spec);
sequence = self.sequenceManager.reserve ((userSequence, className, methodName))
+ self.setHeader (codec, ord ('M'))
codec.encode_long (sequence) # Method sequence id
codec.encode_longlong (objId) # ID of object
- codec.encode_shortstr (self.rqname) # name of reply queue
+ #codec.encode_shortstr (self.rqname) # name of reply queue
# Encode args according to schema
if (className,'M') not in self.metadata.schema:
@@ -402,6 +435,8 @@ class ManagedBroker:
msg["content_type"] = "application/octet-stream"
msg["routing_key"] = "method." + packageName + "." + className + "." + methodName
msg["reply_to"] = self.spec.struct ("reply_to")
+ msg["reply_to"]["exchange_name"] = "amq.direct"
+ msg["reply_to"]["routing_key"] = self.rqname
self.channel.message_transfer (destination="qpid.management", content=msg)
def isConnected (self):
@@ -414,7 +449,7 @@ class ManagedBroker:
self.client = Client (self.host, self.port, self.spec)
self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
self.channel = self.client.channel (1)
- response = self.channel.session_open (detached_lifetime=10)
+ response = self.channel.session_open (detached_lifetime=300)
self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id)
self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id)
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index 2147122f0a..db28f098bf 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -65,26 +65,23 @@
<configElement name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/>
<configElement name="storeLib" type="sstr" access="RO" desc="Name of persistent storage library"/>
<configElement name="asyncStore" type="bool" access="RO" desc="Use async persistent store"/>
- <configElement name="mgmtPubInterval" type="uint16" min="1" access="RW" unit="second" desc="Interval for management broadcasts"/>
+ <configElement name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/>
<configElement name="initialDiskPageSize" type="uint32" access="RO" desc="Number of disk pages allocated for storage"/>
<configElement name="initialPagesPerQueue" type="uint32" access="RO" desc="Number of disk pages allocated per queue"/>
<configElement name="clusterName" type="sstr" access="RO"
desc="Name of cluster this server is a member of, zero-length for standalone server"/>
<configElement name="version" type="sstr" access="RO" desc="Running software version"/>
-
-
<method name="joinCluster">
<arg name="clusterName" dir="I" type="sstr"/>
</method>
<method name="leaveCluster"/>
- <method name="echo">
+ <method name="echo" desc="Request a response to test the path to the management agent">
<arg name="sequence" dir="IO" type="uint32" default="0"/>
<arg name="body" dir="IO" type="lstr" default=""/>
</method>
-<!--<method name="crash" desc="Temporary test method to crash the broker"/>-->
</class>
<!--
@@ -135,12 +132,11 @@
<instElement name="consumers" type="hilo32" unit="consumer" desc="Current consumers on queue"/>
<instElement name="bindings" type="hilo32" unit="binding" desc="Current bindings"/>
<instElement name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/>
+ <instElement name="messageLatencyMin" type="uint64" unit="nanosecond" desc="Minimum broker latency through this queue"/>
+ <instElement name="messageLatencyMax" type="uint64" unit="nanosecond" desc="Maximum broker latency through this queue"/>
+ <instElement name="messageLatencyAvg" type="uint64" unit="nanosecond" desc="Average broker latency through this queue"/>
- <method name="purge" desc="Discard all messages on queue"/>
- <method name="increaseJournalSize" desc="Increase number of disk pages allocated for this queue">
- <arg name="pages" type="uint32" dir="I" desc="New total page allocation"/>
- </method>
-
+ <method name="purge" desc="Discard all messages on queue"/>
</class>
<!--
@@ -184,17 +180,16 @@
-->
<class name="client">
<configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/>
- <configElement name="ipAddr" type="uint32" access="RC" index="y"/>
- <configElement name="port" type="uint16" access="RC" index="y"/>
+ <configElement name="address" type="sstr" access="RC" index="y"/>
- <instElement name="authIdentity" type="sstr"/>
- <instElement name="msgsProduced" type="count64"/>
- <instElement name="msgsConsumed" type="count64"/>
- <instElement name="bytesProduced" type="count64"/>
- <instElement name="bytesConsumed" type="count64"/>
+ <instElement name="closing" type="bool" desc="This client is closing by management request"/>
+ <instElement name="authIdentity" type="sstr"/>
+ <instElement name="framesFromClient" type="count64"/>
+ <instElement name="framesToClient" type="count64"/>
+ <instElement name="bytesFromClient" type="count64"/>
+ <instElement name="bytesToClient" type="count64"/>
<method name="close"/>
- <method name="detach"/>
</class>
<!--
@@ -205,11 +200,12 @@
<class name="session">
<configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/>
<configElement name="name" type="sstr" access="RC" index="y"/>
+ <configElement name="channelId" type="uint16" access="RO"/>
<configElement name="clientRef" type="objId" access="RO"/>
- <configElement name="detachedLifespan" type="uint32" access="RO"/>
+ <configElement name="detachedLifespan" type="uint32" access="RO" unit="second"/>
<instElement name="attached" type="bool"/>
- <instElement name="remainingLifespan" type="count32"/>
+ <instElement name="expireTime" type="absTime"/>
<instElement name="framesOutstanding" type="count32"/>
<method name="solicitAck"/>
diff --git a/qpid/specs/management-types.xml b/qpid/specs/management-types.xml
index 9f251f032b..0929d965a6 100644
--- a/qpid/specs/management-types.xml
+++ b/qpid/specs/management-types.xml
@@ -19,14 +19,16 @@
under the License.
-->
-<type name="objId" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
-<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="direct" init="0"/>
-<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="direct" init="0"/>
-<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="direct" init="0"/>
-<type name="uint64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
-<type name="bool" base="U8" cpp="bool" encode="@.putOctet (#?1:0)" decode="# = @.getOctet ()==1" accessor="direct" init="0"/>
-<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)" accessor="direct" init='""'/>
-<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/>
+<type name="objId" base="REF" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="direct" init="0"/>
+<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="direct" init="0"/>
+<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="direct" init="0"/>
+<type name="uint64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="bool" base="BOOL" cpp="uint8_t" encode="@.putOctet (#?1:0)" decode="# = @.getOctet ()==1" accessor="direct" init="0"/>
+<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)" accessor="direct" init='""'/>
+<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/>
+<type name="absTime" base="ABSTIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
<type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" style="wm" accessor="counter" init="0"/>
<type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" style="wm" accessor="counter" init="0"/>