summaryrefslogtreecommitdiff
path: root/qpid/dotnet/client-010/management/console/Session.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/client-010/management/console/Session.cs')
-rw-r--r--qpid/dotnet/client-010/management/console/Session.cs796
1 files changed, 796 insertions, 0 deletions
diff --git a/qpid/dotnet/client-010/management/console/Session.cs b/qpid/dotnet/client-010/management/console/Session.cs
new file mode 100644
index 0000000000..d9c5948e57
--- /dev/null
+++ b/qpid/dotnet/client-010/management/console/Session.cs
@@ -0,0 +1,796 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO ;
+using System.Reflection ;
+using System.Threading ;
+using log4net ;
+using org.apache.qpid.client ;
+using org.apache.qpid.transport.util;
+using org.apache.qpid.transport.codec ;
+
+namespace org.apache.qpid.console
+{
+
+ /**
+ * Main Interaction point for interaction with the bus. Can be used to locate objects
+ * on the bus, and invoke messages on them.
+ */
+ public class Session
+ {
+ public static ILog log = LogManager.GetLogger(typeof(Session)) ;
+
+ public static int CONTEXT_SYNC = 1 ;
+ public static int CONTEXT_STARTUP = 2 ;
+ public static int CONTEXT_MULTIGET = 3 ;
+ public static int DEFAULT_GET_WAIT_TIME = 60000 ;
+
+ public bool RecieveObjects = true ;
+ public bool RecieveEvents = true ;
+ public bool RecieveHeartbeat = true ;
+ public bool UserBindings = false ;
+ public Console Console ;
+
+ protected Dictionary<string, Dictionary<string, SchemaClass>> Packages = new Dictionary<string, Dictionary<string, SchemaClass>>() ;
+ protected List<Broker> Brokers = new List<Broker>() ;
+ protected SequenceManager SequenceManager = new SequenceManager() ;
+ protected object LockObject = new Object();
+ protected List<long> SyncSequenceList = new List<long>() ;
+ protected List<QMFObject> GetResult ;
+ protected Object SyncResult ;
+
+ public Session()
+ {
+ }
+
+ public Session(Console console) {
+ Console = console ;
+ }
+
+ public void AddBroker(string url) {
+ BrokerURL brokerurl = GetBrokerURL(url) ;
+ Broker broker = new Broker(this, brokerurl) ;
+ Brokers.Add(broker) ;
+
+ Dictionary<string, object> args = new Dictionary<string, object>() ;
+ args.Add("_class", "agent") ;
+ args.Add("_broker", broker) ;
+ this.GetObjects(args) ;
+ }
+
+ public void RemoveBroker(Broker broker) {
+ if (Brokers.Contains(broker)) {
+ Brokers.Remove(broker) ;
+ }
+
+ broker.Shutdown() ;
+ }
+
+ public void Close() {
+ foreach (Broker broker in Brokers.ToArray()) {
+ this.RemoveBroker(broker) ;
+ }
+ }
+
+ protected BrokerURL GetBrokerURL(string url) {
+ return new BrokerURL(url) ;
+ }
+
+
+ public List<QMFObject> GetObjects(Dictionary<string, object> args) {
+ List<Broker> brokerList = null ;
+ List<Agent> agentList = new List<Agent>() ;
+
+ if (args.ContainsKey("_broker")) {
+ brokerList = new List<Broker>() ;
+ brokerList.Add((Broker)args["_broker"]) ;
+ } else {
+ brokerList = this.Brokers ;
+ }
+
+ foreach (Broker broker in brokerList) {
+ broker.WaitForStable() ;
+ }
+
+ if (args.ContainsKey("_agent")) {
+ Agent agent = (Agent)args["_agent"] ;
+ if (brokerList.Contains(agent.Broker)) {
+ agentList.Add(agent) ;
+ } else {
+ throw new Exception("Agent is not managed by this console or the supplied broker") ;
+ }
+ } else {
+ if (args.ContainsKey("_objectId")) {
+ ObjectID oid = (ObjectID) args["_objectId"] ;
+ foreach (Broker broker in Brokers) {
+ foreach (Agent agent in broker.Agents.Values) {
+ if ((agent.AgentBank == oid.AgentBank()) && (agent.BrokerBank == oid.BrokerBank())) {
+ agentList.Add(agent) ;
+ }
+ }
+ }
+ }
+ else {
+ foreach (Broker broker in brokerList) {
+ foreach (Agent agent in broker.Agents.Values) {
+ if (agent.Broker.IsConnected()) {
+ agentList.Add(agent) ;
+ }
+ }
+ }
+ }
+ }
+
+ GetResult = new List<QMFObject>() ;
+
+ if (agentList.Count > 0) {
+ //FIXME Add a bunch of other suff too
+ foreach (Agent agent in agentList) {
+ Dictionary<string, object> getParameters = new Dictionary<string, object>() ;
+ Broker broker = agent.Broker ;
+ long seq = -1 ;
+ lock(LockObject) {
+ seq = SequenceManager.Reserve(Session.CONTEXT_MULTIGET) ;
+ SyncSequenceList.Add(seq) ;
+ }
+ object packageName = null ;
+ object className = null ;
+ object key = null ;
+ object sClass = null ;
+ object oid = null ;
+ object hash = null ;
+
+ args.TryGetValue("_schema", out sClass) ;
+ args.TryGetValue("_key", out key) ;
+ args.TryGetValue("_class", out className) ;
+ args.TryGetValue("_package", out packageName) ;
+ args.TryGetValue("_objectID", out oid) ;
+ args.TryGetValue("_hash", out hash) ;
+
+ if ((className == null) && (oid == null) && (oid == null)) {
+ throw new Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument") ;
+ }
+
+ if (oid != null) {
+ getParameters.Add("_objectID", oid) ;
+ }
+ else {
+ if (sClass != null) {
+ key = key ?? ((SchemaClass)sClass).Key ;
+ }
+ if (key != null) {
+ ClassKey cKey = (ClassKey)key ;
+ className = className ?? cKey.ClassName ;
+ packageName = packageName ?? cKey.PackageName ;
+ hash = hash ?? cKey.Hash ;
+ }
+
+ if (packageName != null) {
+ getParameters.Add("_package", packageName) ;
+ }
+ if (className != null) {
+ getParameters.Add("_class", className) ;
+ }
+ if (hash != null) {
+ getParameters.Add("_hash", hash) ;
+ }
+ foreach (KeyValuePair<string, object> pair in args) {
+ if (!pair.Key.StartsWith("_")) {
+ getParameters.Add(pair.Key, pair.Value) ;
+ }
+ }
+ }
+
+ IEncoder enc = broker.CreateEncoder('G', seq) ;
+ enc.WriteMap(getParameters) ;
+ string routingKey = String.Format("agent.{0}.{1}", agent.BrokerBank, agent.AgentBank) ;
+ Message msg = broker.CreateMessage(enc, routingKey) ;
+ log.Debug("Get Object Keys: ") ;
+ foreach (string pKey in getParameters.Keys) {
+ log.Debug(String.Format("\tKey: '{0}' Value: '{1}'", pKey, getParameters[pKey])) ;
+ }
+ broker.Send(msg) ;
+ }
+
+ int waittime = DEFAULT_GET_WAIT_TIME ;
+ bool timeout = false ;
+ if (args.ContainsKey("_timeout")) {
+ waittime = (int) args["_timeout"] ;
+ }
+ DateTime start = DateTime.Now ;
+ lock (LockObject) {
+ // FIXME ERROR
+ while (SyncSequenceList.Count > 0){
+ Monitor.Wait(LockObject,waittime) ;
+ TimeSpan duration = DateTime.Now - start;
+ if (duration.TotalMilliseconds > waittime) {
+ foreach (long pendingSeq in SyncSequenceList) {
+ SequenceManager.Release(pendingSeq) ;
+ }
+ SyncSequenceList.Clear() ;
+ timeout = true ;
+ }
+ }
+ }
+
+ //FIXME Add the error logic
+ if ((GetResult.Count == 0) && timeout) {
+ throw new Exception ("Get Request timed out") ;
+ }
+ }
+ return GetResult ;
+ }
+
+ public List<string> GetPackages() {
+ this.WaitForStable() ;
+ List<string> returnValue = new List<string>() ;
+ foreach (String name in Packages.Keys) {
+ returnValue.Add(name) ;
+ }
+
+ return returnValue ;
+ }
+
+ public List<ClassKey> GetClasses(string packageName) {
+ List<ClassKey> returnValue = new List<ClassKey>() ;
+ this.WaitForStable() ;
+
+ if (Packages.ContainsKey(packageName)) {
+ foreach (SchemaClass sClass in Packages[packageName].Values) {
+ returnValue.Add(sClass.Key) ;
+ }
+ }
+
+ return returnValue ;
+ }
+
+ public SchemaClass GetSchema(ClassKey key) {
+ return GetSchema(key, true) ;
+ }
+
+ protected SchemaClass GetSchema(ClassKey key, bool waitForStable) {
+ if (waitForStable) {
+ this.WaitForStable() ;
+ }
+
+ SchemaClass returnValue = null ;
+
+ try {
+ returnValue = Packages[key.PackageName][key.GetKeyString()] ;
+ }
+ catch (KeyNotFoundException) {
+ // eat it
+ }
+
+ return returnValue ;
+ }
+
+
+ protected void WaitForStable() {
+ foreach (Broker broker in Brokers) {
+ broker.WaitForStable() ;
+ }
+ }
+
+
+ public Broker GetBroker(long BrokerBank) {
+ Broker returnValue = null ;
+
+ foreach (Broker broker in Brokers) {
+ if (broker.BrokerBank() == BrokerBank) {
+ returnValue = broker ;
+ break ;
+ }
+ }
+
+ return returnValue ;
+ }
+
+ public MethodResult InvokeMethod(QMFObject obj, string name, List<object> args, bool synchronous, int timeToLive) {
+ Broker aBroker = this.GetBroker(obj.BrokerBank()) ;
+
+ long seq = this.SendMethodRequest(obj, aBroker, name, args, synchronous, timeToLive) ;
+ if (seq != 0) {
+ if (!synchronous) {
+ return null ;
+ }
+ try {
+ aBroker.WaitForSync(timeToLive) ;
+ } catch (Exception e) {
+ SequenceManager.Release(seq) ;
+ throw e ;
+ }
+
+ // FIXME missing error logic in the broker
+ return (MethodResult) SyncResult ;
+ }
+
+ return null ;
+ }
+
+ protected long SendMethodRequest(QMFObject obj, Broker aBroker, string name, List<object> args, bool synchronous, int timeToLive) {
+
+ SchemaMethod method = obj.Schema.GetMethod(name) ;
+ if (args == null) {
+ args = new List<object>() ;
+ }
+
+ long seq = 0 ;
+ if (method != null) {
+ KeyValuePair<SchemaMethod, bool> pair = new KeyValuePair<SchemaMethod, bool>(method, synchronous) ;
+ seq = SequenceManager.Reserve(pair) ;
+ IEncoder enc = aBroker.CreateEncoder('M', seq) ;
+ obj.ObjectID.encode(enc) ;
+ obj.Schema.Key.encode(enc) ;
+ enc.WriteStr8(name) ;
+
+ if (args.Count < method.InputArgCount) {
+ throw new Exception(String.Format("Incorrect number of arguments: expected {0}, got{1}", method.InputArgCount, args.Count)) ;
+ }
+
+ int argIndex = 0 ;
+ foreach (SchemaArgument arg in method.Arguments) {
+ if (arg.IsInput()) {;
+ this.EncodeValue(enc, arg.Type, args[argIndex]) ;
+ argIndex += 1 ;
+ }
+ }
+
+ Message msg = aBroker.CreateMessage(enc,obj.RoutingKey(),timeToLive) ;
+
+ if (synchronous) {
+ aBroker.SetSyncInFlight(true) ;
+ }
+ aBroker.Send(msg) ;
+ }
+ return seq ;
+ }
+
+ public QMFObject MakeObject(ClassKey key) {
+ SchemaClass sClass = this.GetSchema(key) ;
+ if (sClass == null) {
+ throw new Exception("No schema found for class " + key.ToString()) ;
+ }
+
+ return this.CreateQMFObject(sClass, true, true, false) ;
+ }
+
+ public QMFObject MakeObject(String keyString) {
+ return this.MakeObject(new ClassKey(keyString)) ;
+ }
+
+ // Callback Methods
+ public void HandleNewAgent(Agent agent) {
+ if (Console != null) {
+ Console.NewAgent(agent) ;
+ }
+ }
+
+ public void HandleAgentRemoved(Agent agent) {
+ if (Console != null) {
+ Console.AgentRemoved(agent) ;
+ }
+ }
+
+ public void HandleBrokerConnect(Broker broker) {
+ if (Console != null) {
+ Console.BrokerConnected(broker) ;
+ }
+ }
+
+ public void HandleBrokerDisconnect(Broker broker) {
+ if (Console != null) {
+ Console.BrokerDisconnected(broker) ;
+ }
+ }
+
+ public void HandleBrokerResponse(Broker broker, IDecoder decoder, long sequence) {
+ if (Console != null) {
+ Console.BrokerInformation(broker) ;
+ }
+
+ long seq = SequenceManager.Reserve(CONTEXT_STARTUP) ;
+ IEncoder endocder = broker.CreateEncoder('P', seq) ;
+ broker.Send(endocder) ;
+ }
+
+ public void HandlePackageIndicator(Broker broker, IDecoder decoder, long sequence) {
+ string packageName = decoder.ReadStr8() ;
+ bool notify = false ;
+ if (!Packages.ContainsKey(packageName)) {
+ lock (LockObject) {
+ Packages[packageName] = new Dictionary<string, SchemaClass>() ;
+ notify = true ;
+ }
+ }
+
+ if (notify && Console != null) {
+ Console.NewPackage(packageName) ;
+ }
+
+ broker.IncrementOutstanding() ;
+ long seq = SequenceManager.Reserve(Session.CONTEXT_STARTUP) ;
+ IEncoder enc = broker.CreateEncoder('Q', seq) ;
+ enc.WriteStr8(packageName) ;
+ broker.Send(enc) ;
+ }
+
+ public void HandleCommandComplete(Broker broker, IDecoder decoder, long sequence) {
+
+ long code = decoder.ReadUint32() ;
+ string text = decoder.ReadStr8() ;
+ Object context = this.SequenceManager.Release(sequence) ;
+
+ if (context.Equals(CONTEXT_STARTUP)) {
+ broker.DecrementOutstanding() ;
+ } else {
+ if ((context.Equals(CONTEXT_SYNC)) & broker.GetSyncInFlight()) {
+ broker.SetSyncInFlight(false) ;
+ } else {
+ if (context.Equals(CONTEXT_MULTIGET) && SyncSequenceList.Contains(sequence)) {
+ lock(LockObject) {
+ SyncSequenceList.Remove(sequence) ;
+ if (SyncSequenceList.Count == 0) {
+ Monitor.PulseAll(LockObject) ;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public void HandleClassIndicator(Broker broker, IDecoder decoder, long sequence) {
+ short kind = decoder.ReadUint8() ;
+ ClassKey classKey = new ClassKey(decoder) ;
+ bool unknown = false ;
+
+
+ lock (LockObject) {
+ if (Packages.ContainsKey(classKey.PackageName)) {
+ if (!Packages[classKey.PackageName].ContainsKey(classKey.GetKeyString())) {
+ unknown = true ;
+ }
+ }
+ }
+
+ if (unknown) {
+ broker.IncrementOutstanding() ;
+ long seq = SequenceManager.Reserve(Session.CONTEXT_STARTUP) ;
+ IEncoder enc = broker.CreateEncoder('S', seq) ;
+ classKey.encode(enc) ;
+ broker.Send(enc) ;
+ }
+ }
+
+ public void HandleMethodResponse(Broker broker, IDecoder decoder, long sequence) {
+ long code = decoder.ReadUint32() ;
+ string text = decoder.ReadStr16() ;
+
+ Dictionary<string, object> outArgs = new Dictionary<string, object>() ;
+ object obj = SequenceManager.Release(sequence) ;
+
+ if (obj == null) {
+ return ;
+ }
+
+ KeyValuePair<SchemaMethod, bool> pair = (KeyValuePair<SchemaMethod, bool>) obj ;
+ if (code == 0) {
+ foreach (SchemaArgument arg in pair.Key.Arguments) {
+ if (arg.IsOutput()) {
+ outArgs.Add(arg.Name, this.DecodeValue(decoder, arg.Type)) ;
+ }
+ }
+ }
+
+ MethodResult result = new MethodResult(code, text, outArgs) ;
+
+ if (pair.Value) {
+ this.SyncResult = result;
+ broker.SetSyncInFlight(false) ;
+ }
+
+ if (Console != null) {
+ Console.MethodResponse(broker, sequence, result) ;
+ }
+ }
+
+ public void HandleHeartbeatIndicator(Broker broker, IDecoder decoder, long sequence, IMessage msg) {
+ if (Console != null) {
+ long brokerBank = 1 ;
+ long agentBank = 0 ;
+ try {
+ string routingKey = msg.DeliveryProperties.GetRoutingKey() ;
+ if (routingKey != null) {
+ agentBank = Agent.GetBrokerBank(routingKey) ;
+ brokerBank = Agent.GetBrokerBank(routingKey) ;
+ }
+ }
+ catch (Exception e) {
+ log.Warn("Internal QPID error", e) ;
+ }
+
+ string agentKey = Agent.AgentKey(agentBank, brokerBank) ;
+ long timestamp = decoder.ReadUint64() ;
+ if (broker.Agents.ContainsKey(agentKey)) {
+ Agent agent = broker.Agents[agentKey] ;
+ Console.HearbeatRecieved(agent, timestamp) ;
+ }
+ }
+
+ }
+
+ public void HandleEventIndicator(Broker broker, IDecoder decoder, long sequence) {
+ if (Console != null) {
+ QMFEvent newEvent = new QMFEvent(this, decoder) ;
+ Console.EventRecieved(broker, newEvent) ;
+ }
+ }
+
+ public void HandleSchemaResponse(Broker broker, IDecoder decoder, long sequence) {
+ short kind = decoder.ReadUint8() ;
+ ClassKey classKey = new ClassKey(decoder) ;
+ SchemaClass sClass = new SchemaClass(kind, classKey, decoder, this) ;
+ lock(LockObject) {
+ Dictionary<string, SchemaClass> classMappings = Packages[sClass.PackageName] ;
+ classMappings.Remove(sClass.ClassKeyString) ;
+ classMappings.Add(sClass.ClassKeyString, sClass) ;
+ }
+
+ SequenceManager.Release(sequence) ;
+ broker.DecrementOutstanding() ;
+ if (Console != null) {
+ this.Console.NewClass(kind, classKey) ;
+ }
+ }
+
+ public void HandleContentIndicator(Broker broker, IDecoder decoder, long sequence, bool hasProperties, bool hasStatistics) {
+
+ ClassKey key = new ClassKey(decoder) ;
+ SchemaClass sClass = null ;;
+ lock (LockObject) {
+ sClass = GetSchema(key, false) ;
+ }
+ if (sClass != null) {
+ QMFObject obj = this.CreateQMFObject(sClass, decoder, hasProperties, hasStatistics, true) ;
+
+ if (key.PackageName.Equals("org.apache.qpid.broker") && key.ClassName.Equals("agent") && hasProperties) {
+ broker.UpdateAgent(obj) ;
+ }
+
+ lock (LockObject) {
+ if (SyncSequenceList.Contains(sequence)) {
+ if (!obj.IsDeleted() && this.SelectMatch(obj)) {
+ GetResult.Add(obj) ;
+ }
+ }
+ }
+
+ if (Console != null) {
+ if (hasProperties) {
+ Console.ObjectProperties(broker, obj) ;
+ }
+ if (hasStatistics) {
+ Console.ObjectStatistics(broker, obj) ;
+ }
+ }
+ }
+ }
+
+ public bool SelectMatch(QMFObject obj) {
+ return true ;
+ }
+
+ public object DecodeValue(IDecoder dec, short type) {
+
+ switch (type) {
+ case 1: return dec.ReadUint8() ; // U8
+ case 2: return dec.ReadUint16() ; // U16
+ case 3: return dec.ReadUint32() ; // U32
+ case 4: return dec.ReadUint64() ; // U64
+ case 6: return dec.ReadStr8() ; // SSTR
+ case 7: return dec.ReadStr16() ; // LSTR
+ case 8: return dec.ReadDatetime() ; // ABSTIME
+ case 9: return dec.ReadUint32() ; // DELTATIME
+ case 10: return new ObjectID(dec) ; // ref
+ case 11: return dec.ReadUint8() != 0 ; // bool
+ case 12: return dec.ReadFloat() ; // float
+ case 13: return dec.ReadDouble() ; // double
+ case 14: return dec.ReadUuid() ; // UUID
+ case 15: return dec.ReadMap() ; // Ftable
+ case 16: return dec.ReadInt8() ; // int8
+ case 17: return dec.ReadInt16() ; // int16
+ case 18: return dec.ReadInt32() ; // int32
+ case 19: return dec.ReadInt64() ; // int64
+ case 20: // Object
+ // Peek into the inner type code, make sure
+ // it is actually an object
+ object returnValue = null ;
+ short innerTypeCode = dec.ReadUint8() ;
+ if (innerTypeCode != 20) {
+ returnValue = this.DecodeValue(dec, innerTypeCode) ;
+ }
+ else {
+ ClassKey classKey = new ClassKey(dec) ;
+ lock(LockObject) {
+ SchemaClass sClass = GetSchema(classKey) ;
+ if (sClass != null) {
+ returnValue = this.CreateQMFObject(sClass, dec, true, true, false) ;
+ }
+ }
+ }
+ return returnValue;
+ case 21: // List
+ {
+ MSDecoder lDec = new MSDecoder();
+ lDec.Init(new MemoryStream(dec.ReadVbin32()));
+ long count = lDec.ReadUint32();
+ List<object> newList = new List<object>();
+ while (count > 0)
+ {
+ short innerType = lDec.ReadUint8();
+ newList.Add(this.DecodeValue(lDec, innerType));
+ count -= 1;
+ }
+ return newList;
+ }
+ case 22: // Array
+ {
+ MSDecoder aDec = new MSDecoder();
+ aDec.Init(new MemoryStream(dec.ReadVbin32()));
+ long cnt = aDec.ReadUint32();
+ short innerType = aDec.ReadUint8();
+ List<object> aList = new List<object>();
+ while (cnt > 0)
+ {
+ aList.Add(this.DecodeValue(aDec, innerType));
+ cnt -= 1;
+ }
+ return aList;
+ }
+ default:
+ throw new Exception(String.Format("Invalid Type Code: {0}", type)) ;
+ }
+ }
+
+
+ public void EncodeValue(IEncoder enc, short type, object val) {
+ try {
+ switch ((int)type) {
+ case 1: enc.WriteUint8((short) val) ; break; // U8
+ case 2: enc.WriteUint16((int) val) ; break; // U16
+ case 3: enc.WriteUint32((long) val) ; break; // U32
+ case 4: enc.WriteUint64((long) val) ; break; // U64
+ case 6: enc.WriteStr8((string) val) ; break; // SSTR
+ case 7: enc.WriteStr16((string) val) ; break; // LSTR
+ case 8: enc.WriteDatetime((long) val); break; // ABSTIME
+ case 9: enc.WriteUint32((long) val); break; // DELTATIME
+ case 10: ((ObjectID)val).encode(enc) ; break; // ref
+ case 11:
+ if ((bool) val) {
+ enc.WriteUint8(1) ;
+ } else {
+ enc.WriteUint8(0) ;
+ }
+ break ;
+ case 12: enc.WriteFloat((float) val); break; // FLOAT
+ case 13: enc.WriteDouble((double) val); break; // DOUBLE
+ case 14: enc.WriteUuid((UUID) val) ; break ; // UUID
+ case 15: enc.WriteMap((Dictionary<string, object>) val) ; break ; // Ftable
+ case 16: enc.WriteInt8((short) val) ; break; // int8
+ case 17: enc.WriteInt16((int) val) ; break; // int16
+ case 18: enc.WriteInt32(long.Parse(""+ val)) ; break; // int32
+ case 19: enc.WriteInt64(long.Parse("" + val)) ; break; // int64
+ case 20: // Object
+ // Check that the object has a session, if not
+ // take ownership of it
+ QMFObject qObj = (QMFObject) val ;
+ if (qObj.Session == null) {
+ qObj.Session = this ;
+ }
+ qObj.Encode(enc) ;
+ break;
+ case 21: // List
+ List<object> items = (List<object>) val ;
+ MSEncoder lEnc = new MSEncoder(1) ;
+ lEnc.Init() ;
+ lEnc.WriteUint32(items.Count) ;
+ foreach (object obj in items) {
+ short innerType = Util.QMFType(obj) ;
+ lEnc.WriteUint8(innerType) ;
+ this.EncodeValue(lEnc,innerType,obj) ;
+ }
+ enc.WriteVbin32(lEnc.Segment().ToArray()) ;
+ break ;
+ case 22: // Array
+ List<object> aItems = (List<object>) val ;
+ MSEncoder aEnc = new MSEncoder(1) ;
+ aEnc.Init() ;
+ long aCount = aItems.Count ;
+ aEnc.WriteUint32(aCount) ;
+ if (aCount > 0) {
+ Object anObj = aItems[0] ;
+ short innerType = Util.QMFType(anObj) ;
+ aEnc.WriteUint8(innerType) ;
+ foreach (object obj in aItems) {
+ this.EncodeValue(aEnc,innerType,obj) ;
+ }
+ }
+ enc.WriteVbin32(aEnc.Segment().ToArray()) ;
+ break ;
+ default:
+ throw new Exception(String.Format("Invalid Type Code: {0}", type)) ;
+ }
+ }
+ catch (System.InvalidCastException e) {
+ string msg = String.Format("Class cast exception for typecode {0}, type {1} ", type, val.GetType()) ;
+ log.Error(msg) ;
+ throw new Exception(msg + type, e) ;
+ }
+ }
+
+ public List<string> BindingKeys() {
+ List<string> bindings = new List<string>() ;
+ bindings.Add("schema.#") ;
+ if (RecieveObjects & RecieveEvents & RecieveHeartbeat & !UserBindings) {
+ bindings.Add("console.#") ;
+ }
+ else {
+ if (RecieveObjects & !UserBindings) {
+ bindings.Add("console.obj.#") ;
+ }
+ else {
+ bindings.Add("console.obj.*.*.org.apache.qpid.broker.agent") ;
+ }
+ if (RecieveEvents) {
+ bindings.Add("console.event.#") ;
+ }
+ if (RecieveHeartbeat) {
+ bindings.Add("console.heartbeat.#") ;
+ }
+ }
+ return bindings ;
+ }
+
+ protected QMFObject CreateQMFObject(SchemaClass schema, bool hasProperties, bool hasStats , bool isManaged) {
+ Type realClass = typeof(QMFObject) ;
+ if (Console != null) {
+ realClass = Console.TypeMapping(schema.Key) ;
+ }
+ Type[] types = new Type[] {typeof(Session), typeof(SchemaClass), typeof(bool), typeof(bool),typeof(bool)} ;
+ object[] args = new object[] {this, schema, hasProperties, hasStats, isManaged} ;
+ ConstructorInfo ci = realClass.GetConstructor(types);
+ return (QMFObject) ci.Invoke(args) ;
+ }
+
+ protected QMFObject CreateQMFObject(SchemaClass schema, IDecoder dec, bool hasProperties, bool hasStats , bool isManaged) {
+ Type realClass = typeof(QMFObject) ;
+ if (Console != null) {
+ realClass = Console.TypeMapping(schema.Key) ;
+ }
+ Type[] types = new Type[] {typeof(Session), typeof(SchemaClass), typeof(IDecoder), typeof(bool), typeof(bool),typeof(bool)} ;
+ object[] args = new object[] {this, schema, dec, hasProperties, hasStats, isManaged} ;
+ ConstructorInfo ci = realClass.GetConstructor(types);
+ return (QMFObject) ci.Invoke(args) ;
+ }
+ }
+}