summaryrefslogtreecommitdiff
path: root/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java')
-rw-r--r--qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java659
1 files changed, 659 insertions, 0 deletions
diff --git a/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java b/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java
new file mode 100644
index 0000000000..80348c52d9
--- /dev/null
+++ b/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java
@@ -0,0 +1,659 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.agent;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpid.agent.binding.BindingContext;
+import org.apache.qpid.agent.binding.BindingUtils;
+import org.apache.qpid.agent.binding.ClassBinding;
+import org.apache.qpid.agent.binding.BindingException;
+import org.apache.qpid.agent.binding.MethodBinding;
+import org.apache.qpid.agent.binding.ParameterBinding;
+import org.apache.qpid.agent.binding.PropertyBinding;
+import org.apache.qpid.agent.binding.TypeBinding;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Decoder;
+
+/**
+ * The main class for interacting with the QMF bus. Objects which are
+ * to be managed can be registered with the agent, as can classes
+ * to be exposed via the schema.
+ */
+public class Agent implements MessageListener
+{
+ // The following are settings to configure the Agent
+ private AMQConnection connection;
+ private boolean sessionTransacted = false;
+ private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+ private String label;
+ private UUID systemId;
+ // this list holds the objects until the agent is started
+ private List managedObjects = new ArrayList();
+ private List registeredClasses = new ArrayList();
+ // The following instance variables are not
+ // able to be set by the end user.
+ private Session session;
+ private MessageProducer prod;
+ private MessageConsumer cons;
+ private Queue reply;
+ private BindingContext bctx = new BindingContext();
+ private Map<Long, ManagedObject> objects = new Hashtable<Long, ManagedObject>();
+ private long bbank;
+ private long abank;
+ private static Log log = LogFactory.getLog(Agent.class);
+ private volatile boolean inside = false;
+
+ public Agent()
+ {
+ systemId = UUID.randomUUID();
+ log.debug(String.format("Agent with uid %s created", systemId
+ .toString()));
+ }
+
+ public Agent(String label, UUID systemId)
+ {
+ this.systemId = systemId;
+ this.label = label;
+ log.debug(String.format("Agent with name %s and uid %s created", label,
+ systemId.toString()));
+ }
+
+
+ public void register(ManagedObject managedObject)
+ {
+ Class managedClass = managedObject.getObjectClass();
+ long id = managedObject.getId();
+ ClassBinding cb = bctx.register(managedClass);
+ managedObject.setManagedClassName(cb.getName());
+ managedObject.setManagedPackageName(cb.getPackage());
+ log.debug(String.format(
+ "Added managed object id '%d' for package '%s' class '%s'", id,
+ managedObject.getManagedPackageName(), managedObject
+ .getManagedClassName()));
+ objects.put(id, managedObject);
+ managedObjects.add(managedObject);
+ }
+
+ public void registerClass(Class cls)
+ {
+ bctx.register(cls);
+ if (!registeredClasses.contains(cls))
+ {
+ registeredClasses.add(cls);
+ }
+ }
+
+ /**
+ * Starts up the agent. Many bean containers may call this by
+ * default which aids in deployment
+ */
+ public void start()
+ {
+ log.debug(String.format("Agent with uid %s and name %s starting",
+ systemId.toString(), label));
+ for (Object clsName : registeredClasses.toArray())
+ {
+ try
+ {
+ Class cls = Class.forName(clsName.toString());
+ this.registerClass(cls);
+ } catch (Exception e)
+ {
+ log.error("Could not register class " + clsName);
+ }
+ }
+ for (Object obj : managedObjects.toArray())
+ {
+ this.register((ManagedObject) obj);
+ }
+ try
+ {
+ session = connection.createSession(sessionTransacted,
+ acknowledgeMode);
+ reply = session.createQueue("direct://amq.direct//" + label);
+ cons = session.createConsumer(reply);
+ cons.setMessageListener(this);
+ prod = session.createProducer(null);
+ } catch (JMSException e)
+ {
+ throw new AgentException(e);
+ }
+ attachRequest(label, systemId);
+ try
+ {
+ connection.start();
+ } catch (JMSException e)
+ {
+ throw new AgentException(e);
+ }
+ }
+
+ /**
+ * Send an event object to the bus
+ */
+ public void raiseEvent(Object value, EventSeverity sev)
+ {
+ log.debug(String.format("Sending event of class %s with Severity %s",
+ value.getClass(), sev.ordinal()));
+ BBEncoder enc = this.init('e');
+ ClassBinding cb = bctx.getClassBinding(value.getClass());
+ String pkg = cb.getPackage();
+ String cls = cb.getName();
+ enc.writeStr8(pkg);
+ enc.writeStr8(cls);
+ enc.writeBin128(cb.getSchemaHash());
+ long now = System.currentTimeMillis() * 1000000;
+ enc.writeInt64(now);
+ enc.writeUint8((short) sev.ordinal());
+ for (PropertyBinding p : cb.getProperties())
+ {
+ p.getType().encode(enc, BindingUtils.get(p, value));
+ }
+ send(
+ String.format("console.event.%d.%d.%s.%s", bbank, abank, pkg,
+ cls), enc);
+ }
+
+ public void onMessage(Message message)
+ {
+ if (inside)
+ {
+ new Throwable().printStackTrace();
+ }
+ inside = true;
+ Decoder dec = readBody(message);
+ Destination replyTo;
+ try
+ {
+ replyTo = message.getJMSReplyTo();
+ } catch (JMSException e)
+ {
+ throw new AgentException(e);
+ }
+ byte[] magic = dec.readBytes(3);
+ if (magic[0] != 'A' || magic[1] != 'M' || magic[2] != '3')
+ {
+ throw new AgentException("bad magic: " + new String(magic));
+ }
+ short op = dec.readUint8();
+ long seq = dec.readUint32();
+ log.debug("Message recieved: " + (char) op);
+ switch (op)
+ {
+ case 'a':
+ this.handleAgentAttach(seq, replyTo, dec);
+ break;
+ case 'G':
+ this.handleGetQuery(seq, replyTo, dec);
+ break;
+ case 'M':
+ this.handleMethodRequest(seq, replyTo, dec);
+ break;
+ case 'S':
+ this.handleSchemaRequest(seq, replyTo, dec);
+ break;
+ case 'x':
+ // TODO
+ break;
+ default:
+ throw new IllegalArgumentException("opcode: " + ((char) op));
+ }
+ inside = false;
+ }
+
+ protected ClassBinding getClassBinding(ManagedObject mobj)
+ {
+ return bctx.getClassBinding(mobj.getObjectClass());
+ }
+
+ private byte[] ensure(int capacity, byte[] body, int size)
+ {
+ if (capacity > body.length)
+ {
+ byte[] copy = new byte[capacity];
+ System.arraycopy(body, 0, copy, 0, size);
+ body = copy;
+ }
+ return body;
+ }
+
+ private Decoder readBody(Message message)
+ {
+ BytesMessage msg = (BytesMessage) message;
+ BBDecoder dec = new BBDecoder();
+ byte[] buf = new byte[1024];
+ byte[] body = new byte[1024];
+ int size = 0;
+ int n;
+ try
+ {
+ while ((n = msg.readBytes(buf)) > 0)
+ {
+ body = ensure(size + n, body, size);
+ System.arraycopy(buf, 0, body, size, n);
+ size += n;
+ }
+ } catch (JMSException e)
+ {
+ throw new AgentException(e);
+ }
+ dec.init(ByteBuffer.wrap(body, 0, size));
+ return dec;
+ }
+
+ protected void handleAgentAttach(long seq, Destination replyTo, Decoder dec)
+ {
+ log.debug("Agent Attach Message");
+ bbank = dec.readUint32();
+ abank = dec.readUint32();
+ try
+ {
+ MessageConsumer mc = session
+ .createConsumer(session
+ .createQueue(String
+ .format(
+ "management://qpid.management//%s?routingkey='agent.%d.%d'",
+ label, bbank, abank)));
+ mc.setMessageListener(this);
+ } catch (JMSException e)
+ {
+ throw new AgentException(e);
+ }
+ for (String packageName : bctx.getPackages())
+ {
+ packageIndication(packageName);
+ }
+ for (ClassBinding cb : bctx.getAllBindings())
+ {
+ classIndication(cb);
+ }
+ for (ManagedObject mo : objects.values())
+ {
+ content('i', seq, null, mo);
+ }
+ }
+
+ protected void handleMethodRequest(long seq, Destination replyTo,
+ Decoder dec)
+ {
+ dec.readUint64(); // first part of object-id
+ long id = dec.readUint64();
+ ManagedObject mo = objects.get(id);
+ if (mo == null)
+ {
+ methodResponse(seq, replyTo, 1, String.format(
+ "no such object: 0x%x", id));
+ } else
+ {
+ dec.readStr8(); // pkg
+ dec.readStr8(); // cls
+ dec.readBin128(); // hash
+ String mname = dec.readStr8();
+ ClassBinding cls = getClassBinding(mo);
+ MethodBinding method = cls.getMethod(mname);
+ if (method == null)
+ {
+ methodResponse(seq, replyTo, 2, String.format(
+ "no such method: %s", mname));
+ } else
+ {
+ log.trace("Handle method: " + method.getName());
+ List<ParameterBinding> params = method.getInParameters();
+ Object[] args = new Object[params.size()];
+ int idx = 0;
+ for (ParameterBinding p : params)
+ {
+ TypeBinding typeBinding = p.getType();
+ log
+ .trace(String
+ .format(
+ "Decoding parameter with type %s ref package %s ref class %s ",
+ typeBinding.getCode(), typeBinding
+ .getRefPackage(),
+ typeBinding.getRefClass()));
+ args[idx++] = typeBinding.decode(dec);
+ log.trace("Done");
+ }
+ try
+ {
+ Object[] result = mo.invoke(method, args);
+ methodResponse(seq, replyTo, 0, null, method, result);
+ } catch (BindingException ex)
+ {
+ log
+ .error(String
+ .format(
+ "An exception occured invoking method %s. Stack trace sent to console.",
+ method.getName()));
+ StringWriter str = new StringWriter();
+ PrintWriter writer = new PrintWriter(str);
+ ex.printStackTrace(writer);
+ writer.flush();
+ methodResponse(seq, replyTo, 7, str.toString());
+ }
+ log.trace("Done with method: " + method.getName());
+ }
+ }
+ }
+
+ protected void handleGetQuery(long seq, Destination replyTo, Decoder dec)
+ {
+ Map<String, Object> data = dec.readMap();
+ if (data.containsKey("_objectid"))
+ {
+ long objId = (Long) data.get("_objectid");
+ log.debug("Get Request message for object id " + objId);
+ ManagedObject mo = objects.get(objId);
+ if (mo == null)
+ {
+ methodResponse(seq, replyTo, 1, String.format(
+ "no such object: 0x%x", objId));
+ } else
+ {
+ content('g', seq, replyTo, mo);
+ }
+ } else if (data.containsKey("_class"))
+ {
+ String className = (String) data.get("_class");
+ String packageName = (String) data.get("_package");
+ log.debug(String.format(
+ "Get Request message for package '%s' class '%s'",
+ packageName, className));
+ for (ManagedObject mo : objects.values())
+ {
+ if (mo.getManagedClassName().equals(className))
+ {
+ if ((packageName == null) || packageName.equals("")
+ || packageName.equals(mo.getManagedPackageName()))
+ {
+ content('g', seq, replyTo, mo);
+ }
+ }
+ }
+ } else
+ {
+ for (ManagedObject mo : objects.values())
+ {
+ content('g', seq, replyTo, mo);
+ }
+ }
+ complete(seq, replyTo);
+ }
+
+ protected void handleSchemaRequest(long seq, Destination replyTo,
+ Decoder dec)
+ {
+ String pkg = dec.readStr8();
+ String cls = dec.readStr8();
+ log.debug(String.format(
+ "SchemaRequest message for package '%s' class '%s'", pkg, cls));
+ ClassBinding cb = bctx.getClassBinding(pkg, cls);
+ if (cb == null)
+ {
+ throw new AgentException("no such class: " + pkg + ", " + cls);
+ }
+ schemaResponse(seq, cb);
+ }
+
+ protected BBEncoder init(char opcode)
+ {
+ return init(opcode, 0);
+ }
+
+ protected BBEncoder init(char opcode, long sequence)
+ {
+ BBEncoder enc = new BBEncoder(1024);
+ enc.init();
+ enc.writeUint8((short) 'A');
+ enc.writeUint8((short) 'M');
+ enc.writeUint8((short) '3');
+ enc.writeUint8((short) opcode);
+ enc.writeUint32(sequence);
+ return enc;
+ }
+
+ protected void send(BBEncoder enc)
+ {
+ send("broker", enc);
+ }
+
+ protected void send(Destination dest, BBEncoder enc)
+ {
+ try
+ {
+ byte[] buf = new byte[1024];
+ byte[] body = new byte[1024];
+ BytesMessage msg = session.createBytesMessage();
+ ByteBuffer slice = enc.segment();
+ while (slice.hasRemaining())
+ {
+ int n = Math.min(buf.length, slice.remaining());
+ slice.get(buf, 0, n);
+ msg.writeBytes(buf, 0, n);
+ }
+ msg.setJMSReplyTo(reply);
+ // ???: I assume this is thread safe.
+ prod.send(dest, msg);
+ } catch (JMSException e)
+ {
+ throw new AgentException(e);
+ }
+ }
+
+ protected void send(String routingKey, BBEncoder enc)
+ {
+ try
+ {
+ send(session
+ .createQueue("management://qpid.management//?routingkey='"
+ + routingKey + "'"), enc);
+ } catch (JMSException e)
+ {
+ throw new AgentException(e);
+ }
+ }
+
+ private void attachRequest(String label, UUID systemId)
+ {
+ BBEncoder enc = init('A');
+ enc.writeStr8(label);
+ enc.writeUuid(systemId);
+ enc.writeUint32(0);
+ enc.writeUint32(0);
+ send(enc);
+ }
+
+ private void packageIndication(String pkg)
+ {
+ BBEncoder enc = init('p');
+ enc.writeStr8(pkg);
+ send(enc);
+ }
+
+ private void classIndication(ClassBinding cb)
+ {
+ BBEncoder enc = init('q');
+ enc.writeUint8(cb.getKind());
+ enc.writeStr8(cb.getPackage());
+ enc.writeStr8(cb.getName());
+ enc.writeBin128(cb.getSchemaHash()); // schema hash?
+ send(enc);
+ }
+
+ private void schemaResponse(long seq, ClassBinding cb)
+ {
+ BBEncoder enc = init('s', seq);
+ cb.encode(enc);
+ send(enc);
+ }
+
+ private void content(char c, long seq, Destination dest, ManagedObject mo)
+ {
+ BBEncoder enc = init(c, seq);
+ ClassBinding cb = getClassBinding(mo);
+ String pkg = cb.getPackage();
+ String cls = cb.getName();
+ enc.writeStr8(pkg);
+ enc.writeStr8(cls);
+ enc.writeBin128(cb.getSchemaHash());
+ long now = System.currentTimeMillis() * 1000000;
+ enc.writeUint64(now);
+ enc.writeUint64(now);
+ enc.writeUint64(0);
+ enc.writeUint64(0x0000FFFFFFFFFFFFL & ((bbank << 28) | abank));
+ enc.writeUint64(mo.getId());
+ for (PropertyBinding p : cb.getProperties())
+ {
+ p.getType().encode(enc, mo.get(p));
+ }
+ if (dest == null)
+ {
+ send(String.format("console.obj.%d.%d.%s.%s", bbank, abank, pkg,
+ cls), enc);
+ } else
+ {
+ send(dest, enc);
+ }
+ }
+
+ private void complete(long seq, Destination dest)
+ {
+ BBEncoder enc = init('z', seq);
+ enc.writeUint32(0);
+ enc.writeStr8("");
+ send(dest, enc);
+ }
+
+ private void methodResponse(long seq, Destination dest, int status,
+ String text)
+ {
+ methodResponse(seq, dest, status, text, null, null);
+ }
+
+ private void methodResponse(long seq, Destination dest, int status,
+ String text, MethodBinding method, Object[] result)
+ {
+ BBEncoder enc = init('m', seq);
+ enc.writeUint32(status);
+ enc.writeStr16(text == null ? "" : text);
+ if (method != null)
+ {
+ int idx = 0;
+ for (ParameterBinding p : method.getOutParameters())
+ {
+ p.getType().encode(enc, result[idx++]);
+ }
+ }
+ send(dest, enc);
+ }
+
+ public String getLabel()
+ {
+ return label;
+ }
+
+ public void setLabel(String label)
+ {
+ this.label = label;
+ }
+
+ public AMQConnection getConnection()
+ {
+ return connection;
+ }
+
+ public void setConnection(AMQConnection connection)
+ {
+ this.connection = connection;
+ }
+
+ public boolean isSessionTransacted()
+ {
+ return sessionTransacted;
+ }
+
+ public void setSessionTransacted(boolean sessionTransacted)
+ {
+ this.sessionTransacted = sessionTransacted;
+ }
+
+ public void setManagedObjects(List objectList)
+ {
+ this.managedObjects = objectList;
+ }
+
+ public List getManagedObjects()
+ {
+ return managedObjects;
+ }
+
+ public void setRegisteredClasses(List objectList)
+ {
+ this.registeredClasses = objectList;
+ }
+
+ public List getRegisteredClasses()
+ {
+ return this.registeredClasses;
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ String broker = args[0];
+ String name = args[1];
+ String url = String.format(
+ "amqp://guest:guest@/?brokerlist='tcp://%s'", broker);
+ AMQConnection conn = new AMQConnection(url);
+ Agent agent = new Agent(name, UUID.randomUUID());
+ agent.setConnection(conn);
+ for (int i = 2; i < args.length; i++)
+ {
+ Class<?> cls = Class.forName(args[i]);
+ agent.register(new ManagedPOJO(cls.newInstance()));
+ }
+ agent.start();
+ while (true)
+ {
+ Thread.sleep(1000);
+ }
+ }
+}