summaryrefslogtreecommitdiff
path: root/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java')
-rw-r--r--qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java504
1 files changed, 0 insertions, 504 deletions
diff --git a/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java b/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java
deleted file mode 100644
index 16c77449f1..0000000000
--- a/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.console;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.UUID;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.transport.codec.BBDecoder;
-import org.apache.qpid.transport.codec.BBEncoder;
-import org.apache.qpid.transport.codec.Decoder;
-import org.apache.qpid.transport.codec.Encoder;
-
-public class Broker implements MessageListener
-{
- class HeaderInfo
- {
- boolean valid;
- long sequence;
- char opcode;
-
- public String toString()
- {
- return String.format("%s Header with opcode %s and sequence %s",
- (valid ? "Valid" : "Invalid"), opcode, sequence);
- }
- }
-
- private static Logger log = LoggerFactory.getLogger(Broker.class);
- public static final int SYNC_TIME = 60000;
- // JMS Stuff
- private javax.jms.Session session;
- boolean sessionTransacted = false;
- private String replyName;
- private String topicName;
- private MessageProducer prod;
- private ArrayList<MessageConsumer> consumers = new ArrayList<MessageConsumer>();
- private Queue reply;
- private Queue topic;
- private int acknowledgeMode = javax.jms.Session.AUTO_ACKNOWLEDGE;
- // QMF Stuff
- AMQConnection connection;
- public String url;
- public java.util.HashMap<String, Agent> Agents = new java.util.HashMap<String, Agent>();
- private Session consoleSession;
- private boolean connected = false;
- private boolean syncInFlight = false;
- private boolean topicBound = false;
- private int reqsOutstanding = 0;
- private Object lockObject = new Object();
- UUID brokerId = UUID.randomUUID();
-
- public Broker(org.apache.qpid.console.Session session, String url)
- {
- log.debug("Creating a new Broker for url " + url);
- this.url = url;
- consoleSession = session;
- this.tryToConnect();
- }
-
- public int brokerBank()
- {
- return 1;
- }
-
- protected HeaderInfo CheckHeader(Decoder decoder)
- {
- HeaderInfo returnValue = new HeaderInfo();
- returnValue.opcode = 'x';
- returnValue.sequence = -1;
- returnValue.valid = false;
- if (decoder.hasRemaining())
- {
- char character = (char) decoder.readUint8();
- if (character != 'A')
- {
- return returnValue;
- }
- character = (char) decoder.readUint8();
- if (character != 'M')
- {
- return returnValue;
- }
- character = (char) decoder.readUint8();
- if (character != '2')
- {
- return returnValue;
- }
- returnValue.valid = true;
- returnValue.opcode = (char) decoder.readUint8();
- returnValue.sequence = decoder.readUint32();
- }
- return returnValue;
- }
-
- public Encoder createEncoder(char opcode, long sequence)
- {
- return setHeader(new BBEncoder(1024), opcode, sequence);
- }
-
- public Message createMessage(Encoder enc)
- {
- try
- {
- byte[] buf = new byte[1024];
- BBEncoder bbenc = (BBEncoder) enc;
- BytesMessage msg = session.createBytesMessage();
- ByteBuffer slice = bbenc.buffer();
- while (slice.hasRemaining())
- {
- int n = Math.min(buf.length, slice.remaining());
- slice.get(buf, 0, n);
- msg.writeBytes(buf, 0, n);
- }
- return msg;
- } catch (JMSException e)
- {
- throw new ConsoleException(e);
- }
- }
-
- public void decrementOutstanding()
- {
- synchronized (lockObject)
- {
- this.reqsOutstanding -= 1;
- if ((reqsOutstanding == 0) & !topicBound)
- {
- for (String key : consoleSession.bindingKeys())
- {
- try
- {
- // this.clientSession.exchangeBind(topicName,
- // "qpid.mannagement", key) ;
- log.debug("Setting Topic Binding " + key);
- // topicName = "management://qpid.management//" + key;
- String rk = String.format("&routingkey='%s'", key);
- Queue aQueue = session.createQueue(topicName + rk);
- MessageConsumer cons = session.createConsumer(aQueue);
- cons.setMessageListener(this);
- consumers.add(cons);
- } catch (JMSException e)
- {
- throw new ConsoleException(e);
- }
- }
- topicBound = true;
- }
- if ((reqsOutstanding == 0) & syncInFlight)
- {
- syncInFlight = false;
- lockObject.notifyAll();
- }
- }
- }
-
- private byte[] ensure(int capacity, byte[] body, int size)
- {
- if (capacity > body.length)
- {
- byte[] copy = new byte[capacity];
- System.arraycopy(body, 0, copy, 0, size);
- body = copy;
- }
- return body;
- }
-
- protected void finalize()
- {
- if (connected)
- {
- this.shutdown();
- }
- }
-
- public boolean getSyncInFlight()
- {
- return syncInFlight;
- }
-
- public void incrementOutstanding()
- {
- synchronized (lockObject)
- {
- this.reqsOutstanding += 1;
- }
- }
-
- public boolean isConnected()
- {
- return connected;
- }
-
- public void onMessage(Message msg)
- {
- Decoder decoder = readBody(msg);
- HeaderInfo headerInfo = this.CheckHeader(decoder);
- // log.debug(headerInfo.toString());
- while (headerInfo.valid)
- {
- long seq = headerInfo.sequence;
- switch (headerInfo.opcode)
- {
- case 'b':
- consoleSession.handleBrokerResponse(this, decoder, seq);
- break;
- case 'p':
- consoleSession.handlePackageIndicator(this, decoder, seq);
- break;
- case 'z':
- consoleSession.handleCommandComplete(this, decoder, seq);
- break;
- case 'q':
- consoleSession.handleClassIndicator(this, decoder, seq);
- break;
- case 'm':
- consoleSession.handleMethodResponse(this, decoder, seq);
- break;
- case 'h':
- consoleSession
- .handleHeartbeatIndicator(this, decoder, seq, msg);
- break;
- case 'e':
- consoleSession.handleEventIndicator(this, decoder, seq);
- break;
- case 's':
- consoleSession.handleSchemaResponse(this, decoder, seq);
- break;
- case 'c':
- consoleSession.handleContentIndicator(this, decoder, seq, true,
- false);
- break;
- case 'i':
- consoleSession.handleContentIndicator(this, decoder, seq,
- false, true);
- break;
- case 'g':
- consoleSession.handleContentIndicator(this, decoder, seq, true,
- true);
- break;
- default:
- log.error("Invalid message type recieved with opcode "
- + headerInfo.opcode);
- break;
- }
- headerInfo = this.CheckHeader(decoder);
- }
- }
-
- private Decoder readBody(Message message)
- {
- BytesMessage msg = (BytesMessage) message;
- BBDecoder dec = new BBDecoder();
- byte[] buf = new byte[1024];
- byte[] body = new byte[1024];
- int size = 0;
- int n;
- try
- {
- while ((n = msg.readBytes(buf)) > 0)
- {
- body = ensure(size + n, body, size);
- System.arraycopy(buf, 0, body, size, n);
- size += n;
- }
- } catch (JMSException e)
- {
- throw new ConsoleException(e);
- }
- dec.init(ByteBuffer.wrap(body, 0, size));
- return dec;
- }
-
- public void send(Encoder enc)
- {
- this.send(this.createMessage(enc), "broker");
- }
-
- public void send(Message msg)
- {
- this.send(msg, "broker", -1);
- }
-
- public void send(Message msg, String routingKey)
- {
- this.send(msg, routingKey, -1);
- }
-
- public void send(Message msg, String routingKey, int ttl)
- {
- synchronized (lockObject)
- {
- try
- {
- log.debug(String.format("Sending message to routing key '%s'",
- routingKey));
- String destName = String.format(
- "management://qpid.management//?routingkey='%s'",
- routingKey);
- log.debug(destName);
- Queue dest = session.createQueue(destName);
- // Queue jmsReply = session
- // createQueue("direct://amq.direct//?routingkey='reply-"
- // + brokerId + "'");
- if (ttl != -1)
- {
- msg.setJMSExpiration(ttl);
- }
- msg.setJMSReplyTo(reply);
- prod.send(dest, msg);
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- }
- }
- }
-
- protected Encoder setHeader(Encoder enc, char opcode, long sequence)
- {
- enc.writeUint8((short) 'A');
- enc.writeUint8((short) 'M');
- enc.writeUint8((short) '2');
- enc.writeUint8((short) opcode);
- enc.writeUint32(sequence);
- return enc;
- }
-
- public void setSyncInFlight(boolean inFlight)
- {
- synchronized (lockObject)
- {
- syncInFlight = inFlight;
- lockObject.notifyAll();
- }
- }
-
- public void shutdown()
- {
- if (connected)
- {
- this.waitForStable();
- try
- {
- session.close();
- for (MessageConsumer cons : consumers)
- {
- cons.close();
- }
- connection.close();
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- } finally
- {
- this.connected = false;
- }
- }
- }
-
- protected void tryToConnect()
- {
- try
- {
- reqsOutstanding = 1;
- Agent newAgent = new Agent(this, 0, "BrokerAgent");
- Agents.put(newAgent.agentKey(), newAgent);
- connection = new AMQConnection(url);
- session = connection.createSession(sessionTransacted,
- acknowledgeMode);
- replyName = String
- .format(
- "direct://amq.direct//reply-%s?exclusive='True'&autodelete='True'",
- brokerId);
- topicName = String
- .format(
- "management://qpid.management//topic-%s?exclusive='True'&autodelete='True'",
- brokerId);
- reply = session.createQueue(replyName);
- MessageConsumer cons = session.createConsumer(reply);
- cons.setMessageListener(this);
- consumers.add(cons);
- prod = session.createProducer(null);
- topic = session.createQueue(topicName);
- cons = session.createConsumer(topic);
- cons.setMessageListener(this);
- consumers.add(cons);
- connection.start();
- // Rest of the topic is bound later. Start er up
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- }
- connected = true;
- consoleSession.handleBrokerConnect(this);
- Encoder Encoder = createEncoder('B', 0);
- this.send(Encoder);
- }
-
- public void updateAgent(QMFObject obj)
- {
- long agentBank = (Long) obj.getProperty("agentBank");
- long brokerBank = (Long) obj.getProperty("brokerBank");
- String key = Agent.AgentKey(agentBank, brokerBank);
- if (obj.isDeleted())
- {
- if (Agents.containsKey(key))
- {
- Agent agent = Agents.get(key);
- Agents.remove(key);
- consoleSession.handleAgentRemoved(agent);
- }
- } else
- {
- if (!Agents.containsKey(key))
- {
- Agent newAgent = new Agent(this, agentBank, (String) obj
- .getProperty("label"));
- Agents.put(key, newAgent);
- consoleSession.handleNewAgent(newAgent);
- }
- }
- }
-
- public void waitForStable()
- {
- synchronized (lockObject)
- {
- if (connected)
- {
- long start = System.currentTimeMillis();
- syncInFlight = true;
- while (reqsOutstanding != 0)
- {
- log.debug("Waiting to recieve messages");
- try
- {
- lockObject.wait(SYNC_TIME);
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- }
- long duration = System.currentTimeMillis() - start;
- if (duration > SYNC_TIME)
- {
- throw new ConsoleException(
- "Timeout waiting for Broker to Sync");
- }
- }
- }
- }
- }
-
- public void waitForSync(int timeout)
- {
- synchronized (lockObject)
- {
- long start = System.currentTimeMillis();
- while (syncInFlight)
- {
- try
- {
- lockObject.wait(SYNC_TIME);
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- }
- }
- long duration = System.currentTimeMillis() - start;
- if (duration > timeout)
- {
- throw new ConsoleException("Timeout waiting for Broker to Sync");
- }
- }
- }
-}