summaryrefslogtreecommitdiff
path: root/java/management/console/src/main/java/org/apache/qpid/console/Session.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/management/console/src/main/java/org/apache/qpid/console/Session.java')
-rw-r--r--java/management/console/src/main/java/org/apache/qpid/console/Session.java980
1 files changed, 980 insertions, 0 deletions
diff --git a/java/management/console/src/main/java/org/apache/qpid/console/Session.java b/java/management/console/src/main/java/org/apache/qpid/console/Session.java
new file mode 100644
index 0000000000..822f215f4d
--- /dev/null
+++ b/java/management/console/src/main/java/org/apache/qpid/console/Session.java
@@ -0,0 +1,980 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.console;//
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import javax.jms.Message;
+
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Decoder;
+import org.apache.qpid.transport.codec.Encoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Session
+{
+ private static Logger log = LoggerFactory.getLogger(Session.class);
+ public static int CONTEXT_SYNC = 1;
+ public static int CONTEXT_STARTUP = 2;
+ public static int CONTEXT_MULTIGET = 3;
+ public static int DEFAULT_GET_WAIT_TIME = 60000;
+ public boolean recieveObjects = true;
+ public boolean recieveEvents = true;
+ public boolean recieveHeartbeat = true;
+ public boolean userBindings = false;
+ public Console console;
+ protected HashMap<String, HashMap<String, SchemaClass>> packages = new HashMap<String, HashMap<String, SchemaClass>>();
+ protected ArrayList<Broker> brokers = new ArrayList<Broker>();
+ protected SequenceManager sequenceManager = new SequenceManager();
+ protected Object lockObject = new Object();
+ protected ArrayList<Long> syncSequenceList = new ArrayList<Long>();
+ protected ArrayList<QMFObject> getResult;
+ protected Object syncResult;
+
+ public Session()
+ {
+ }
+
+ public Session(Console console)
+ {
+ this.console = console;
+ }
+
+ public void addBroker(String url)
+ {
+ Broker broker = new Broker(this, url);
+ brokers.add(broker);
+ java.util.HashMap<String, Object> args = new java.util.HashMap<String, Object>();
+ args.put("_class", "agent");
+ args.put("_broker", broker);
+ this.getObjects(args);
+ }
+
+ public ArrayList<String> bindingKeys()
+ {
+ ArrayList<String> bindings = new ArrayList<String>();
+ bindings.add("schema.#");
+ if (recieveObjects & recieveEvents & recieveHeartbeat & !userBindings)
+ {
+ bindings.add("console.#");
+ } else
+ {
+ if (recieveObjects & !userBindings)
+ {
+ bindings.add("console.obj.#");
+ } else
+ {
+ bindings.add("console.obj.*.*.org.apache.qpid.broker.agent");
+ }
+ if (recieveEvents)
+ {
+ bindings.add("console.event.#");
+ }
+ if (recieveHeartbeat)
+ {
+ bindings.add("console.heartbeat.#");
+ }
+ }
+ return bindings;
+ }
+
+ public void close()
+ {
+ for (Broker broker : brokers.toArray(new Broker[0]))
+ {
+ this.removeBroker(broker);
+ }
+ }
+
+ protected QMFObject createQMFObject(SchemaClass schema,
+ boolean hasProperties, boolean hasStats, boolean isManaged)
+ {
+ Class realClass = QMFObject.class;
+ if (console != null)
+ {
+ realClass = console.typeMapping(schema.getKey());
+ }
+ Class[] types = new Class[]
+ { Session.class, SchemaClass.class, boolean.class, boolean.class,
+ boolean.class };
+ Object[] args = new Object[]
+ { this, schema, hasProperties, hasStats, isManaged };
+ try
+ {
+ Constructor ci = realClass.getConstructor(types);
+ return (QMFObject) ci.newInstance(args);
+ } catch (Exception e)
+ {
+ throw new ConsoleException(e);
+ }
+ }
+
+ protected QMFObject createQMFObject(SchemaClass schema, Decoder dec,
+ boolean hasProperties, boolean hasStats, boolean isManaged)
+ {
+ Class realClass = QMFObject.class;
+ if (console != null)
+ {
+ realClass = console.typeMapping(schema.getKey());
+ }
+ Class[] types = new Class[]
+ { Session.class, SchemaClass.class, Decoder.class, boolean.class,
+ boolean.class, boolean.class };
+ Object[] args = new Object[]
+ { this, schema, dec, hasProperties, hasStats, isManaged };
+ try
+ {
+ log.debug("" + realClass);
+ Constructor ci = realClass.getConstructor(types);
+ return (QMFObject) ci.newInstance(args);
+ } catch (Exception e)
+ {
+ throw new ConsoleException(e);
+ }
+ }
+
+ public Object decodeValue(Decoder dec, short type)
+ {
+ switch (type)
+ {
+ case 1: // U8
+ return dec.readUint8();
+ case 2: // U16
+ return dec.readUint16();
+ case 3: // U32
+ return dec.readUint32();
+ case 4: // U64
+ return dec.readUint64();
+ case 6: // SSTR
+ return dec.readStr8();
+ case 7: // LSTR
+ return dec.readStr16();
+ case 8: // ABSTIME
+ return dec.readDatetime();
+ case 9: // DELTATIME
+ return dec.readUint32();
+ case 10: // ref
+ return new ObjectID(dec);
+ case 11: // bool
+ return dec.readUint8() != 0;
+ case 12: // float
+ return dec.readFloat();
+ case 13: // double
+ return dec.readDouble();
+ case 14: // UUID
+ return dec.readUuid();
+ case 15: // Ftable
+ return dec.readMap();
+ case 16: // int8
+ return dec.readInt8();
+ case 17: // int16
+ return dec.readInt16();
+ case 18: // int32
+ return dec.readInt32();
+ case 19: // int64
+ return dec.readInt64();
+ case 20: // Object
+ // Peek into the inner type code, make sure
+ // it is actually an object
+ Object returnValue = null;
+ short innerTypeCode = dec.readUint8();
+ if (innerTypeCode != 20)
+ {
+ returnValue = this.decodeValue(dec, innerTypeCode);
+ } else
+ {
+ ClassKey classKey = new ClassKey(dec);
+ synchronized (lockObject)
+ {
+ SchemaClass sClass = getSchema(classKey);
+ if (sClass != null)
+ {
+ returnValue = this.createQMFObject(sClass, dec, true,
+ true, false);
+ }
+ }
+ }
+ return returnValue;
+ case 21: // List
+ BBDecoder lDec = new BBDecoder();
+ lDec.init(ByteBuffer.wrap(dec.readVbin32()));
+ long count = lDec.readUint32();
+ ArrayList<Object> newList = new ArrayList<Object>();
+ while (count > 0)
+ {
+ short innerType = lDec.readUint8();
+ newList.add(this.decodeValue(lDec, innerType));
+ count -= 1;
+ }
+ return newList;
+ case 22: // Array
+ BBDecoder aDec = new BBDecoder();
+ aDec.init(ByteBuffer.wrap(dec.readVbin32()));
+ long cnt = aDec.readUint32();
+ short innerType = aDec.readUint8();
+ ArrayList<Object> aList = new ArrayList<Object>();
+ while (cnt > 0)
+ {
+ aList.add(this.decodeValue(aDec, innerType));
+ cnt -= 1;
+ }
+ return aList;
+ default:
+ throw new ConsoleException(String.format("Invalid Type Code: %s",
+ type));
+ }
+ }
+
+ public void encodeValue(Encoder enc, short type, Object val)
+ {
+ try
+ {
+ switch (type)
+ {
+ case 1: // U8
+ enc.writeUint8(((Short) val).shortValue());
+ break;
+ case 2: // U16
+ enc.writeUint16(((Integer) val).intValue());
+ break;
+ case 3: // U32
+ enc.writeUint32(((Integer) val).longValue());
+ break;
+ case 4: // U64
+ enc.writeUint64(((Long) val).longValue());
+ break;
+ case 6: // SSTR
+ enc.writeStr8((String) val);
+ break;
+ case 7: // LSTR
+ enc.writeStr16((String) val);
+ break;
+ case 8: // ABSTIME
+ enc.writeDatetime(((Long) val).longValue());
+ break;
+ case 9: // DELTATIME
+ enc.writeUint32(((Long) val).longValue());
+ break;
+ case 10: // ref
+ ((ObjectID) val).encode(enc);
+ break;
+ case 11:
+ if (((Boolean) val).booleanValue())
+ {
+ enc.writeUint8((short) 1);
+ } else
+ {
+ enc.writeUint8((short) 0);
+ }
+ break;
+ case 12: // FLOAT
+ enc.writeFloat(((Float) val).floatValue());
+ break;
+ case 13: // DOUBLE
+ enc.writeDouble(((Double) val).doubleValue());
+ break;
+ case 14: // UUID
+ enc.writeUuid((UUID) val);
+ break;
+ case 15: // Ftable
+ enc.writeMap((HashMap) val);
+ break;
+ case 16: // int8
+ enc.writeInt8((Byte) val);
+ break;
+ case 17: // int16
+ enc.writeInt16((Short) val);
+ break;
+ case 18: // int32
+ enc.writeInt32((Integer) val);
+ break;
+ case 19: // int64
+ enc.writeInt64((Long) val);
+ break;
+ case 20: // Object
+ // Check that the object has a session, if not
+ // take ownership of it
+ QMFObject qObj = (QMFObject) val;
+ if (qObj.getSession() == null)
+ {
+ qObj.setSession(this);
+ }
+ qObj.encode(enc);
+ break;
+ case 21: // List
+ ArrayList<Object> items = (ArrayList<Object>) val;
+ BBEncoder lEnc = new BBEncoder(1);
+ lEnc.init();
+ lEnc.writeUint32(items.size());
+ for (Object obj : items)
+ {
+ short innerType = Util.qmfType(obj);
+ lEnc.writeUint8(innerType);
+ this.encodeValue(lEnc, innerType, obj);
+ }
+ enc.writeVbin32(lEnc.segment().array());
+ break;
+ case 22: // Array
+ ArrayList<Object> aItems = (ArrayList<Object>) val;
+ BBEncoder aEnc = new BBEncoder(1);
+ aEnc.init();
+ long aCount = aItems.size();
+ aEnc.writeUint32(aCount);
+ if (aCount > 0)
+ {
+ Object anObj = aItems.get(0);
+ short innerType = Util.qmfType(anObj);
+ aEnc.writeUint8(innerType);
+ for (Object obj : aItems)
+ {
+ this.encodeValue(aEnc, innerType, obj);
+ }
+ }
+ enc.writeVbin32(aEnc.segment().array());
+ break;
+ default:
+ throw new ConsoleException(String.format(
+ "Invalid Type Code: %s", type));
+ }
+ } catch (ClassCastException e)
+ {
+ String msg = String.format(
+ "Class cast exception for typecode %s, type %s ", type, val
+ .getClass());
+ log.error(msg);
+ throw new ConsoleException(msg + type, e);
+ }
+ }
+
+ public Broker getBroker(long BrokerBank)
+ {
+ Broker returnValue = null;
+ for (Broker broker : brokers)
+ {
+ if (broker.brokerBank() == BrokerBank)
+ {
+ returnValue = broker;
+ break;
+ }
+ }
+ return returnValue;
+ }
+
+ public ArrayList<ClassKey> getClasses(String packageName)
+ {
+ ArrayList<ClassKey> returnValue = new ArrayList<ClassKey>();
+ this.waitForStable();
+ if (packages.containsKey(packageName))
+ {
+ for (SchemaClass sClass : packages.get(packageName).values())
+ {
+ returnValue.add(sClass.getKey());
+ }
+ }
+ return returnValue;
+ }
+
+ public ArrayList<QMFObject> getObjects(
+ java.util.HashMap<String, Object> args)
+ {
+ ArrayList<Broker> brokerList = null;
+ ArrayList<Agent> agentList = new ArrayList<Agent>();
+ if (args.containsKey("_broker"))
+ {
+ brokerList = new ArrayList<Broker>();
+ brokerList.add((Broker) args.get("_broker"));
+ } else
+ {
+ brokerList = this.brokers;
+ }
+ for (Broker broker : brokerList)
+ {
+ broker.waitForStable();
+ }
+ if (args.containsKey("_agent"))
+ {
+ Agent agent = (Agent) args.get("_agent");
+ if (brokerList.contains(agent.getBroker()))
+ {
+ agentList.add(agent);
+ } else
+ {
+ throw new ConsoleException(
+ "Agent is not managed by this console or the supplied broker");
+ }
+ } else
+ {
+ if (args.containsKey("_objectId"))
+ {
+ ObjectID oid = (ObjectID) args.get("_objectId");
+ for (Broker broker : brokers)
+ {
+ for (Agent agent : broker.Agents.values())
+ {
+ if ((agent.getAgentBank() == oid.agentBank())
+ && (agent.getBrokerBank() == oid.brokerBank()))
+ {
+ agentList.add(agent);
+ }
+ }
+ }
+ } else
+ {
+ for (Broker broker : brokerList)
+ {
+ for (Agent agent : broker.Agents.values())
+ {
+ if (agent.getBroker().isConnected())
+ {
+ agentList.add(agent);
+ }
+ }
+ }
+ }
+ }
+ getResult = new ArrayList<QMFObject>();
+ if (agentList.size() > 0)
+ {
+ // FIXME Add a bunch of other suff too
+ for (Agent agent : agentList)
+ {
+ HashMap<String, Object> getParameters = new HashMap<String, Object>();
+ Broker broker = agent.getBroker();
+ long seq = -1;
+ synchronized (lockObject)
+ {
+ seq = sequenceManager.reserve(Session.CONTEXT_MULTIGET);
+ syncSequenceList.add(seq);
+ }
+ String packageName = (String) args.get("_package");
+ String className = (String) args.get("_class");
+ ClassKey key = (ClassKey) args.get("_key");
+ Object sClass = args.get("_schema");
+ Object oid = args.get("_objectID");
+ long[] hash = (long[]) args.get("_hash");
+ if ((className == null) && (oid == null) && (oid == null))
+ {
+ throw new ConsoleException(
+ "No class supplied, use '_schema', '_key', '_class', or '_objectId' argument");
+ }
+ if (oid != null)
+ {
+ getParameters.put("_objectID", oid);
+ } else
+ {
+ if (sClass != null)
+ {
+ key = (key != null) ? key : ((SchemaClass) sClass)
+ .getKey();
+ }
+ if (key != null)
+ {
+ className = (className != null) ? className : key
+ .getClassName();
+ packageName = (packageName != null) ? packageName : key
+ .getPackageName();
+ hash = (hash != null) ? hash : key.getHash();
+ }
+ if (packageName != null)
+ {
+ getParameters.put("_package", packageName);
+ }
+ if (className != null)
+ {
+ getParameters.put("_class", className);
+ }
+ if (hash != null)
+ {
+ getParameters.put("_hash", hash);
+ }
+ for (java.util.Map.Entry<String, Object> pair : args
+ .entrySet())
+ {
+ if (!pair.getKey().startsWith("_"))
+ {
+ getParameters.put(pair.getKey(), pair.getValue());
+ }
+ }
+ }
+ Encoder enc = broker.createEncoder('G', seq);
+ enc.writeMap(getParameters);
+ String routingKey = agent.routingCode();
+ Message msg = broker.createMessage(enc);
+ log.debug("Get Object Keys: ");
+ for (String pKey : getParameters.keySet())
+ {
+ log.debug(String.format("\tKey: '%s' Value: '%s'", pKey,
+ getParameters.get(pKey)));
+ }
+ broker.send(msg, routingKey);
+ }
+ int waittime = DEFAULT_GET_WAIT_TIME;
+ boolean timeout = false;
+ if (args.containsKey("_timeout"))
+ {
+ waittime = (Integer) args.get("_timeout");
+ }
+ long start = System.currentTimeMillis();
+ synchronized (lockObject)
+ {
+ // FIXME ERROR
+ while (syncSequenceList.size() > 0)
+ {
+ try
+ {
+ lockObject.wait(waittime);
+ } catch (InterruptedException e)
+ {
+ throw new ConsoleException(e);
+ }
+ long duration = System.currentTimeMillis() - start;
+ if (duration > waittime)
+ {
+ for (long pendingSeq : syncSequenceList)
+ {
+ sequenceManager.release(pendingSeq);
+ }
+ syncSequenceList.clear();
+ timeout = true;
+ }
+ }
+ }
+ // FIXME Add the error logic
+ if ((getResult.isEmpty()) && timeout)
+ {
+ throw new ConsoleException("Get Request timed out");
+ }
+ }
+ return getResult;
+ }
+
+ public ArrayList<String> getPackages()
+ {
+ this.waitForStable();
+ ArrayList<String> returnValue = new ArrayList<String>();
+ for (String name : packages.keySet())
+ {
+ returnValue.add(name);
+ }
+ return returnValue;
+ }
+
+ public SchemaClass getSchema(ClassKey key)
+ {
+ return getSchema(key, true);
+ }
+
+ protected SchemaClass getSchema(ClassKey key, boolean waitForStable)
+ {
+ if (waitForStable)
+ {
+ this.waitForStable();
+ }
+ SchemaClass returnValue = null;
+ returnValue = packages.get(key.getPackageName())
+ .get(key.getKeyString());
+ return returnValue;
+ }
+
+ public void handleAgentRemoved(Agent agent)
+ {
+ if (console != null)
+ {
+ console.agentRemoved(agent);
+ }
+ }
+
+ public void handleBrokerConnect(Broker broker)
+ {
+ if (console != null)
+ {
+ console.brokerConnected(broker);
+ }
+ }
+
+ public void handleBrokerDisconnect(Broker broker)
+ {
+ if (console != null)
+ {
+ console.brokerDisconnected(broker);
+ }
+ }
+
+ public void handleBrokerResponse(Broker broker, Decoder decoder,
+ long sequence)
+ {
+ if (console != null)
+ {
+ console.brokerInformation(broker);
+ }
+ long seq = sequenceManager.reserve(CONTEXT_STARTUP);
+ Encoder encoder = broker.createEncoder('P', seq);
+ broker.send(encoder);
+ }
+
+ public void handleClassIndicator(Broker broker, Decoder decoder,
+ long sequence)
+ {
+ short kind = decoder.readUint8();
+ ClassKey classKey = new ClassKey(decoder);
+ boolean unknown = false;
+ synchronized (lockObject)
+ {
+ if (packages.containsKey(classKey.getPackageName()))
+ {
+ if (!packages.get(classKey.getPackageName()).containsKey(
+ classKey.getKeyString()))
+ {
+ unknown = true;
+ }
+ }
+ }
+ if (unknown)
+ {
+ broker.incrementOutstanding();
+ long seq = sequenceManager.reserve(Session.CONTEXT_STARTUP);
+ Encoder enc = broker.createEncoder('S', seq);
+ classKey.encode(enc);
+ broker.send(enc);
+ }
+ }
+
+ public void handleCommandComplete(Broker broker, Decoder decoder,
+ long sequence)
+ {
+ long code = decoder.readUint32();
+ String text = decoder.readStr8();
+ Object context = this.sequenceManager.release(sequence);
+ if (context.equals(CONTEXT_STARTUP))
+ {
+ broker.decrementOutstanding();
+ } else
+ {
+ if ((context.equals(CONTEXT_SYNC)) & broker.getSyncInFlight())
+ {
+ broker.setSyncInFlight(false);
+ } else
+ {
+ if (context.equals(CONTEXT_MULTIGET)
+ && syncSequenceList.contains(sequence))
+ {
+ synchronized (lockObject)
+ {
+ syncSequenceList.remove(sequence);
+ if (syncSequenceList.isEmpty())
+ {
+ lockObject.notifyAll();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public void handleContentIndicator(Broker broker, Decoder decoder,
+ long sequence, boolean hasProperties, boolean hasStatistics)
+ {
+ ClassKey key = new ClassKey(decoder);
+ SchemaClass sClass = null;
+ ;
+ synchronized (lockObject)
+ {
+ sClass = getSchema(key, false);
+ }
+ if (sClass != null)
+ {
+ QMFObject obj = this.createQMFObject(sClass, decoder,
+ hasProperties, hasStatistics, true);
+ if (key.getPackageName().equals("org.apache.qpid.broker")
+ && key.getClassName().equals("agent") && hasProperties)
+ {
+ broker.updateAgent(obj);
+ }
+ synchronized (lockObject)
+ {
+ if (syncSequenceList.contains(sequence))
+ {
+ if (!obj.isDeleted() && this.selectMatch(obj))
+ {
+ getResult.add(obj);
+ }
+ }
+ }
+ if (console != null)
+ {
+ if (hasProperties)
+ {
+ console.objectProperties(broker, obj);
+ }
+ if (hasStatistics)
+ {
+ console.objectStatistics(broker, obj);
+ }
+ }
+ }
+ }
+
+ public void handleEventIndicator(Broker broker, Decoder decoder,
+ long sequence)
+ {
+ if (console != null)
+ {
+ QMFEvent newEvent = new QMFEvent(this, decoder);
+ console.eventRecieved(broker, newEvent);
+ }
+ }
+
+ public void handleHeartbeatIndicator(Broker broker, Decoder decoder,
+ long sequence, Message msg)
+ {
+ if (console != null)
+ {
+ long brokerBank = 1;
+ long agentBank = 0;
+ try
+ {
+ // FIXME HOW DO WE GET THE ROUTING KEY
+ // String routingKey = msg.DeliveryProperties.getRoutingKey();
+ String routingKey = null;
+ if (routingKey != null)
+ {
+ agentBank = Agent.getBrokerBank(routingKey);
+ brokerBank = Agent.getBrokerBank(routingKey);
+ }
+ } catch (Throwable e)
+ {
+ log.warn("Internal QPID error", e);
+ }
+ String agentKey = Agent.AgentKey(agentBank, brokerBank);
+ long timestamp = decoder.readUint64();
+ if (broker.Agents.containsKey(agentKey))
+ {
+ Agent agent = broker.Agents.get(agentKey);
+ console.hearbeatRecieved(agent, timestamp);
+ }
+ }
+ }
+
+ public void handleMethodResponse(Broker broker, Decoder decoder,
+ long sequence)
+ {
+ long code = decoder.readUint32();
+ String text = decoder.readStr16();
+ java.util.HashMap<String, Object> outArgs = new java.util.HashMap<String, Object>();
+ Object obj = sequenceManager.release(sequence);
+ if (obj == null)
+ {
+ return;
+ }
+ Object[] pair = (Object[]) obj;
+ if (code == 0)
+ {
+ for (SchemaArgument arg : ((SchemaMethod) pair[0]).Arguments)
+ {
+ if (arg.isOutput())
+ {
+ outArgs.put(arg.getName(), this.decodeValue(decoder, arg
+ .getType()));
+ }
+ }
+ }
+ MethodResult result = new MethodResult(code, text, outArgs);
+ if ((Boolean) pair[1])
+ {
+ this.syncResult = result;
+ broker.setSyncInFlight(false);
+ }
+ if (console != null)
+ {
+ console.methodResponse(broker, sequence, result);
+ }
+ }
+
+ // Callback Methods
+ public void handleNewAgent(Agent agent)
+ {
+ if (console != null)
+ {
+ console.newAgent(agent);
+ }
+ }
+
+ public void handlePackageIndicator(Broker broker, Decoder decoder,
+ long sequence)
+ {
+ String packageName = decoder.readStr8();
+ boolean notify = false;
+ if (!packages.containsKey(packageName))
+ {
+ synchronized (lockObject)
+ {
+ packages.put(packageName,
+ new java.util.HashMap<String, SchemaClass>());
+ notify = true;
+ }
+ }
+ if (notify && console != null)
+ {
+ console.newPackage(packageName);
+ }
+ broker.incrementOutstanding();
+ long seq = sequenceManager.reserve(Session.CONTEXT_STARTUP);
+ Encoder enc = broker.createEncoder('Q', seq);
+ enc.writeStr8(packageName);
+ broker.send(enc);
+ }
+
+ public void handleSchemaResponse(Broker broker, Decoder decoder,
+ long sequence)
+ {
+ short kind = decoder.readUint8();
+ ClassKey classKey = new ClassKey(decoder);
+ SchemaClass sClass = new SchemaClass(kind, classKey, decoder, this);
+ synchronized (lockObject)
+ {
+ java.util.HashMap<String, SchemaClass> classMappings = packages
+ .get(sClass.getPackageName());
+ classMappings.remove(sClass.getClassKeyString());
+ classMappings.put(sClass.getClassKeyString(), sClass);
+ log.debug(classKey.toString());
+ }
+ sequenceManager.release(sequence);
+ broker.decrementOutstanding();
+ if (console != null)
+ {
+ this.console.newClass(kind, classKey);
+ }
+ }
+
+ public MethodResult invokeMethod(QMFObject obj, String name,
+ List<Object> args, boolean synchronous, int timeToLive)
+ {
+ Broker aBroker = this.getBroker(obj.brokerBank());
+ long seq = this.sendMethodRequest(obj, aBroker, name, args,
+ synchronous, timeToLive);
+ if (seq != 0)
+ {
+ if (!synchronous)
+ {
+ return null;
+ }
+ try
+ {
+ aBroker.waitForSync(timeToLive);
+ } catch (Throwable e)
+ {
+ sequenceManager.release(seq);
+ throw new ConsoleException(e);
+ }
+ // FIXME missing error logic in the broker
+ return (MethodResult) syncResult;
+ }
+ return null;
+ }
+
+ public QMFObject makeObject(ClassKey key)
+ {
+ SchemaClass sClass = this.getSchema(key);
+ if (sClass == null)
+ {
+ throw new ConsoleException("No schema found for class "
+ + key.toString());
+ }
+ return this.createQMFObject(sClass, true, true, false);
+ }
+
+ public QMFObject makeObject(String keyString)
+ {
+ return this.makeObject(new ClassKey(keyString));
+ }
+
+ public void removeBroker(Broker broker)
+ {
+ if (brokers.contains(broker))
+ {
+ brokers.remove(broker);
+ }
+ broker.shutdown();
+ }
+
+ public boolean selectMatch(QMFObject obj)
+ {
+ return true;
+ }
+
+ protected long sendMethodRequest(QMFObject obj, Broker aBroker,
+ String name, List<Object> args, boolean synchronous, int timeToLive)
+ {
+ SchemaMethod method = obj.getSchema().getMethod(name);
+ if (args == null)
+ {
+ args = new ArrayList<Object>();
+ }
+ long seq = 0;
+ if (method != null)
+ {
+ Object[] pair =
+ { method, synchronous };
+ seq = sequenceManager.reserve(pair);
+ Encoder enc = aBroker.createEncoder('M', seq);
+ obj.getObjectID().encode(enc);
+ obj.getSchema().getKey().encode(enc);
+ enc.writeStr8(name);
+ if (args.size() < method.getInputArgCount())
+ {
+ throw new ConsoleException(String.format(
+ "Incorrect number of arguments: expected %s, got %s",
+ method.getInputArgCount(), args.size()));
+ }
+ int argIndex = 0;
+ for (SchemaArgument arg : method.Arguments)
+ {
+ if (arg.isInput())
+ {
+ this.encodeValue(enc, arg.getType(), args.get(argIndex));
+ argIndex += 1;
+ }
+ }
+ Message msg = aBroker.createMessage(enc);
+ if (synchronous)
+ {
+ aBroker.setSyncInFlight(true);
+ }
+ aBroker.send(msg, obj.routingKey(), timeToLive);
+ }
+ return seq;
+ }
+
+ protected void waitForStable()
+ {
+ for (Broker broker : brokers)
+ {
+ broker.waitForStable();
+ }
+ }
+}