summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp7
-rw-r--r--cpp/src/qpid/broker/Connection.h2
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp15
-rw-r--r--cpp/src/qpid/broker/Exchange.h30
-rw-r--r--cpp/src/qpid/broker/Queue.cpp8
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp5
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp56
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--cpp/src/qpid/cluster/Connection.h3
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp2
-rw-r--r--cpp/src/qpid/management/IdAllocator.h42
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp14
-rw-r--r--cpp/src/qpid/management/ManagementBroker.h8
13 files changed, 167 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 66ee6281c6..b7446a2220 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -48,7 +48,7 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
namespace broker {
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_, uint64_t objectId) :
ConnectionState(out_, broker_),
adapter(*this, isLink_),
isLink(isLink_),
@@ -70,9 +70,10 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
// TODO set last bool true if system connection
- if (agent != 0)
+ if (agent != 0) {
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
- agent->addObject(mgmtObject);
+ agent->addObject(mgmtObject, objectId);
+ }
ConnectionState::setUrl(mgmtId);
}
}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 559cd4cfe3..80d828584d 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -64,7 +64,7 @@ class Connection : public sys::ConnectionInputHandler,
public RefCounted
{
public:
- Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
+ Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0);
~Connection ();
/** Get the SessionHandler for channel. Create if it does not already exist */
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 34673bdab3..53c49bf0ce 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -22,6 +22,7 @@
#include "Exchange.h"
#include "ExchangeRegistry.h"
#include "qpid/agent/ManagementAgent.h"
+#include "qpid/management/ManagementBroker.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/MessageProperties.h"
#include "DeliverableMessage.h"
@@ -32,6 +33,7 @@ using qpid::framing::Buffer;
using qpid::framing::FieldTable;
using qpid::sys::Mutex;
using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
@@ -109,12 +111,14 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable);
mgmtExchange->set_arguments(args);
if (!durable) {
- if (name == "")
+ if (name == "") {
agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID
- else if (name == "qpid.management")
+ } else if (name == "qpid.management") {
agent->addObject (mgmtExchange, 0x1000000000000005LL); // Special management exchange ID
- else
- agent->addObject (mgmtExchange);
+ } else {
+ ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
+ agent->addObject (mgmtExchange, mb ? mb->allocateId(this) : 0);
+ }
}
}
}
@@ -245,7 +249,8 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang
(agent, this, (Manageable*) parent, queueId, key, args);
if (!origin.empty())
mgmtBinding->set_origin(origin);
- agent->addObject (mgmtBinding);
+ ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
+ agent->addObject (mgmtBinding, mb ? mb->allocateId(this) : 0);
}
}
}
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 5de3e98bc0..488549bbf6 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -39,6 +39,22 @@ namespace broker {
class ExchangeRegistry;
class Exchange : public PersistableExchange, public management::Manageable {
+public:
+ struct Binding : public management::Manageable {
+ typedef boost::shared_ptr<Binding> shared_ptr;
+ typedef std::vector<Binding::shared_ptr> vector;
+
+ Queue::shared_ptr queue;
+ const std::string key;
+ const framing::FieldTable args;
+ qmf::org::apache::qpid::broker::Binding* mgmtBinding;
+
+ Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0,
+ framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
+ ~Binding();
+ management::ManagementObject* GetManagementObject() const;
+ };
+
private:
const std::string name;
const bool durable;
@@ -64,20 +80,6 @@ protected:
void routeIVE();
- struct Binding : public management::Manageable {
- typedef boost::shared_ptr<Binding> shared_ptr;
- typedef std::vector<Binding::shared_ptr> vector;
-
- Queue::shared_ptr queue;
- const std::string key;
- const framing::FieldTable args;
- qmf::org::apache::qpid::broker::Binding* mgmtBinding;
-
- Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0,
- framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
- ~Binding();
- management::ManagementObject* GetManagementObject() const;
- };
struct MatchQueue {
const Queue::shared_ptr queue;
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 962b463571..d459c64c54 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -30,6 +30,7 @@
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementBroker.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
@@ -46,6 +47,7 @@ using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
@@ -103,8 +105,10 @@ Queue::Queue(const string& _name, bool _autodelete,
// Add the object to the management agent only if this queue is not durable.
// If it's durable, we will add it later when the queue is assigned a persistenceId.
- if (store == 0)
- agent->addObject (mgmtObject);
+ if (store == 0) {
+ ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
+ agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
+ }
}
}
}
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index e17a813db7..0a24a39d38 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -30,6 +30,7 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementBroker.h"
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
@@ -41,6 +42,7 @@ using namespace framing;
using sys::Mutex;
using boost::intrusive_ptr;
using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
@@ -65,7 +67,8 @@ SessionState::SessionState(
mgmtObject->set_attached (0);
mgmtObject->set_detachedLifespan (0);
mgmtObject->clr_expireTime();
- agent->addObject (mgmtObject);
+ ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
+ agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
}
}
attach(h);
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 6e1d275162..79c34d6873 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -21,13 +21,21 @@
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ConnectionCodec.h"
+#include "qpid/cluster/DumpClient.h"
#include "qpid/broker/Broker.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementBroker.h"
+#include "qpid/management/IdAllocator.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionState.h"
+
#include <boost/utility/in_place_factory.hpp>
#include <boost/scoped_ptr.hpp>
@@ -36,6 +44,9 @@ namespace cluster {
using namespace std;
using broker::Broker;
+using management::IdAllocator;
+using management::ManagementAgent;
+using management::ManagementBroker;
struct ClusterValues {
string name;
@@ -76,6 +87,46 @@ struct ClusterOptions : public Options {
}
};
+struct DumpClientIdAllocator : management::IdAllocator
+{
+ qpid::sys::AtomicValue<uint64_t> sequence;
+
+ DumpClientIdAllocator() : sequence(0x4000000000000000LL) {}
+
+ uint64_t getIdFor(management::Manageable* m)
+ {
+ if (isDumpQueue(m) || isDumpExchange(m) || isDumpSession(m) || isDumpBinding(m)) {
+ return ++sequence;
+ } else {
+ return 0;
+ }
+ }
+
+ bool isDumpQueue(management::Manageable* manageable)
+ {
+ qpid::broker::Queue* queue = dynamic_cast<qpid::broker::Queue*>(manageable);
+ return queue && queue->getName() == DumpClient::DUMP;
+ }
+
+ bool isDumpExchange(management::Manageable* manageable)
+ {
+ qpid::broker::Exchange* exchange = dynamic_cast<qpid::broker::Exchange*>(manageable);
+ return exchange && exchange->getName() == DumpClient::DUMP;
+ }
+
+ bool isDumpSession(management::Manageable* manageable)
+ {
+ broker::SessionState* session = dynamic_cast<broker::SessionState*>(manageable);
+ return session && session->getId().getName() == DumpClient::DUMP;
+ }
+
+ bool isDumpBinding(management::Manageable* manageable)
+ {
+ broker::Exchange::Binding* binding = dynamic_cast<broker::Exchange::Binding*>(manageable);
+ return binding && binding->queue->getName() == DumpClient::DUMP;
+ }
+};
+
struct ClusterPlugin : public Plugin {
ClusterValues values;
@@ -102,6 +153,11 @@ struct ClusterPlugin : public Plugin {
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
broker->getExchanges().registerExchange(cluster->getFailoverExchange());
+ ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance());
+ if (mgmt) {
+ std::auto_ptr<IdAllocator> allocator(new DumpClientIdAllocator());
+ mgmt->setAllocator(allocator);
+ }
}
void earlyInitialize(Plugin::Target&) {}
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index ac4b9dcdf2..d05baffe3a 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -69,7 +69,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0),
+ connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), readCredit(0),
expectProtocolHeader(isLink)
{ init(); }
@@ -396,5 +396,7 @@ void Connection::queue(const std::string& encoded) {
QPID_LOG(debug, cluster << " decoded queue " << q->getName());
}
+qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 5d46b7e81d..29dee5eda4 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -31,6 +31,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/amqp_0_10/Connection.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/framing/FrameDecoder.h"
@@ -173,6 +174,8 @@ class Connection :
boost::shared_ptr<broker::TxBuffer> txBuffer;
int readCredit;
bool expectProtocolHeader;
+
+ static qpid::sys::AtomicValue<uint64_t> catchUpId;
friend std::ostream& operator<<(std::ostream&, const Connection&);
};
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 3f3212470d..00328eb310 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -94,7 +94,7 @@ DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url
done(ok), failed(fail)
{
connection.open(url);
- session = connection.newSession("dump_shared");
+ session = connection.newSession(DUMP);
}
DumpClient::~DumpClient() {}
diff --git a/cpp/src/qpid/management/IdAllocator.h b/cpp/src/qpid/management/IdAllocator.h
new file mode 100644
index 0000000000..6fbc99afff
--- /dev/null
+++ b/cpp/src/qpid/management/IdAllocator.h
@@ -0,0 +1,42 @@
+#ifndef QPID_MANAGEMENT_IDALLOCATOR_H
+#define QPID_MANAGEMENT_IDALLOCATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Manageable.h"
+
+namespace qpid {
+namespace management {
+
+/**
+ * Interface through which plugins etc can control the mgmt object id
+ * allocation for special cases
+ */
+struct IdAllocator
+{
+ virtual uint64_t getIdFor(Manageable* object) = 0;
+ virtual ~IdAllocator() {}
+};
+
+}} // namespace qpid::management
+
+#endif /*!QPID_MANAGEMENT_IDALLOCATOR_H*/
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index 2175bc4676..0f96e97fb0 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -20,6 +20,7 @@
*/
#include "ManagementBroker.h"
+#include "IdAllocator.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/log/Statement.h"
#include <qpid/broker/Message.h>
@@ -1135,3 +1136,16 @@ size_t ManagementBroker::validateEventSchema(Buffer& inBuffer)
inBuffer.restore(); // restore original position
return end - start;
}
+
+void ManagementBroker::setAllocator(std::auto_ptr<IdAllocator> a)
+{
+ Mutex::ScopedLock lock (addLock);
+ allocator = a;
+}
+
+uint64_t ManagementBroker::allocateId(Manageable* object)
+{
+ Mutex::ScopedLock lock (addLock);
+ if (allocator.get()) return allocator->getIdFor(object);
+ return 0;
+}
diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h
index 59dfb98596..f65d6a345e 100644
--- a/cpp/src/qpid/management/ManagementBroker.h
+++ b/cpp/src/qpid/management/ManagementBroker.h
@@ -32,10 +32,13 @@
#include "Manageable.h"
#include "qmf/org/apache/qpid/broker/Agent.h"
#include <qpid/framing/AMQFrame.h>
+#include <memory>
namespace qpid {
namespace management {
+struct IdAllocator;
+
class ManagementBroker : public ManagementAgent
{
private:
@@ -43,7 +46,6 @@ private:
int threadPoolSize;
public:
-
ManagementBroker ();
virtual ~ManagementBroker ();
@@ -78,6 +80,8 @@ public:
uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
int getSignalFd () { assert(0); return -1; }
+ void setAllocator(std::auto_ptr<IdAllocator> allocator);
+ uint64_t allocateId(Manageable* object);
private:
friend class ManagementAgent;
@@ -179,6 +183,8 @@ private:
uint32_t nextRequestSequence;
bool clientWasAdded;
+ std::auto_ptr<IdAllocator> allocator;
+
# define MA_BUFFER_SIZE 65536
char inputBuffer[MA_BUFFER_SIZE];
char outputBuffer[MA_BUFFER_SIZE];