summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/console/Broker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/console/Broker.cpp')
-rw-r--r--qpid/cpp/src/qpid/console/Broker.cpp333
1 files changed, 333 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/console/Broker.cpp b/qpid/cpp/src/qpid/console/Broker.cpp
new file mode 100644
index 0000000000..86a17d4a10
--- /dev/null
+++ b/qpid/cpp/src/qpid/console/Broker.cpp
@@ -0,0 +1,333 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/console/Broker.h"
+#include "qpid/console/Object.h"
+#include "qpid/console/Value.h"
+#include "qpid/console/SessionManager.h"
+#include "qpid/console/ConsoleListener.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/SystemInfo.h"
+
+using namespace qpid::client;
+using namespace qpid::console;
+using namespace qpid::framing;
+using namespace qpid::sys;
+using namespace std;
+
+Broker::Broker(SessionManager& sm, ConnectionSettings& settings) :
+ sessionManager(sm), connected(false), connectionSettings(settings),
+ reqsOutstanding(1), syncInFlight(false), topicBound(false), methodObject(0),
+ connThreadBody(*this), connThread(connThreadBody)
+{
+ string osName;
+ string nodeName;
+ string release;
+ string version;
+ string machine;
+
+ sys::SystemInfo::getSystemId(osName, nodeName, release, version, machine);
+ uint32_t pid = sys::SystemInfo::getParentProcessId();
+
+ stringstream text;
+
+ text << "qmfc-cpp-" << nodeName << "-" << pid;
+ amqpSessionId = string(text.str());
+
+ QPID_LOG(debug, "Broker::Broker: constructed, amqpSessionId=" << amqpSessionId);
+}
+
+Broker::~Broker()
+{
+ connThreadBody.shutdown();
+ connThread.join();
+ resetAgents();
+ // resetAgents() does not delete the broker agent...
+ for (AgentMap::iterator iter = agents.begin(); iter != agents.end(); iter++) {
+ delete iter->second;
+ }
+}
+
+string Broker::getUrl() const
+{
+ stringstream url;
+ url << connectionSettings.host << ":" << connectionSettings.port;
+ return url.str();
+}
+
+void Broker::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) const
+{
+ buf.putOctet('A');
+ buf.putOctet('M');
+ buf.putOctet('2');
+ buf.putOctet(opcode);
+ buf.putLong (seq);
+}
+
+bool Broker::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) const
+{
+ if (buf.getSize() < 8)
+ return false;
+
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
+
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '2';
+}
+
+void Broker::received(qpid::client::Message& msg)
+{
+#define QMF_HEADER_SIZE 8
+ string data = msg.getData();
+ Buffer inBuffer(const_cast<char*>(data.c_str()), data.size());
+ uint8_t opcode;
+ uint32_t sequence;
+
+ while (inBuffer.available() >= QMF_HEADER_SIZE) {
+ if (checkHeader(inBuffer, &opcode, &sequence)) {
+ QPID_LOG(trace, "Broker::received: opcode=" << opcode << " seq=" << sequence);
+
+ if (opcode == 'b') sessionManager.handleBrokerResp(this, inBuffer, sequence);
+ else if (opcode == 'p') sessionManager.handlePackageInd(this, inBuffer, sequence);
+ else if (opcode == 'z') sessionManager.handleCommandComplete(this, inBuffer, sequence);
+ else if (opcode == 'q') sessionManager.handleClassInd(this, inBuffer, sequence);
+ else if (opcode == 'm') sessionManager.handleMethodResp(this, inBuffer, sequence);
+ else if (opcode == 'h') sessionManager.handleHeartbeatInd(this, inBuffer, sequence);
+ else if (opcode == 'e') sessionManager.handleEventInd(this, inBuffer, sequence);
+ else if (opcode == 's') sessionManager.handleSchemaResp(this, inBuffer, sequence);
+ else if (opcode == 'c') sessionManager.handleContentInd(this, inBuffer, sequence, true, false);
+ else if (opcode == 'i') sessionManager.handleContentInd(this, inBuffer, sequence, false, true);
+ else if (opcode == 'g') sessionManager.handleContentInd(this, inBuffer, sequence, true, true);
+ } else
+ return;
+ }
+}
+
+void Broker::resetAgents()
+{
+ for (AgentMap::iterator iter = agents.begin(); iter != agents.end(); iter++) {
+ if (sessionManager.listener != 0)
+ sessionManager.listener->delAgent(*(iter->second));
+ delete iter->second;
+ }
+
+ agents.clear();
+ agents[0x0000000100000000LL] = new Agent(this, 0, "BrokerAgent");
+}
+
+void Broker::updateAgent(const Object& object)
+{
+ uint32_t brokerBank = object.attrUint("brokerBank");
+ uint32_t agentBank = object.attrUint("agentBank");
+ uint64_t agentKey = ((uint64_t) brokerBank << 32) | (uint64_t) agentBank;
+ AgentMap::iterator iter = agents.find(agentKey);
+
+ if (object.isDeleted()) {
+ if (iter != agents.end()) {
+ if (sessionManager.listener != 0)
+ sessionManager.listener->delAgent(*(iter->second));
+ delete iter->second;
+ agents.erase(iter);
+ }
+ } else {
+ if (iter == agents.end()) {
+ Agent* agent = new Agent(this, agentBank, object.attrString("label"));
+ agents[agentKey] = agent;
+ if (sessionManager.listener != 0)
+ sessionManager.listener->newAgent(*agent);
+ }
+ }
+}
+
+void Broker::ConnectionThread::run()
+{
+ static const int delayMin(1);
+ static const int delayMax(128);
+ static const int delayFactor(2);
+ int delay(delayMin);
+ string dest("qmfc");
+
+ sessionId.generate();
+ queueName << "qmfc-" << sessionId;
+
+ while (true) {
+ try {
+ broker.topicBound = false;
+ broker.reqsOutstanding = 1;
+ connection.open(broker.connectionSettings);
+ session = connection.newSession(queueName.str());
+ subscriptions = new client::SubscriptionManager(session);
+
+ session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true,
+ arg::exclusive=true);
+ session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
+ arg::bindingKey=queueName.str());
+
+ subscriptions->setAcceptMode(ACCEPT_MODE_NONE);
+ subscriptions->setAcquireMode(ACQUIRE_MODE_PRE_ACQUIRED);
+ subscriptions->subscribe(broker, queueName.str(), dest);
+ subscriptions->setFlowControl(dest, FlowControl::unlimited());
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (shuttingDown)
+ return;
+ operational = true;
+ broker.resetAgents();
+ broker.connected = true;
+ broker.sessionManager.handleBrokerConnect(&broker);
+ broker.sessionManager.startProtocol(&broker);
+ try {
+ Mutex::ScopedUnlock _unlock(connLock);
+ subscriptions->run();
+ } catch (std::exception) {}
+
+ operational = false;
+ broker.connected = false;
+ broker.sessionManager.handleBrokerDisconnect(&broker);
+ }
+ delay = delayMin;
+ connection.close();
+ delete subscriptions;
+ subscriptions = 0;
+ } catch (std::exception &e) {
+ QPID_LOG(debug, " outer exception: " << e.what());
+ if (delay < delayMax)
+ delay *= delayFactor;
+ }
+
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (shuttingDown)
+ return;
+ {
+ Mutex::ScopedUnlock _unlock(connLock);
+ ::sleep(delay);
+ }
+ if (shuttingDown)
+ return;
+ }
+ }
+}
+
+Broker::ConnectionThread::~ConnectionThread()
+{
+ if (subscriptions != 0) {
+ delete subscriptions;
+ }
+}
+
+void Broker::ConnectionThread::sendBuffer(Buffer& buf, uint32_t length,
+ const string& exchange, const string& routingKey)
+{
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (!operational)
+ return;
+ }
+
+ client::Message msg;
+ string data;
+
+ buf.getRawData(data, length);
+ msg.getDeliveryProperties().setRoutingKey(routingKey);
+ msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+ msg.setData(data);
+ try {
+ session.messageTransfer(arg::content=msg, arg::destination=exchange);
+ } catch(std::exception&) {}
+}
+
+void Broker::ConnectionThread::bindExchange(const std::string& exchange, const std::string& key)
+{
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (!operational)
+ return;
+ }
+
+ QPID_LOG(debug, "Broker::ConnectionThread::bindExchange: exchange=" << exchange << " key=" << key);
+ session.exchangeBind(arg::exchange=exchange, arg::queue=queueName.str(),
+ arg::bindingKey=key);
+}
+
+void Broker::ConnectionThread::shutdown()
+{
+ {
+ Mutex::ScopedLock _lock(connLock);
+ shuttingDown = true;
+ }
+ if (subscriptions)
+ subscriptions->stop();
+}
+
+void Broker::waitForStable()
+{
+ Mutex::ScopedLock l(lock);
+ if (reqsOutstanding == 0)
+ return;
+ syncInFlight = true;
+ while (reqsOutstanding != 0) {
+ bool result = cond.wait(lock, AbsTime(now(), TIME_SEC * sessionManager.settings.getTimeout));
+ if (!result)
+ throw(Exception("Timed out waiting for broker to synchronize"));
+ }
+}
+
+void Broker::incOutstanding()
+{
+ Mutex::ScopedLock l(lock);
+ reqsOutstanding++;
+}
+
+void Broker::decOutstanding()
+{
+ Mutex::ScopedLock l(lock);
+ reqsOutstanding--;
+ if (reqsOutstanding == 0) {
+ if (!topicBound) {
+ topicBound = true;
+ for (vector<string>::const_iterator iter = sessionManager.bindingKeyList.begin();
+ iter != sessionManager.bindingKeyList.end(); iter++)
+ connThreadBody.bindExchange("qpid.management", *iter);
+ }
+ if (syncInFlight) {
+ syncInFlight = false;
+ cond.notify();
+ }
+ }
+}
+
+void Broker::appendAgents(Agent::Vector& agentlist) const
+{
+ for (AgentMap::const_iterator iter = agents.begin(); iter != agents.end(); iter++) {
+ agentlist.push_back(iter->second);
+ }
+}
+
+ostream& qpid::console::operator<<(ostream& o, const Broker& k)
+{
+ o << "Broker: " << k.connectionSettings.host << ":" << k.connectionSettings.port;
+ return o;
+}