diff options
Diffstat (limited to 'qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java')
-rw-r--r-- | qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java | 659 |
1 files changed, 659 insertions, 0 deletions
diff --git a/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java b/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java new file mode 100644 index 0000000000..80348c52d9 --- /dev/null +++ b/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java @@ -0,0 +1,659 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.agent; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.qpid.agent.binding.BindingContext; +import org.apache.qpid.agent.binding.BindingUtils; +import org.apache.qpid.agent.binding.ClassBinding; +import org.apache.qpid.agent.binding.BindingException; +import org.apache.qpid.agent.binding.MethodBinding; +import org.apache.qpid.agent.binding.ParameterBinding; +import org.apache.qpid.agent.binding.PropertyBinding; +import org.apache.qpid.agent.binding.TypeBinding; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.transport.codec.Decoder; + +/** + * The main class for interacting with the QMF bus. Objects which are + * to be managed can be registered with the agent, as can classes + * to be exposed via the schema. + */ +public class Agent implements MessageListener +{ + // The following are settings to configure the Agent + private AMQConnection connection; + private boolean sessionTransacted = false; + private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; + private String label; + private UUID systemId; + // this list holds the objects until the agent is started + private List managedObjects = new ArrayList(); + private List registeredClasses = new ArrayList(); + // The following instance variables are not + // able to be set by the end user. + private Session session; + private MessageProducer prod; + private MessageConsumer cons; + private Queue reply; + private BindingContext bctx = new BindingContext(); + private Map<Long, ManagedObject> objects = new Hashtable<Long, ManagedObject>(); + private long bbank; + private long abank; + private static Log log = LogFactory.getLog(Agent.class); + private volatile boolean inside = false; + + public Agent() + { + systemId = UUID.randomUUID(); + log.debug(String.format("Agent with uid %s created", systemId + .toString())); + } + + public Agent(String label, UUID systemId) + { + this.systemId = systemId; + this.label = label; + log.debug(String.format("Agent with name %s and uid %s created", label, + systemId.toString())); + } + + + public void register(ManagedObject managedObject) + { + Class managedClass = managedObject.getObjectClass(); + long id = managedObject.getId(); + ClassBinding cb = bctx.register(managedClass); + managedObject.setManagedClassName(cb.getName()); + managedObject.setManagedPackageName(cb.getPackage()); + log.debug(String.format( + "Added managed object id '%d' for package '%s' class '%s'", id, + managedObject.getManagedPackageName(), managedObject + .getManagedClassName())); + objects.put(id, managedObject); + managedObjects.add(managedObject); + } + + public void registerClass(Class cls) + { + bctx.register(cls); + if (!registeredClasses.contains(cls)) + { + registeredClasses.add(cls); + } + } + + /** + * Starts up the agent. Many bean containers may call this by + * default which aids in deployment + */ + public void start() + { + log.debug(String.format("Agent with uid %s and name %s starting", + systemId.toString(), label)); + for (Object clsName : registeredClasses.toArray()) + { + try + { + Class cls = Class.forName(clsName.toString()); + this.registerClass(cls); + } catch (Exception e) + { + log.error("Could not register class " + clsName); + } + } + for (Object obj : managedObjects.toArray()) + { + this.register((ManagedObject) obj); + } + try + { + session = connection.createSession(sessionTransacted, + acknowledgeMode); + reply = session.createQueue("direct://amq.direct//" + label); + cons = session.createConsumer(reply); + cons.setMessageListener(this); + prod = session.createProducer(null); + } catch (JMSException e) + { + throw new AgentException(e); + } + attachRequest(label, systemId); + try + { + connection.start(); + } catch (JMSException e) + { + throw new AgentException(e); + } + } + + /** + * Send an event object to the bus + */ + public void raiseEvent(Object value, EventSeverity sev) + { + log.debug(String.format("Sending event of class %s with Severity %s", + value.getClass(), sev.ordinal())); + BBEncoder enc = this.init('e'); + ClassBinding cb = bctx.getClassBinding(value.getClass()); + String pkg = cb.getPackage(); + String cls = cb.getName(); + enc.writeStr8(pkg); + enc.writeStr8(cls); + enc.writeBin128(cb.getSchemaHash()); + long now = System.currentTimeMillis() * 1000000; + enc.writeInt64(now); + enc.writeUint8((short) sev.ordinal()); + for (PropertyBinding p : cb.getProperties()) + { + p.getType().encode(enc, BindingUtils.get(p, value)); + } + send( + String.format("console.event.%d.%d.%s.%s", bbank, abank, pkg, + cls), enc); + } + + public void onMessage(Message message) + { + if (inside) + { + new Throwable().printStackTrace(); + } + inside = true; + Decoder dec = readBody(message); + Destination replyTo; + try + { + replyTo = message.getJMSReplyTo(); + } catch (JMSException e) + { + throw new AgentException(e); + } + byte[] magic = dec.readBytes(3); + if (magic[0] != 'A' || magic[1] != 'M' || magic[2] != '3') + { + throw new AgentException("bad magic: " + new String(magic)); + } + short op = dec.readUint8(); + long seq = dec.readUint32(); + log.debug("Message recieved: " + (char) op); + switch (op) + { + case 'a': + this.handleAgentAttach(seq, replyTo, dec); + break; + case 'G': + this.handleGetQuery(seq, replyTo, dec); + break; + case 'M': + this.handleMethodRequest(seq, replyTo, dec); + break; + case 'S': + this.handleSchemaRequest(seq, replyTo, dec); + break; + case 'x': + // TODO + break; + default: + throw new IllegalArgumentException("opcode: " + ((char) op)); + } + inside = false; + } + + protected ClassBinding getClassBinding(ManagedObject mobj) + { + return bctx.getClassBinding(mobj.getObjectClass()); + } + + private byte[] ensure(int capacity, byte[] body, int size) + { + if (capacity > body.length) + { + byte[] copy = new byte[capacity]; + System.arraycopy(body, 0, copy, 0, size); + body = copy; + } + return body; + } + + private Decoder readBody(Message message) + { + BytesMessage msg = (BytesMessage) message; + BBDecoder dec = new BBDecoder(); + byte[] buf = new byte[1024]; + byte[] body = new byte[1024]; + int size = 0; + int n; + try + { + while ((n = msg.readBytes(buf)) > 0) + { + body = ensure(size + n, body, size); + System.arraycopy(buf, 0, body, size, n); + size += n; + } + } catch (JMSException e) + { + throw new AgentException(e); + } + dec.init(ByteBuffer.wrap(body, 0, size)); + return dec; + } + + protected void handleAgentAttach(long seq, Destination replyTo, Decoder dec) + { + log.debug("Agent Attach Message"); + bbank = dec.readUint32(); + abank = dec.readUint32(); + try + { + MessageConsumer mc = session + .createConsumer(session + .createQueue(String + .format( + "management://qpid.management//%s?routingkey='agent.%d.%d'", + label, bbank, abank))); + mc.setMessageListener(this); + } catch (JMSException e) + { + throw new AgentException(e); + } + for (String packageName : bctx.getPackages()) + { + packageIndication(packageName); + } + for (ClassBinding cb : bctx.getAllBindings()) + { + classIndication(cb); + } + for (ManagedObject mo : objects.values()) + { + content('i', seq, null, mo); + } + } + + protected void handleMethodRequest(long seq, Destination replyTo, + Decoder dec) + { + dec.readUint64(); // first part of object-id + long id = dec.readUint64(); + ManagedObject mo = objects.get(id); + if (mo == null) + { + methodResponse(seq, replyTo, 1, String.format( + "no such object: 0x%x", id)); + } else + { + dec.readStr8(); // pkg + dec.readStr8(); // cls + dec.readBin128(); // hash + String mname = dec.readStr8(); + ClassBinding cls = getClassBinding(mo); + MethodBinding method = cls.getMethod(mname); + if (method == null) + { + methodResponse(seq, replyTo, 2, String.format( + "no such method: %s", mname)); + } else + { + log.trace("Handle method: " + method.getName()); + List<ParameterBinding> params = method.getInParameters(); + Object[] args = new Object[params.size()]; + int idx = 0; + for (ParameterBinding p : params) + { + TypeBinding typeBinding = p.getType(); + log + .trace(String + .format( + "Decoding parameter with type %s ref package %s ref class %s ", + typeBinding.getCode(), typeBinding + .getRefPackage(), + typeBinding.getRefClass())); + args[idx++] = typeBinding.decode(dec); + log.trace("Done"); + } + try + { + Object[] result = mo.invoke(method, args); + methodResponse(seq, replyTo, 0, null, method, result); + } catch (BindingException ex) + { + log + .error(String + .format( + "An exception occured invoking method %s. Stack trace sent to console.", + method.getName())); + StringWriter str = new StringWriter(); + PrintWriter writer = new PrintWriter(str); + ex.printStackTrace(writer); + writer.flush(); + methodResponse(seq, replyTo, 7, str.toString()); + } + log.trace("Done with method: " + method.getName()); + } + } + } + + protected void handleGetQuery(long seq, Destination replyTo, Decoder dec) + { + Map<String, Object> data = dec.readMap(); + if (data.containsKey("_objectid")) + { + long objId = (Long) data.get("_objectid"); + log.debug("Get Request message for object id " + objId); + ManagedObject mo = objects.get(objId); + if (mo == null) + { + methodResponse(seq, replyTo, 1, String.format( + "no such object: 0x%x", objId)); + } else + { + content('g', seq, replyTo, mo); + } + } else if (data.containsKey("_class")) + { + String className = (String) data.get("_class"); + String packageName = (String) data.get("_package"); + log.debug(String.format( + "Get Request message for package '%s' class '%s'", + packageName, className)); + for (ManagedObject mo : objects.values()) + { + if (mo.getManagedClassName().equals(className)) + { + if ((packageName == null) || packageName.equals("") + || packageName.equals(mo.getManagedPackageName())) + { + content('g', seq, replyTo, mo); + } + } + } + } else + { + for (ManagedObject mo : objects.values()) + { + content('g', seq, replyTo, mo); + } + } + complete(seq, replyTo); + } + + protected void handleSchemaRequest(long seq, Destination replyTo, + Decoder dec) + { + String pkg = dec.readStr8(); + String cls = dec.readStr8(); + log.debug(String.format( + "SchemaRequest message for package '%s' class '%s'", pkg, cls)); + ClassBinding cb = bctx.getClassBinding(pkg, cls); + if (cb == null) + { + throw new AgentException("no such class: " + pkg + ", " + cls); + } + schemaResponse(seq, cb); + } + + protected BBEncoder init(char opcode) + { + return init(opcode, 0); + } + + protected BBEncoder init(char opcode, long sequence) + { + BBEncoder enc = new BBEncoder(1024); + enc.init(); + enc.writeUint8((short) 'A'); + enc.writeUint8((short) 'M'); + enc.writeUint8((short) '3'); + enc.writeUint8((short) opcode); + enc.writeUint32(sequence); + return enc; + } + + protected void send(BBEncoder enc) + { + send("broker", enc); + } + + protected void send(Destination dest, BBEncoder enc) + { + try + { + byte[] buf = new byte[1024]; + byte[] body = new byte[1024]; + BytesMessage msg = session.createBytesMessage(); + ByteBuffer slice = enc.segment(); + while (slice.hasRemaining()) + { + int n = Math.min(buf.length, slice.remaining()); + slice.get(buf, 0, n); + msg.writeBytes(buf, 0, n); + } + msg.setJMSReplyTo(reply); + // ???: I assume this is thread safe. + prod.send(dest, msg); + } catch (JMSException e) + { + throw new AgentException(e); + } + } + + protected void send(String routingKey, BBEncoder enc) + { + try + { + send(session + .createQueue("management://qpid.management//?routingkey='" + + routingKey + "'"), enc); + } catch (JMSException e) + { + throw new AgentException(e); + } + } + + private void attachRequest(String label, UUID systemId) + { + BBEncoder enc = init('A'); + enc.writeStr8(label); + enc.writeUuid(systemId); + enc.writeUint32(0); + enc.writeUint32(0); + send(enc); + } + + private void packageIndication(String pkg) + { + BBEncoder enc = init('p'); + enc.writeStr8(pkg); + send(enc); + } + + private void classIndication(ClassBinding cb) + { + BBEncoder enc = init('q'); + enc.writeUint8(cb.getKind()); + enc.writeStr8(cb.getPackage()); + enc.writeStr8(cb.getName()); + enc.writeBin128(cb.getSchemaHash()); // schema hash? + send(enc); + } + + private void schemaResponse(long seq, ClassBinding cb) + { + BBEncoder enc = init('s', seq); + cb.encode(enc); + send(enc); + } + + private void content(char c, long seq, Destination dest, ManagedObject mo) + { + BBEncoder enc = init(c, seq); + ClassBinding cb = getClassBinding(mo); + String pkg = cb.getPackage(); + String cls = cb.getName(); + enc.writeStr8(pkg); + enc.writeStr8(cls); + enc.writeBin128(cb.getSchemaHash()); + long now = System.currentTimeMillis() * 1000000; + enc.writeUint64(now); + enc.writeUint64(now); + enc.writeUint64(0); + enc.writeUint64(0x0000FFFFFFFFFFFFL & ((bbank << 28) | abank)); + enc.writeUint64(mo.getId()); + for (PropertyBinding p : cb.getProperties()) + { + p.getType().encode(enc, mo.get(p)); + } + if (dest == null) + { + send(String.format("console.obj.%d.%d.%s.%s", bbank, abank, pkg, + cls), enc); + } else + { + send(dest, enc); + } + } + + private void complete(long seq, Destination dest) + { + BBEncoder enc = init('z', seq); + enc.writeUint32(0); + enc.writeStr8(""); + send(dest, enc); + } + + private void methodResponse(long seq, Destination dest, int status, + String text) + { + methodResponse(seq, dest, status, text, null, null); + } + + private void methodResponse(long seq, Destination dest, int status, + String text, MethodBinding method, Object[] result) + { + BBEncoder enc = init('m', seq); + enc.writeUint32(status); + enc.writeStr16(text == null ? "" : text); + if (method != null) + { + int idx = 0; + for (ParameterBinding p : method.getOutParameters()) + { + p.getType().encode(enc, result[idx++]); + } + } + send(dest, enc); + } + + public String getLabel() + { + return label; + } + + public void setLabel(String label) + { + this.label = label; + } + + public AMQConnection getConnection() + { + return connection; + } + + public void setConnection(AMQConnection connection) + { + this.connection = connection; + } + + public boolean isSessionTransacted() + { + return sessionTransacted; + } + + public void setSessionTransacted(boolean sessionTransacted) + { + this.sessionTransacted = sessionTransacted; + } + + public void setManagedObjects(List objectList) + { + this.managedObjects = objectList; + } + + public List getManagedObjects() + { + return managedObjects; + } + + public void setRegisteredClasses(List objectList) + { + this.registeredClasses = objectList; + } + + public List getRegisteredClasses() + { + return this.registeredClasses; + } + + public static void main(String[] args) throws Exception + { + String broker = args[0]; + String name = args[1]; + String url = String.format( + "amqp://guest:guest@/?brokerlist='tcp://%s'", broker); + AMQConnection conn = new AMQConnection(url); + Agent agent = new Agent(name, UUID.randomUUID()); + agent.setConnection(conn); + for (int i = 2; i < args.length; i++) + { + Class<?> cls = Class.forName(args[i]); + agent.register(new ManagedPOJO(cls.newInstance())); + } + agent.start(); + while (true) + { + Thread.sleep(1000); + } + } +} |