summaryrefslogtreecommitdiff
path: root/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java')
-rw-r--r--qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java504
1 files changed, 504 insertions, 0 deletions
diff --git a/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java b/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java
new file mode 100644
index 0000000000..16c77449f1
--- /dev/null
+++ b/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java
@@ -0,0 +1,504 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.console;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.UUID;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Decoder;
+import org.apache.qpid.transport.codec.Encoder;
+
+public class Broker implements MessageListener
+{
+ class HeaderInfo
+ {
+ boolean valid;
+ long sequence;
+ char opcode;
+
+ public String toString()
+ {
+ return String.format("%s Header with opcode %s and sequence %s",
+ (valid ? "Valid" : "Invalid"), opcode, sequence);
+ }
+ }
+
+ private static Logger log = LoggerFactory.getLogger(Broker.class);
+ public static final int SYNC_TIME = 60000;
+ // JMS Stuff
+ private javax.jms.Session session;
+ boolean sessionTransacted = false;
+ private String replyName;
+ private String topicName;
+ private MessageProducer prod;
+ private ArrayList<MessageConsumer> consumers = new ArrayList<MessageConsumer>();
+ private Queue reply;
+ private Queue topic;
+ private int acknowledgeMode = javax.jms.Session.AUTO_ACKNOWLEDGE;
+ // QMF Stuff
+ AMQConnection connection;
+ public String url;
+ public java.util.HashMap<String, Agent> Agents = new java.util.HashMap<String, Agent>();
+ private Session consoleSession;
+ private boolean connected = false;
+ private boolean syncInFlight = false;
+ private boolean topicBound = false;
+ private int reqsOutstanding = 0;
+ private Object lockObject = new Object();
+ UUID brokerId = UUID.randomUUID();
+
+ public Broker(org.apache.qpid.console.Session session, String url)
+ {
+ log.debug("Creating a new Broker for url " + url);
+ this.url = url;
+ consoleSession = session;
+ this.tryToConnect();
+ }
+
+ public int brokerBank()
+ {
+ return 1;
+ }
+
+ protected HeaderInfo CheckHeader(Decoder decoder)
+ {
+ HeaderInfo returnValue = new HeaderInfo();
+ returnValue.opcode = 'x';
+ returnValue.sequence = -1;
+ returnValue.valid = false;
+ if (decoder.hasRemaining())
+ {
+ char character = (char) decoder.readUint8();
+ if (character != 'A')
+ {
+ return returnValue;
+ }
+ character = (char) decoder.readUint8();
+ if (character != 'M')
+ {
+ return returnValue;
+ }
+ character = (char) decoder.readUint8();
+ if (character != '2')
+ {
+ return returnValue;
+ }
+ returnValue.valid = true;
+ returnValue.opcode = (char) decoder.readUint8();
+ returnValue.sequence = decoder.readUint32();
+ }
+ return returnValue;
+ }
+
+ public Encoder createEncoder(char opcode, long sequence)
+ {
+ return setHeader(new BBEncoder(1024), opcode, sequence);
+ }
+
+ public Message createMessage(Encoder enc)
+ {
+ try
+ {
+ byte[] buf = new byte[1024];
+ BBEncoder bbenc = (BBEncoder) enc;
+ BytesMessage msg = session.createBytesMessage();
+ ByteBuffer slice = bbenc.buffer();
+ while (slice.hasRemaining())
+ {
+ int n = Math.min(buf.length, slice.remaining());
+ slice.get(buf, 0, n);
+ msg.writeBytes(buf, 0, n);
+ }
+ return msg;
+ } catch (JMSException e)
+ {
+ throw new ConsoleException(e);
+ }
+ }
+
+ public void decrementOutstanding()
+ {
+ synchronized (lockObject)
+ {
+ this.reqsOutstanding -= 1;
+ if ((reqsOutstanding == 0) & !topicBound)
+ {
+ for (String key : consoleSession.bindingKeys())
+ {
+ try
+ {
+ // this.clientSession.exchangeBind(topicName,
+ // "qpid.mannagement", key) ;
+ log.debug("Setting Topic Binding " + key);
+ // topicName = "management://qpid.management//" + key;
+ String rk = String.format("&routingkey='%s'", key);
+ Queue aQueue = session.createQueue(topicName + rk);
+ MessageConsumer cons = session.createConsumer(aQueue);
+ cons.setMessageListener(this);
+ consumers.add(cons);
+ } catch (JMSException e)
+ {
+ throw new ConsoleException(e);
+ }
+ }
+ topicBound = true;
+ }
+ if ((reqsOutstanding == 0) & syncInFlight)
+ {
+ syncInFlight = false;
+ lockObject.notifyAll();
+ }
+ }
+ }
+
+ private byte[] ensure(int capacity, byte[] body, int size)
+ {
+ if (capacity > body.length)
+ {
+ byte[] copy = new byte[capacity];
+ System.arraycopy(body, 0, copy, 0, size);
+ body = copy;
+ }
+ return body;
+ }
+
+ protected void finalize()
+ {
+ if (connected)
+ {
+ this.shutdown();
+ }
+ }
+
+ public boolean getSyncInFlight()
+ {
+ return syncInFlight;
+ }
+
+ public void incrementOutstanding()
+ {
+ synchronized (lockObject)
+ {
+ this.reqsOutstanding += 1;
+ }
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public void onMessage(Message msg)
+ {
+ Decoder decoder = readBody(msg);
+ HeaderInfo headerInfo = this.CheckHeader(decoder);
+ // log.debug(headerInfo.toString());
+ while (headerInfo.valid)
+ {
+ long seq = headerInfo.sequence;
+ switch (headerInfo.opcode)
+ {
+ case 'b':
+ consoleSession.handleBrokerResponse(this, decoder, seq);
+ break;
+ case 'p':
+ consoleSession.handlePackageIndicator(this, decoder, seq);
+ break;
+ case 'z':
+ consoleSession.handleCommandComplete(this, decoder, seq);
+ break;
+ case 'q':
+ consoleSession.handleClassIndicator(this, decoder, seq);
+ break;
+ case 'm':
+ consoleSession.handleMethodResponse(this, decoder, seq);
+ break;
+ case 'h':
+ consoleSession
+ .handleHeartbeatIndicator(this, decoder, seq, msg);
+ break;
+ case 'e':
+ consoleSession.handleEventIndicator(this, decoder, seq);
+ break;
+ case 's':
+ consoleSession.handleSchemaResponse(this, decoder, seq);
+ break;
+ case 'c':
+ consoleSession.handleContentIndicator(this, decoder, seq, true,
+ false);
+ break;
+ case 'i':
+ consoleSession.handleContentIndicator(this, decoder, seq,
+ false, true);
+ break;
+ case 'g':
+ consoleSession.handleContentIndicator(this, decoder, seq, true,
+ true);
+ break;
+ default:
+ log.error("Invalid message type recieved with opcode "
+ + headerInfo.opcode);
+ break;
+ }
+ headerInfo = this.CheckHeader(decoder);
+ }
+ }
+
+ private Decoder readBody(Message message)
+ {
+ BytesMessage msg = (BytesMessage) message;
+ BBDecoder dec = new BBDecoder();
+ byte[] buf = new byte[1024];
+ byte[] body = new byte[1024];
+ int size = 0;
+ int n;
+ try
+ {
+ while ((n = msg.readBytes(buf)) > 0)
+ {
+ body = ensure(size + n, body, size);
+ System.arraycopy(buf, 0, body, size, n);
+ size += n;
+ }
+ } catch (JMSException e)
+ {
+ throw new ConsoleException(e);
+ }
+ dec.init(ByteBuffer.wrap(body, 0, size));
+ return dec;
+ }
+
+ public void send(Encoder enc)
+ {
+ this.send(this.createMessage(enc), "broker");
+ }
+
+ public void send(Message msg)
+ {
+ this.send(msg, "broker", -1);
+ }
+
+ public void send(Message msg, String routingKey)
+ {
+ this.send(msg, routingKey, -1);
+ }
+
+ public void send(Message msg, String routingKey, int ttl)
+ {
+ synchronized (lockObject)
+ {
+ try
+ {
+ log.debug(String.format("Sending message to routing key '%s'",
+ routingKey));
+ String destName = String.format(
+ "management://qpid.management//?routingkey='%s'",
+ routingKey);
+ log.debug(destName);
+ Queue dest = session.createQueue(destName);
+ // Queue jmsReply = session
+ // createQueue("direct://amq.direct//?routingkey='reply-"
+ // + brokerId + "'");
+ if (ttl != -1)
+ {
+ msg.setJMSExpiration(ttl);
+ }
+ msg.setJMSReplyTo(reply);
+ prod.send(dest, msg);
+ } catch (Exception e)
+ {
+ throw new ConsoleException(e);
+ }
+ }
+ }
+
+ protected Encoder setHeader(Encoder enc, char opcode, long sequence)
+ {
+ enc.writeUint8((short) 'A');
+ enc.writeUint8((short) 'M');
+ enc.writeUint8((short) '2');
+ enc.writeUint8((short) opcode);
+ enc.writeUint32(sequence);
+ return enc;
+ }
+
+ public void setSyncInFlight(boolean inFlight)
+ {
+ synchronized (lockObject)
+ {
+ syncInFlight = inFlight;
+ lockObject.notifyAll();
+ }
+ }
+
+ public void shutdown()
+ {
+ if (connected)
+ {
+ this.waitForStable();
+ try
+ {
+ session.close();
+ for (MessageConsumer cons : consumers)
+ {
+ cons.close();
+ }
+ connection.close();
+ } catch (Exception e)
+ {
+ throw new ConsoleException(e);
+ } finally
+ {
+ this.connected = false;
+ }
+ }
+ }
+
+ protected void tryToConnect()
+ {
+ try
+ {
+ reqsOutstanding = 1;
+ Agent newAgent = new Agent(this, 0, "BrokerAgent");
+ Agents.put(newAgent.agentKey(), newAgent);
+ connection = new AMQConnection(url);
+ session = connection.createSession(sessionTransacted,
+ acknowledgeMode);
+ replyName = String
+ .format(
+ "direct://amq.direct//reply-%s?exclusive='True'&autodelete='True'",
+ brokerId);
+ topicName = String
+ .format(
+ "management://qpid.management//topic-%s?exclusive='True'&autodelete='True'",
+ brokerId);
+ reply = session.createQueue(replyName);
+ MessageConsumer cons = session.createConsumer(reply);
+ cons.setMessageListener(this);
+ consumers.add(cons);
+ prod = session.createProducer(null);
+ topic = session.createQueue(topicName);
+ cons = session.createConsumer(topic);
+ cons.setMessageListener(this);
+ consumers.add(cons);
+ connection.start();
+ // Rest of the topic is bound later. Start er up
+ } catch (Exception e)
+ {
+ throw new ConsoleException(e);
+ }
+ connected = true;
+ consoleSession.handleBrokerConnect(this);
+ Encoder Encoder = createEncoder('B', 0);
+ this.send(Encoder);
+ }
+
+ public void updateAgent(QMFObject obj)
+ {
+ long agentBank = (Long) obj.getProperty("agentBank");
+ long brokerBank = (Long) obj.getProperty("brokerBank");
+ String key = Agent.AgentKey(agentBank, brokerBank);
+ if (obj.isDeleted())
+ {
+ if (Agents.containsKey(key))
+ {
+ Agent agent = Agents.get(key);
+ Agents.remove(key);
+ consoleSession.handleAgentRemoved(agent);
+ }
+ } else
+ {
+ if (!Agents.containsKey(key))
+ {
+ Agent newAgent = new Agent(this, agentBank, (String) obj
+ .getProperty("label"));
+ Agents.put(key, newAgent);
+ consoleSession.handleNewAgent(newAgent);
+ }
+ }
+ }
+
+ public void waitForStable()
+ {
+ synchronized (lockObject)
+ {
+ if (connected)
+ {
+ long start = System.currentTimeMillis();
+ syncInFlight = true;
+ while (reqsOutstanding != 0)
+ {
+ log.debug("Waiting to recieve messages");
+ try
+ {
+ lockObject.wait(SYNC_TIME);
+ } catch (Exception e)
+ {
+ throw new ConsoleException(e);
+ }
+ long duration = System.currentTimeMillis() - start;
+ if (duration > SYNC_TIME)
+ {
+ throw new ConsoleException(
+ "Timeout waiting for Broker to Sync");
+ }
+ }
+ }
+ }
+ }
+
+ public void waitForSync(int timeout)
+ {
+ synchronized (lockObject)
+ {
+ long start = System.currentTimeMillis();
+ while (syncInFlight)
+ {
+ try
+ {
+ lockObject.wait(SYNC_TIME);
+ } catch (Exception e)
+ {
+ throw new ConsoleException(e);
+ }
+ }
+ long duration = System.currentTimeMillis() - start;
+ if (duration > timeout)
+ {
+ throw new ConsoleException("Timeout waiting for Broker to Sync");
+ }
+ }
+ }
+}