summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-01-02 15:56:20 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-01-02 15:56:20 +0000
commit3fe6853a7029e48f693c0853e51af33be5c79aec (patch)
tree6139a715591aabc91370350aa26f854639a2aa11 /cpp
parent8bc0b992a0e67259a7d9c525bbbbbc32fbc60a20 (diff)
downloadqpid-python-3fe6853a7029e48f693c0853e51af33be5c79aec.tar.gz
patch-715 (tross)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@608135 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rwxr-xr-xcpp/managementgen/main.py7
-rw-r--r--cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/broker/Connection.cpp66
-rw-r--r--cpp/src/qpid/broker/Connection.h16
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.cpp5
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.h4
-rw-r--r--cpp/src/qpid/broker/Queue.cpp4
-rw-r--r--cpp/src/qpid/broker/SessionManager.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp75
-rw-r--r--cpp/src/qpid/broker/SessionState.h11
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp85
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h8
-rw-r--r--cpp/src/qpid/management/ManagementExchange.h2
-rw-r--r--cpp/src/qpid/management/ManagementObject.h16
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp2
-rw-r--r--cpp/src/qpid/sys/ConnectionInputHandlerFactory.h3
17 files changed, 262 insertions, 51 deletions
diff --git a/cpp/managementgen/main.py b/cpp/managementgen/main.py
index 2f70639482..ddf18ef873 100755
--- a/cpp/managementgen/main.py
+++ b/cpp/managementgen/main.py
@@ -41,8 +41,7 @@ parser.add_option ("-m", "--makefile", dest="makefile", metavar="FILE",
if opts.outdir == None or \
opts.typefile == None or \
opts.schemafile == None or \
- opts.templatedir == None or \
- opts.makefile == None:
+ opts.templatedir == None:
parser.error ("Incorrect options, see --help for help")
gen = Generator (opts.outdir, opts.templatedir)
@@ -51,4 +50,6 @@ schema = PackageSchema (opts.typefile, opts.schemafile)
gen.makeClassFiles ("Class.h", schema)
gen.makeClassFiles ("Class.cpp", schema)
gen.makeMethodFiles ("Args.h", schema)
-gen.makeMakeFile (opts.makefile)
+
+if opts.makefile != None:
+ gen.makeMakeFile (opts.makefile)
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index d83eabee06..5607d22498 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -231,6 +231,7 @@ void Broker::run() {
void Broker::shutdown() {
if (acceptor)
acceptor->shutdown();
+ ManagementAgent::shutdown ();
}
Broker::~Broker() {
@@ -270,6 +271,11 @@ ManagementObject::shared_ptr Broker::GetManagementObject(void) const
return dynamic_pointer_cast<ManagementObject> (mgmtObject);
}
+Manageable* Broker::GetVhostObject(void) const
+{
+ return vhostObject.get();
+}
+
Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
Args& /*_args*/)
{
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 1c1c303be8..aaa0be72bb 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -119,6 +119,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
SessionManager& getSessionManager() { return sessionManager; }
management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable* GetVhostObject (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index f2cd1c11e4..fbe018e8ae 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -26,6 +26,7 @@
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/management/ManagementAgent.h"
#include <boost/bind.hpp>
@@ -38,11 +39,15 @@ using namespace qpid::sys;
using namespace qpid::framing;
using namespace qpid::sys;
using namespace qpid::ptr_map;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
namespace qpid {
namespace broker {
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
+ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const Socket& s) :
broker(broker_),
outputTasks(*out_),
out(out_),
@@ -50,15 +55,45 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
heartbeat(0),
client(0),
stagingThreshold(broker.getStagingThreshold()),
- adapter(*this)
-{}
+ adapter(*this),
+ mgmtClosing(0)
+{
+ Manageable* parent = broker.GetVhostObject ();
+
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Client::shared_ptr
+ (new management::Client (this, parent, s.getPeerAddress ()));
+ agent->addObject (mgmtObject);
+ }
+ }
+}
+
+Connection::~Connection ()
+{
+ if (mgmtObject.get () != 0)
+ mgmtObject->resourceDestroy ();
+}
void Connection::received(framing::AMQFrame& frame){
+ if (mgmtClosing)
+ close (403, "Closed by Management Request", 0, 0);
+
if (frame.getChannel() == 0) {
adapter.handle(frame);
} else {
getChannel(frame.getChannel()).in(frame);
}
+
+ if (mgmtObject.get () != 0)
+ {
+ mgmtObject->inc_framesFromClient ();
+ mgmtObject->inc_bytesFromClient (frame.size ());
+ }
}
void Connection::close(
@@ -122,5 +157,30 @@ SessionHandler& Connection::getChannel(ChannelId id) {
return *get_pointer(i);
}
+ManagementObject::shared_ptr Connection::GetManagementObject (void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
+ Args& /*args*/)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ QPID_LOG (debug, "Connection::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case management::Client::METHOD_CLOSE :
+ mgmtClosing = 1;
+ mgmtObject->set_closing (1);
+ status = Manageable::STATUS_OK;
+ break;
+ }
+
+ return status;
+}
+
+
}}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 395aa7b0bd..cbe7addec2 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -35,9 +35,12 @@
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/framing/ProtocolVersion.h"
#include "Broker.h"
+#include "qpid/sys/Socket.h"
#include "qpid/Exception.h"
#include "ConnectionHandler.h"
#include "SessionHandler.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Client.h"
#include <boost/ptr_container/ptr_map.hpp>
@@ -45,10 +48,12 @@ namespace qpid {
namespace broker {
class Connection : public sys::ConnectionInputHandler,
- public ConnectionToken
+ public ConnectionToken,
+ public management::Manageable
{
public:
- Connection(sys::ConnectionOutputHandler* out, Broker& broker);
+ Connection(sys::ConnectionOutputHandler* out, Broker& broker, const sys::Socket& s);
+ ~Connection ();
/** Get the SessionHandler for channel. Create if it does not already exist */
SessionHandler& getChannel(framing::ChannelId channel);
@@ -85,6 +90,11 @@ class Connection : public sys::ConnectionInputHandler,
void closeChannel(framing::ChannelId channel);
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args);
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -97,6 +107,8 @@ class Connection : public sys::ConnectionInputHandler,
framing::AMQP_ClientProxy::Connection* client;
uint64_t stagingThreshold;
ConnectionHandler adapter;
+ management::Client::shared_ptr mgmtObject;
+ bool mgmtClosing;
};
}}
diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp
index 4429a2583c..c925c4be2f 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -35,9 +35,10 @@ ConnectionFactory::~ConnectionFactory()
}
qpid::sys::ConnectionInputHandler*
-ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out)
+ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out,
+ const qpid::sys::Socket& s)
{
- return new Connection(out, broker);
+ return new Connection(out, broker, s);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h
index 6af3dda7a6..23fba5c1ab 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.h
+++ b/cpp/src/qpid/broker/ConnectionFactory.h
@@ -32,8 +32,8 @@ class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory
public:
ConnectionFactory(Broker& b);
- virtual qpid::sys::ConnectionInputHandler* create(
- qpid::sys::ConnectionOutputHandler* ctxt);
+ virtual qpid::sys::ConnectionInputHandler* create
+ (qpid::sys::ConnectionOutputHandler* ctxt, const sys::Socket& s);
virtual ~ConnectionFactory();
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index b68a7db8b0..f753e7ef75 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -616,10 +616,6 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId,
purge ();
status = Manageable::STATUS_OK;
break;
-
- case management::Queue::METHOD_INCREASEJOURNALSIZE :
- status = Manageable::STATUS_NOT_IMPLEMENTED;
- break;
}
return status;
diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp
index f12ebc6db1..1d5f9ebada 100644
--- a/cpp/src/qpid/broker/SessionManager.cpp
+++ b/cpp/src/qpid/broker/SessionManager.cpp
@@ -58,6 +58,8 @@ void SessionManager::suspend(std::auto_ptr<SessionState> session) {
active.erase(session->getId());
session->suspend();
session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
+ if (session->mgmtObject.get() != 0)
+ session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry));
suspended.push_back(session.release()); // In expiry order
eraseExpired();
}
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index bea1eaedcf..a75b32cbb5 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -31,6 +31,10 @@ namespace broker {
using namespace framing;
using sys::Mutex;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
@@ -50,11 +54,34 @@ SessionState::SessionState(
// TODO aconway 2007-09-20: SessionManager may add plugin
// handlers to the chain.
getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+
+ Manageable* parent = broker.GetVhostObject ();
+
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Session::shared_ptr
+ (new management::Session (this, parent, id.str ()));
+ mgmtObject->set_attached (1);
+ mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h.getChannel());
+ mgmtObject->set_detachedLifespan (getTimeout());
+ agent->addObject (mgmtObject);
+ }
+ }
}
SessionState::~SessionState() {
// Remove ID from active session list.
factory.erase(getId());
+
+ if (mgmtObject.get () != 0)
+ {
+ mgmtObject->resourceDestroy ();
+ }
}
SessionHandler* SessionState::getHandler() {
@@ -75,12 +102,22 @@ void SessionState::detach() {
getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
Mutex::ScopedLock l(lock);
handler = 0;
+ if (mgmtObject.get() != 0)
+ {
+ mgmtObject->set_attached (0);
+ }
}
void SessionState::attach(SessionHandler& h) {
{
Mutex::ScopedLock l(lock);
handler = &h;
+ if (mgmtObject.get() != 0)
+ {
+ mgmtObject->set_attached (1);
+ mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h.getChannel());
+ }
}
h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
}
@@ -96,4 +133,42 @@ void SessionState::activateOutput()
//if not attached, it can simply ignore the callback, else pass it
//on to the connection
+ManagementObject::shared_ptr SessionState::GetManagementObject (void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
+ Args& /*args*/)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ switch (methodId)
+ {
+ case management::Session::METHOD_DETACH :
+ if (handler != 0)
+ {
+ handler->localSuspend ();
+ }
+ status = Manageable::STATUS_OK;
+ break;
+
+ case management::Session::METHOD_CLOSE :
+ if (handler != 0)
+ {
+ handler->getConnection().closeChannel(handler->getChannel());
+ }
+ status = Manageable::STATUS_OK;
+ break;
+
+ case management::Session::METHOD_SOLICITACK :
+ case management::Session::METHOD_RESETLIFESPAN :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+
+ return status;
+}
+
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index ac2a33442a..c8c32a046d 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -29,6 +29,8 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/sys/Time.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Session.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
@@ -57,7 +59,8 @@ class Connection;
*/
class SessionState : public framing::SessionState,
public framing::FrameHandler::InOutHandler,
- public sys::OutputControl
+ public sys::OutputControl,
+ public management::Manageable
{
public:
~SessionState();
@@ -82,6 +85,11 @@ class SessionState : public framing::SessionState,
/** OutputControl **/
void activateOutput();
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args);
+
protected:
void handleIn(framing::AMQFrame&);
void handleOut(framing::AMQFrame&);
@@ -102,6 +110,7 @@ class SessionState : public framing::SessionState,
framing::ProtocolVersion version;
sys::Mutex lock;
boost::scoped_ptr<SemanticHandler> semanticHandler;
+ management::Session::shared_ptr mgmtObject;
friend class SessionManager;
};
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 90da74404b..d3c5d7c266 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -41,6 +41,8 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
}
+ManagementAgent::~ManagementAgent () {}
+
void ManagementAgent::enableManagement (void)
{
enabled = 1;
@@ -54,6 +56,16 @@ ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
return agent;
}
+void ManagementAgent::shutdown (void)
+{
+ if (agent.get () != 0)
+ {
+ agent->mExchange.reset ();
+ agent->dExchange.reset ();
+ agent.reset ();
+ }
+}
+
void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange,
broker::Exchange::shared_ptr _dexchange)
{
@@ -73,6 +85,8 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object)
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
: TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {}
+ManagementAgent::Periodic::~Periodic () {}
+
void ManagementAgent::Periodic::fire ()
{
agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval)));
@@ -93,12 +107,27 @@ void ManagementAgent::clientAdded (void)
}
}
-void ManagementAgent::EncodeHeader (Buffer& buf)
+void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint8_t cls)
{
buf.putOctet ('A');
buf.putOctet ('M');
buf.putOctet ('0');
buf.putOctet ('1');
+ buf.putOctet (opcode);
+ buf.putOctet (cls);
+}
+
+bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint8_t *cls)
+{
+ uint8_t h1 = buf.getOctet ();
+ uint8_t h2 = buf.getOctet ();
+ uint8_t h3 = buf.getOctet ();
+ uint8_t h4 = buf.getOctet ();
+
+ *opcode = buf.getOctet ();
+ *cls = buf.getOctet ();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '0' && h4 == '1';
}
void ManagementAgent::SendBuffer (Buffer& buf,
@@ -112,9 +141,6 @@ void ManagementAgent::SendBuffer (Buffer& buf,
AMQFrame header (in_place<AMQHeaderBody>());
AMQFrame content(in_place<AMQContentBody>());
- QPID_LOG (debug, "ManagementAgent::SendBuffer - key="
- << routingKey << " len=" << length);
-
content.castBody<AMQContentBody>()->decode(buf, length);
method.setEof (false);
@@ -156,9 +182,7 @@ void ManagementAgent::PeriodicProcessing (void)
if (object->getSchemaNeeded ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer);
- msgBuffer.putOctet ('S'); // opcode = Schema Record
- msgBuffer.putOctet (0); // content-class = N/A
+ EncodeHeader (msgBuffer, 'S');
object->writeSchema (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
@@ -170,9 +194,7 @@ void ManagementAgent::PeriodicProcessing (void)
if (object->getConfigChanged () || object->isDeleted ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer);
- msgBuffer.putOctet ('C'); // opcode = Content Record
- msgBuffer.putOctet ('C'); // content-class = Configuration
+ EncodeHeader (msgBuffer, 'C', 'C');
object->writeConfig (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
@@ -184,9 +206,7 @@ void ManagementAgent::PeriodicProcessing (void)
if (object->getInstChanged ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer);
- msgBuffer.putOctet ('C'); // opcode = Content Record
- msgBuffer.putOctet ('I'); // content-class = Instrumentation
+ EncodeHeader (msgBuffer, 'C', 'I');
object->writeInstrumentation (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
@@ -251,9 +271,6 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
start = pos + 1;
string methodName = routingKey.substr (start, routingKey.length () - start);
- QPID_LOG (debug, "Dispatch package: " << packageName << ", class: "
- << className << ", method: " << methodName);
-
contentSize = msg.encodedContentSize ();
if (contentSize < 8 || contentSize > 65536)
return;
@@ -263,19 +280,41 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
Buffer inBuffer (inMem, contentSize);
Buffer outBuffer (outMem, 4096);
uint32_t outLen;
+ uint8_t opcode, unused;
msg.encodeContent (inBuffer);
inBuffer.reset ();
- uint32_t methodId = inBuffer.getLong ();
- uint64_t objId = inBuffer.getLongLong ();
- string replyTo;
+ if (!CheckHeader (inBuffer, &opcode, &unused))
+ {
+ QPID_LOG (debug, " Invalid content header");
+ return;
+ }
+
+ if (opcode != 'M')
+ {
+ QPID_LOG (debug, " Unexpected opcode " << opcode);
+ return;
+ }
- inBuffer.getShortString (replyTo);
+ uint32_t methodId = inBuffer.getLong ();
+ uint64_t objId = inBuffer.getLongLong ();
+ string replyToKey;
- QPID_LOG (debug, " len = " << contentSize << ", methodId = " <<
- methodId << ", objId = " << objId);
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ if (p && p->hasReplyTo())
+ {
+ const framing::ReplyTo& rt = p->getReplyTo ();
+ replyToKey = rt.getRoutingKey ();
+ }
+ else
+ {
+ QPID_LOG (debug, " Reply-to missing");
+ return;
+ }
+ EncodeHeader (outBuffer, 'R');
outBuffer.putLong (methodId);
ManagementObjectMap::iterator iter = managementObjects.find (objId);
@@ -291,7 +330,7 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
outLen = 4096 - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyTo);
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
free (inMem);
}
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index a4f10632da..36ba1f0542 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -40,10 +40,13 @@ class ManagementAgent
public:
+ virtual ~ManagementAgent ();
+
typedef boost::shared_ptr<ManagementAgent> shared_ptr;
static void enableManagement (void);
static shared_ptr getAgent (void);
+ static void shutdown (void);
void setInterval (uint16_t _interval) { interval = _interval; }
void setExchange (broker::Exchange::shared_ptr mgmtExchange,
@@ -61,7 +64,7 @@ class ManagementAgent
ManagementAgent& agent;
Periodic (ManagementAgent& agent, uint32_t seconds);
- ~Periodic () {}
+ virtual ~Periodic ();
void fire ();
};
@@ -77,7 +80,8 @@ class ManagementAgent
uint64_t nextObjectId;
void PeriodicProcessing (void);
- void EncodeHeader (qpid::framing::Buffer& buf);
+ void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0);
+ bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint8_t *cls);
void SendBuffer (qpid::framing::Buffer& buf,
uint32_t length,
broker::Exchange::shared_ptr exchange,
diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h
index 0fcd65b092..1a79482c9d 100644
--- a/cpp/src/qpid/management/ManagementExchange.h
+++ b/cpp/src/qpid/management/ManagementExchange.h
@@ -40,7 +40,7 @@ class ManagementExchange : public virtual TopicExchange
const qpid::framing::FieldTable& _args,
Manageable* _parent = 0);
- virtual std::string getType() const { return typeName; }
+ virtual std::string getType() const { return typeName; }
virtual bool bind (Queue::shared_ptr queue,
const string& routingKey,
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index ff136c397d..a32055721d 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -46,12 +46,16 @@ class ManagementObject
Manageable* coreObject;
std::string className;
- static const uint8_t TYPE_U8 = 1;
- static const uint8_t TYPE_U16 = 2;
- static const uint8_t TYPE_U32 = 3;
- static const uint8_t TYPE_U64 = 4;
- static const uint8_t TYPE_SSTR = 6;
- static const uint8_t TYPE_LSTR = 7;
+ static const uint8_t TYPE_U8 = 1;
+ static const uint8_t TYPE_U16 = 2;
+ static const uint8_t TYPE_U32 = 3;
+ static const uint8_t TYPE_U64 = 4;
+ static const uint8_t TYPE_SSTR = 6;
+ static const uint8_t TYPE_LSTR = 7;
+ static const uint8_t TYPE_ABSTIME = 8;
+ static const uint8_t TYPE_DELTATIME = 9;
+ static const uint8_t TYPE_REF = 10;
+ static const uint8_t TYPE_BOOL = 11;
static const uint8_t ACCESS_RC = 1;
static const uint8_t ACCESS_RW = 2;
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 51ec7f718a..9ca083354b 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -131,7 +131,7 @@ public:
void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
AsynchIOHandler* async = new AsynchIOHandler;
- ConnectionInputHandler* handler = f->create(async);
+ ConnectionInputHandler* handler = f->create(async, s);
AsynchIO* aio = new AsynchIO(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
diff --git a/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h b/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
index af7d411928..4a90f8b736 100644
--- a/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
+++ b/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
@@ -22,6 +22,7 @@
#define _ConnectionInputHandlerFactory_
#include <boost/noncopyable.hpp>
+#include "qpid/sys/Socket.h"
namespace qpid {
namespace sys {
@@ -36,7 +37,7 @@ class ConnectionInputHandler;
class ConnectionInputHandlerFactory : private boost::noncopyable
{
public:
- virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt) = 0;
+ virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt, const Socket& s) = 0;
virtual ~ConnectionInputHandlerFactory(){}
};