diff options
Diffstat (limited to 'qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java')
-rw-r--r-- | qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java | 504 |
1 files changed, 504 insertions, 0 deletions
diff --git a/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java b/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java new file mode 100644 index 0000000000..16c77449f1 --- /dev/null +++ b/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java @@ -0,0 +1,504 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.UUID; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.transport.codec.Decoder; +import org.apache.qpid.transport.codec.Encoder; + +public class Broker implements MessageListener +{ + class HeaderInfo + { + boolean valid; + long sequence; + char opcode; + + public String toString() + { + return String.format("%s Header with opcode %s and sequence %s", + (valid ? "Valid" : "Invalid"), opcode, sequence); + } + } + + private static Logger log = LoggerFactory.getLogger(Broker.class); + public static final int SYNC_TIME = 60000; + // JMS Stuff + private javax.jms.Session session; + boolean sessionTransacted = false; + private String replyName; + private String topicName; + private MessageProducer prod; + private ArrayList<MessageConsumer> consumers = new ArrayList<MessageConsumer>(); + private Queue reply; + private Queue topic; + private int acknowledgeMode = javax.jms.Session.AUTO_ACKNOWLEDGE; + // QMF Stuff + AMQConnection connection; + public String url; + public java.util.HashMap<String, Agent> Agents = new java.util.HashMap<String, Agent>(); + private Session consoleSession; + private boolean connected = false; + private boolean syncInFlight = false; + private boolean topicBound = false; + private int reqsOutstanding = 0; + private Object lockObject = new Object(); + UUID brokerId = UUID.randomUUID(); + + public Broker(org.apache.qpid.console.Session session, String url) + { + log.debug("Creating a new Broker for url " + url); + this.url = url; + consoleSession = session; + this.tryToConnect(); + } + + public int brokerBank() + { + return 1; + } + + protected HeaderInfo CheckHeader(Decoder decoder) + { + HeaderInfo returnValue = new HeaderInfo(); + returnValue.opcode = 'x'; + returnValue.sequence = -1; + returnValue.valid = false; + if (decoder.hasRemaining()) + { + char character = (char) decoder.readUint8(); + if (character != 'A') + { + return returnValue; + } + character = (char) decoder.readUint8(); + if (character != 'M') + { + return returnValue; + } + character = (char) decoder.readUint8(); + if (character != '2') + { + return returnValue; + } + returnValue.valid = true; + returnValue.opcode = (char) decoder.readUint8(); + returnValue.sequence = decoder.readUint32(); + } + return returnValue; + } + + public Encoder createEncoder(char opcode, long sequence) + { + return setHeader(new BBEncoder(1024), opcode, sequence); + } + + public Message createMessage(Encoder enc) + { + try + { + byte[] buf = new byte[1024]; + BBEncoder bbenc = (BBEncoder) enc; + BytesMessage msg = session.createBytesMessage(); + ByteBuffer slice = bbenc.buffer(); + while (slice.hasRemaining()) + { + int n = Math.min(buf.length, slice.remaining()); + slice.get(buf, 0, n); + msg.writeBytes(buf, 0, n); + } + return msg; + } catch (JMSException e) + { + throw new ConsoleException(e); + } + } + + public void decrementOutstanding() + { + synchronized (lockObject) + { + this.reqsOutstanding -= 1; + if ((reqsOutstanding == 0) & !topicBound) + { + for (String key : consoleSession.bindingKeys()) + { + try + { + // this.clientSession.exchangeBind(topicName, + // "qpid.mannagement", key) ; + log.debug("Setting Topic Binding " + key); + // topicName = "management://qpid.management//" + key; + String rk = String.format("&routingkey='%s'", key); + Queue aQueue = session.createQueue(topicName + rk); + MessageConsumer cons = session.createConsumer(aQueue); + cons.setMessageListener(this); + consumers.add(cons); + } catch (JMSException e) + { + throw new ConsoleException(e); + } + } + topicBound = true; + } + if ((reqsOutstanding == 0) & syncInFlight) + { + syncInFlight = false; + lockObject.notifyAll(); + } + } + } + + private byte[] ensure(int capacity, byte[] body, int size) + { + if (capacity > body.length) + { + byte[] copy = new byte[capacity]; + System.arraycopy(body, 0, copy, 0, size); + body = copy; + } + return body; + } + + protected void finalize() + { + if (connected) + { + this.shutdown(); + } + } + + public boolean getSyncInFlight() + { + return syncInFlight; + } + + public void incrementOutstanding() + { + synchronized (lockObject) + { + this.reqsOutstanding += 1; + } + } + + public boolean isConnected() + { + return connected; + } + + public void onMessage(Message msg) + { + Decoder decoder = readBody(msg); + HeaderInfo headerInfo = this.CheckHeader(decoder); + // log.debug(headerInfo.toString()); + while (headerInfo.valid) + { + long seq = headerInfo.sequence; + switch (headerInfo.opcode) + { + case 'b': + consoleSession.handleBrokerResponse(this, decoder, seq); + break; + case 'p': + consoleSession.handlePackageIndicator(this, decoder, seq); + break; + case 'z': + consoleSession.handleCommandComplete(this, decoder, seq); + break; + case 'q': + consoleSession.handleClassIndicator(this, decoder, seq); + break; + case 'm': + consoleSession.handleMethodResponse(this, decoder, seq); + break; + case 'h': + consoleSession + .handleHeartbeatIndicator(this, decoder, seq, msg); + break; + case 'e': + consoleSession.handleEventIndicator(this, decoder, seq); + break; + case 's': + consoleSession.handleSchemaResponse(this, decoder, seq); + break; + case 'c': + consoleSession.handleContentIndicator(this, decoder, seq, true, + false); + break; + case 'i': + consoleSession.handleContentIndicator(this, decoder, seq, + false, true); + break; + case 'g': + consoleSession.handleContentIndicator(this, decoder, seq, true, + true); + break; + default: + log.error("Invalid message type recieved with opcode " + + headerInfo.opcode); + break; + } + headerInfo = this.CheckHeader(decoder); + } + } + + private Decoder readBody(Message message) + { + BytesMessage msg = (BytesMessage) message; + BBDecoder dec = new BBDecoder(); + byte[] buf = new byte[1024]; + byte[] body = new byte[1024]; + int size = 0; + int n; + try + { + while ((n = msg.readBytes(buf)) > 0) + { + body = ensure(size + n, body, size); + System.arraycopy(buf, 0, body, size, n); + size += n; + } + } catch (JMSException e) + { + throw new ConsoleException(e); + } + dec.init(ByteBuffer.wrap(body, 0, size)); + return dec; + } + + public void send(Encoder enc) + { + this.send(this.createMessage(enc), "broker"); + } + + public void send(Message msg) + { + this.send(msg, "broker", -1); + } + + public void send(Message msg, String routingKey) + { + this.send(msg, routingKey, -1); + } + + public void send(Message msg, String routingKey, int ttl) + { + synchronized (lockObject) + { + try + { + log.debug(String.format("Sending message to routing key '%s'", + routingKey)); + String destName = String.format( + "management://qpid.management//?routingkey='%s'", + routingKey); + log.debug(destName); + Queue dest = session.createQueue(destName); + // Queue jmsReply = session + // createQueue("direct://amq.direct//?routingkey='reply-" + // + brokerId + "'"); + if (ttl != -1) + { + msg.setJMSExpiration(ttl); + } + msg.setJMSReplyTo(reply); + prod.send(dest, msg); + } catch (Exception e) + { + throw new ConsoleException(e); + } + } + } + + protected Encoder setHeader(Encoder enc, char opcode, long sequence) + { + enc.writeUint8((short) 'A'); + enc.writeUint8((short) 'M'); + enc.writeUint8((short) '2'); + enc.writeUint8((short) opcode); + enc.writeUint32(sequence); + return enc; + } + + public void setSyncInFlight(boolean inFlight) + { + synchronized (lockObject) + { + syncInFlight = inFlight; + lockObject.notifyAll(); + } + } + + public void shutdown() + { + if (connected) + { + this.waitForStable(); + try + { + session.close(); + for (MessageConsumer cons : consumers) + { + cons.close(); + } + connection.close(); + } catch (Exception e) + { + throw new ConsoleException(e); + } finally + { + this.connected = false; + } + } + } + + protected void tryToConnect() + { + try + { + reqsOutstanding = 1; + Agent newAgent = new Agent(this, 0, "BrokerAgent"); + Agents.put(newAgent.agentKey(), newAgent); + connection = new AMQConnection(url); + session = connection.createSession(sessionTransacted, + acknowledgeMode); + replyName = String + .format( + "direct://amq.direct//reply-%s?exclusive='True'&autodelete='True'", + brokerId); + topicName = String + .format( + "management://qpid.management//topic-%s?exclusive='True'&autodelete='True'", + brokerId); + reply = session.createQueue(replyName); + MessageConsumer cons = session.createConsumer(reply); + cons.setMessageListener(this); + consumers.add(cons); + prod = session.createProducer(null); + topic = session.createQueue(topicName); + cons = session.createConsumer(topic); + cons.setMessageListener(this); + consumers.add(cons); + connection.start(); + // Rest of the topic is bound later. Start er up + } catch (Exception e) + { + throw new ConsoleException(e); + } + connected = true; + consoleSession.handleBrokerConnect(this); + Encoder Encoder = createEncoder('B', 0); + this.send(Encoder); + } + + public void updateAgent(QMFObject obj) + { + long agentBank = (Long) obj.getProperty("agentBank"); + long brokerBank = (Long) obj.getProperty("brokerBank"); + String key = Agent.AgentKey(agentBank, brokerBank); + if (obj.isDeleted()) + { + if (Agents.containsKey(key)) + { + Agent agent = Agents.get(key); + Agents.remove(key); + consoleSession.handleAgentRemoved(agent); + } + } else + { + if (!Agents.containsKey(key)) + { + Agent newAgent = new Agent(this, agentBank, (String) obj + .getProperty("label")); + Agents.put(key, newAgent); + consoleSession.handleNewAgent(newAgent); + } + } + } + + public void waitForStable() + { + synchronized (lockObject) + { + if (connected) + { + long start = System.currentTimeMillis(); + syncInFlight = true; + while (reqsOutstanding != 0) + { + log.debug("Waiting to recieve messages"); + try + { + lockObject.wait(SYNC_TIME); + } catch (Exception e) + { + throw new ConsoleException(e); + } + long duration = System.currentTimeMillis() - start; + if (duration > SYNC_TIME) + { + throw new ConsoleException( + "Timeout waiting for Broker to Sync"); + } + } + } + } + } + + public void waitForSync(int timeout) + { + synchronized (lockObject) + { + long start = System.currentTimeMillis(); + while (syncInFlight) + { + try + { + lockObject.wait(SYNC_TIME); + } catch (Exception e) + { + throw new ConsoleException(e); + } + } + long duration = System.currentTimeMillis() - start; + if (duration > timeout) + { + throw new ConsoleException("Timeout waiting for Broker to Sync"); + } + } + } +} |