summaryrefslogtreecommitdiff
path: root/trunk/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java')
-rw-r--r--trunk/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java707
1 files changed, 0 insertions, 707 deletions
diff --git a/trunk/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java b/trunk/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java
deleted file mode 100644
index f6ae6adbc3..0000000000
--- a/trunk/qpid/java/management/agent/src/main/java/org/apache/qpid/agent/Agent.java
+++ /dev/null
@@ -1,707 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.agent;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.qpid.agent.binding.BindingContext;
-import org.apache.qpid.agent.binding.BindingUtils;
-import org.apache.qpid.agent.binding.ClassBinding;
-import org.apache.qpid.agent.binding.BindingException;
-import org.apache.qpid.agent.binding.MethodBinding;
-import org.apache.qpid.agent.binding.ParameterBinding;
-import org.apache.qpid.agent.binding.PropertyBinding;
-import org.apache.qpid.agent.binding.TypeBinding;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.transport.codec.BBDecoder;
-import org.apache.qpid.transport.codec.BBEncoder;
-import org.apache.qpid.transport.codec.Decoder;
-
-/**
- * The main class for interacting with the QMF bus. Objects which are to be
- * managed can be registered with the agent, as can classes to be exposed via
- * the schema.
- */
-public class Agent implements MessageListener
-{
- // The following are settings to configure the Agent
- protected AMQConnection connection;
- protected boolean sessionTransacted = false;
- protected int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
- protected String label;
- protected UUID systemId;
- // this list holds the objects until the agent is started
- protected List managedObjects = new ArrayList();
- protected List registeredClasses = new ArrayList();
- // The following instance variables are not
- // able to be set by the end user.
- protected Session session;
- protected MessageProducer prod;
- protected MessageConsumer cons;
- protected Queue reply;
- protected BindingContext bctx = new BindingContext();
- protected Map<Long, ManagedObject> objects = new Hashtable<Long, ManagedObject>();
- protected long bbank;
- protected long abank;
- protected static Log log = LogFactory.getLog(Agent.class);
- protected volatile boolean inside = false;
- protected ClassLoader classLoader = null;
-
- public Agent()
- {
- systemId = UUID.randomUUID();
- log.debug(String.format("Agent with uid %s created", systemId
- .toString()));
- }
-
- public Agent(String label, UUID systemId)
- {
- this.systemId = systemId;
- this.label = label;
- log.debug(String.format("Agent with name %s and uid %s created", label,
- systemId.toString()));
- }
-
- public void register(ManagedObject managedObject)
- {
- Class managedClass = managedObject.getObjectClass();
- long id = managedObject.getId();
- ClassBinding cb = bctx.register(managedClass);
- managedObject.setManagedClassName(cb.getName());
- managedObject.setManagedPackageName(cb.getPackage());
- log.debug(String.format(
- "Added managed object id '%d' for package '%s' class '%s'", id,
- managedObject.getManagedPackageName(), managedObject
- .getManagedClassName()));
- objects.put(id, managedObject);
- managedObjects.add(managedObject);
- }
-
- public void registerClass(Class cls)
- {
- bctx.register(cls);
- if (!registeredClasses.contains(cls))
- {
- registeredClasses.add(cls);
- }
- }
-
- /**
- * Stops the agents connection to the bus
- */
- public void stop()
- {
- try
- {
- cons.close();
- prod.close();
- connection.stop();
- connection.close();
- session.close();
- } catch (JMSException e)
- {
- log.error("Exception:", e);
- }
- }
-
- /**
- * Starts up the agent. Many bean containers may call this by default which
- * aids in deployment
- */
- public void start()
- {
- log.debug(String.format("Agent with uid %s and name %s starting",
- systemId.toString(), label));
- for (Object clsName : registeredClasses.toArray())
- {
- try
- {
- Class cls = null;
- if (String.class.isAssignableFrom(clsName.getClass()))
- {
- cls = getClass(clsName.toString());
- } else
- {
- cls = (Class) clsName;
- }
- this.registerClass(cls);
- } catch (Exception e)
- {
- log.error("Could not register class " + clsName);
- }
- }
- for (Object obj : managedObjects.toArray())
- {
- this.register((ManagedObject) obj);
- }
- try
- {
- session = connection.createSession(sessionTransacted,
- acknowledgeMode);
- reply = session
- .createQueue(String
- .format(
- "direct://amq.direct//%s-%s?exclusive='True'&autodelete='True'",
- label, systemId));
- cons = session.createConsumer(reply);
- cons.setMessageListener(this);
- prod = session.createProducer(null);
- } catch (JMSException e)
- {
- throw new AgentException(e);
- }
- attachRequest(label, systemId);
- try
- {
- connection.start();
- } catch (JMSException e)
- {
- throw new AgentException(e);
- }
- }
-
- /**
- * Send an event object to the bus
- */
- public void raiseEvent(Object value, EventSeverity sev)
- {
- log.debug(String.format("Sending event of class %s with Severity %s",
- value.getClass(), sev.ordinal()));
- BBEncoder enc = this.init('e');
- ClassBinding cb = bctx.getClassBinding(value.getClass());
- String pkg = cb.getPackage();
- String cls = cb.getName();
- enc.writeStr8(pkg);
- enc.writeStr8(cls);
- enc.writeBin128(cb.getSchemaHash());
- long now = System.currentTimeMillis() * 1000000;
- enc.writeInt64(now);
- enc.writeUint8((short) sev.ordinal());
- for (PropertyBinding p : cb.getProperties())
- {
- p.getType().encode(enc, BindingUtils.get(p, value));
- }
- send(
- String.format("console.event.%d.%d.%s.%s", bbank, abank, pkg,
- cls), enc);
- }
-
- public void onMessage(Message message)
- {
- if (inside)
- {
- new Throwable().printStackTrace();
- }
- inside = true;
- Decoder dec = readBody(message);
- Destination replyTo;
- try
- {
- replyTo = message.getJMSReplyTo();
- } catch (JMSException e)
- {
- throw new AgentException(e);
- }
- byte[] magic = dec.readBytes(3);
- if (magic[0] != 'A' || magic[1] != 'M' || magic[2] != '2')
- {
- throw new AgentException("bad magic: " + new String(magic));
- }
- short op = dec.readUint8();
- long seq = dec.readUint32();
- log.debug("Message recieved: " + (char) op);
- switch (op)
- {
- case 'a':
- this.handleAgentAttach(seq, replyTo, dec);
- break;
- case 'G':
- this.handleGetQuery(seq, replyTo, dec);
- break;
- case 'M':
- this.handleMethodRequest(seq, replyTo, dec);
- break;
- case 'S':
- this.handleSchemaRequest(seq, replyTo, dec);
- break;
- case 'x':
- // TODO
- break;
- default:
- throw new IllegalArgumentException("opcode: " + ((char) op));
- }
- inside = false;
- }
-
- protected ClassBinding getClassBinding(ManagedObject mobj)
- {
- return bctx.getClassBinding(mobj.getObjectClass());
- }
-
- private byte[] ensure(int capacity, byte[] body, int size)
- {
- if (capacity > body.length)
- {
- byte[] copy = new byte[capacity];
- System.arraycopy(body, 0, copy, 0, size);
- body = copy;
- }
- return body;
- }
-
- private Decoder readBody(Message message)
- {
- BytesMessage msg = (BytesMessage) message;
- BBDecoder dec = new BBDecoder();
- byte[] buf = new byte[1024];
- byte[] body = new byte[1024];
- int size = 0;
- int n;
- try
- {
- while ((n = msg.readBytes(buf)) > 0)
- {
- body = ensure(size + n, body, size);
- System.arraycopy(buf, 0, body, size, n);
- size += n;
- }
- } catch (JMSException e)
- {
- throw new AgentException(e);
- }
- dec.init(ByteBuffer.wrap(body, 0, size));
- return dec;
- }
-
- protected void handleAgentAttach(long seq, Destination replyTo, Decoder dec)
- {
- log.debug("Agent Attach Message");
- bbank = dec.readUint32();
- abank = dec.readUint32();
- try
- {
- MessageConsumer mc = session
- .createConsumer(session
- .createQueue(String
- .format(
- "management://qpid.management//%s-%s?routingkey='agent.%d.%d'&exclusive='True'&autodelete='True'",
- label, systemId, bbank, abank)));
- mc.setMessageListener(this);
- } catch (JMSException e)
- {
- throw new AgentException(e);
- }
- for (String packageName : bctx.getPackages())
- {
- packageIndication(packageName);
- }
- for (ClassBinding cb : bctx.getAllBindings())
- {
- classIndication(cb);
- }
- for (ManagedObject mo : objects.values())
- {
- content('i', seq, null, mo);
- }
- }
-
- protected void handleMethodRequest(long seq, Destination replyTo,
- Decoder dec)
- {
- dec.readUint64(); // first part of object-id
- long id = dec.readUint64();
- ManagedObject mo = objects.get(id);
- if (mo == null)
- {
- methodResponse(seq, replyTo, 1, String.format(
- "no such object: 0x%x", id));
- } else
- {
- dec.readStr8(); // pkg
- dec.readStr8(); // cls
- dec.readBin128(); // hash
- String mname = dec.readStr8();
- ClassBinding cls = getClassBinding(mo);
- MethodBinding method = cls.getMethod(mname);
- if (method == null)
- {
- methodResponse(seq, replyTo, 2, String.format(
- "no such method: %s", mname));
- } else
- {
- log.trace("Handle method: " + method.getName());
- List<ParameterBinding> params = method.getInParameters();
- Object[] args = new Object[params.size()];
- int idx = 0;
- for (ParameterBinding p : params)
- {
- TypeBinding typeBinding = p.getType();
- log
- .trace(String
- .format(
- "Decoding parameter with type %s ref package %s ref class %s ",
- typeBinding.getCode(), typeBinding
- .getRefPackage(),
- typeBinding.getRefClass()));
- args[idx++] = typeBinding.decode(dec);
- log.trace("Done");
- }
- try
- {
- Object[] result = mo.invoke(method, args);
- methodResponse(seq, replyTo, 0, null, method, result);
- } catch (BindingException ex)
- {
- log
- .error(
- String
- .format(
- "An exception occured invoking method %s. Stack trace sent to console.",
- method.getName()), ex);
- StringWriter str = new StringWriter();
- PrintWriter writer = new PrintWriter(str);
- ex.printStackTrace(writer);
- writer.flush();
- methodResponse(seq, replyTo, 7, str.toString());
- }
- log.trace("Done with method: " + method.getName());
- }
- }
- }
-
- protected void handleGetQuery(long seq, Destination replyTo, Decoder dec)
- {
- Map<String, Object> data = dec.readMap();
- if (data.containsKey("_objectid"))
- {
- long objId = (Long) data.get("_objectid");
- log.debug("Get Request message for object id " + objId);
- ManagedObject mo = objects.get(objId);
- if (mo == null)
- {
- methodResponse(seq, replyTo, 1, String.format(
- "no such object: 0x%x", objId));
- } else
- {
- content('g', seq, replyTo, mo);
- }
- } else if (data.containsKey("_class"))
- {
- String className = (String) data.get("_class");
- String packageName = (String) data.get("_package");
- log.debug(String.format(
- "Get Request message for package '%s' class '%s'",
- packageName, className));
- for (ManagedObject mo : objects.values())
- {
- if (mo.getManagedClassName().equals(className))
- {
- if ((packageName == null) || packageName.equals("")
- || packageName.equals(mo.getManagedPackageName()))
- {
- content('g', seq, replyTo, mo);
- }
- }
- }
- } else
- {
- for (ManagedObject mo : objects.values())
- {
- content('g', seq, replyTo, mo);
- }
- }
- complete(seq, replyTo);
- }
-
- protected void handleSchemaRequest(long seq, Destination replyTo,
- Decoder dec)
- {
- String pkg = dec.readStr8();
- String cls = dec.readStr8();
- log.debug(String.format(
- "SchemaRequest message for package '%s' class '%s'", pkg, cls));
- ClassBinding cb = bctx.getClassBinding(pkg, cls);
- if (cb == null)
- {
- throw new AgentException("no such class: " + pkg + ", " + cls);
- }
- schemaResponse(seq, cb);
- }
-
- protected BBEncoder init(char opcode)
- {
- return init(opcode, 0);
- }
-
- protected BBEncoder init(char opcode, long sequence)
- {
- BBEncoder enc = new BBEncoder(1024);
- enc.init();
- enc.writeUint8((short) 'A');
- enc.writeUint8((short) 'M');
- enc.writeUint8((short) '2');
- enc.writeUint8((short) opcode);
- enc.writeUint32(sequence);
- return enc;
- }
-
- protected void send(BBEncoder enc)
- {
- send("broker", enc);
- }
-
- protected void send(Destination dest, BBEncoder enc)
- {
- try
- {
- byte[] buf = new byte[1024];
- byte[] body = new byte[1024];
- BytesMessage msg = session.createBytesMessage();
- ByteBuffer slice = enc.segment();
- while (slice.hasRemaining())
- {
- int n = Math.min(buf.length, slice.remaining());
- slice.get(buf, 0, n);
- msg.writeBytes(buf, 0, n);
- }
- msg.setJMSReplyTo(reply);
- // ???: I assume this is thread safe.
- prod.send(dest, msg);
- } catch (JMSException e)
- {
- throw new AgentException(e);
- }
- }
-
- protected void send(String routingKey, BBEncoder enc)
- {
- try
- {
- send(session
- .createQueue("management://qpid.management//?routingkey='"
- + routingKey + "'"), enc);
- } catch (JMSException e)
- {
- throw new AgentException(e);
- }
- }
-
- protected void attachRequest(String label, UUID systemId)
- {
- BBEncoder enc = init('A');
- enc.writeStr8(label);
- enc.writeUuid(systemId);
- enc.writeUint32(0);
- enc.writeUint32(0);
- send(enc);
- }
-
- protected void packageIndication(String pkg)
- {
- BBEncoder enc = init('p');
- enc.writeStr8(pkg);
- send(enc);
- }
-
- protected void classIndication(ClassBinding cb)
- {
- BBEncoder enc = init('q');
- enc.writeUint8(cb.getKind());
- enc.writeStr8(cb.getPackage());
- enc.writeStr8(cb.getName());
- enc.writeBin128(cb.getSchemaHash()); // schema hash?
- send(enc);
- }
-
- protected void schemaResponse(long seq, ClassBinding cb)
- {
- BBEncoder enc = init('s', seq);
- cb.encode(enc);
- send(enc);
- }
-
- protected void content(char c, long seq, Destination dest, ManagedObject mo)
- {
- BBEncoder enc = init(c, seq);
- ClassBinding cb = getClassBinding(mo);
- String pkg = cb.getPackage();
- String cls = cb.getName();
- enc.writeStr8(pkg);
- enc.writeStr8(cls);
- enc.writeBin128(cb.getSchemaHash());
- long now = System.currentTimeMillis() * 1000000;
- enc.writeUint64(now);
- enc.writeUint64(now);
- enc.writeUint64(0);
- enc.writeUint64(0x0000FFFFFFFFFFFFL & ((bbank << 28) | abank));
- enc.writeUint64(mo.getId());
- for (PropertyBinding p : cb.getProperties())
- {
- p.getType().encode(enc, mo.get(p));
- }
- if (dest == null)
- {
- send(String.format("console.obj.%d.%d.%s.%s", bbank, abank, pkg,
- cls), enc);
- } else
- {
- send(dest, enc);
- }
- }
-
- protected void complete(long seq, Destination dest)
- {
- BBEncoder enc = init('z', seq);
- enc.writeUint32(0);
- enc.writeStr8("");
- send(dest, enc);
- }
-
- protected void methodResponse(long seq, Destination dest, int status,
- String text)
- {
- methodResponse(seq, dest, status, text, null, null);
- }
-
- protected void methodResponse(long seq, Destination dest, int status,
- String text, MethodBinding method, Object[] result)
- {
- BBEncoder enc = init('m', seq);
- enc.writeUint32(status);
- enc.writeStr16(text == null ? "" : text);
- if (method != null)
- {
- int idx = 0;
- for (ParameterBinding p : method.getOutParameters())
- {
- p.getType().encode(enc, result[idx++]);
- }
- }
- send(dest, enc);
- }
-
- protected Class getClass(String className)
- {
- try
- {
- if (classLoader != null)
- {
- return classLoader.loadClass(className);
- } else
- {
- return Class.forName(className);
- }
- } catch (ClassNotFoundException e)
- {
- throw new AgentException(String.format(
- "No class named %s was found", className), e);
- }
- }
-
- public String getLabel()
- {
- return label;
- }
-
- public void setLabel(String label)
- {
- this.label = label;
- }
-
- public AMQConnection getConnection()
- {
- return connection;
- }
-
- public void setConnection(AMQConnection connection)
- {
- this.connection = connection;
- }
-
- public boolean isSessionTransacted()
- {
- return sessionTransacted;
- }
-
- public void setSessionTransacted(boolean sessionTransacted)
- {
- this.sessionTransacted = sessionTransacted;
- }
-
- public void setManagedObjects(List objectList)
- {
- this.managedObjects = objectList;
- }
-
- public List getManagedObjects()
- {
- return managedObjects;
- }
-
- public void setRegisteredClasses(List objectList)
- {
- this.registeredClasses = objectList;
- }
-
- public List getRegisteredClasses()
- {
- return this.registeredClasses;
- }
-
- public static void main(String[] args) throws Exception
- {
- String broker = args[0];
- String name = args[1];
- String url = String.format(
- "amqp://guest:guest@/?brokerlist='tcp://%s'", broker);
- AMQConnection conn = new AMQConnection(url);
- Agent agent = new Agent(name, UUID.randomUUID());
- agent.setConnection(conn);
- for (int i = 2; i < args.length; i++)
- {
- Class<?> cls = Class.forName(args[i]);
- agent.register(new ManagedPOJO(cls.newInstance()));
- }
- agent.start();
- while (true)
- {
- Thread.sleep(1000);
- }
- }
-}