/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Collections.Generic ; using System.Threading ; using org.apache.qpid.client ; using org.apache.qpid.transport ; using org.apache.qpid.transport.codec ; using log4net ; namespace org.apache.qpid.console { /** * Controls all communication with a broker. Works with the session to provide * synhchronous method calls across the asynchronous QMF bus. */ public class Broker : IMessageListener { public static ILog log = LogManager.GetLogger(typeof(Broker)) ; public static int SYNC_TIME = 60000 ; public BrokerURL url ; public Dictionary Agents = new Dictionary() ; private IClient client ; private IClientSession clientSession ; //FIXME This second session should not be needed. There is a bug in the underlieing code. private IClientSession outSession ; private int timeout = 50000 ; private string replyName ; private string topicName ; private bool connected = false ; private bool syncInFlight = false ; private bool topicBound = false ; private int reqsOutstanding = 0 ; private org.apache.qpid.console.Session consoleSession ; private object lockObject = new Object() ; public Broker(org.apache.qpid.console.Session session, BrokerURL url) { log.Debug("Creating a new Broker for url " + url) ; this.url = url; consoleSession = session ; this.TryToConnect() ; } ~Broker() { if (connected) { this.Shutdown() ; } } public int BrokerBank() { return 1 ; } public bool IsConnected() { return connected ; } protected void TryToConnect() { reqsOutstanding = 1 ; Agent newAgent = new Agent(this,0,"BrokerAgent") ; Agents.Add(newAgent.AgentKey(), newAgent) ; client = new Client() ; client.Connect(url.Hostname, url.Port, null, url.AuthName, url.AuthPassword) ; clientSession = client.CreateSession(timeout) ; //clientSession.SetAutoSync(false) ; string name = System.Text.Encoding.UTF8.GetString(clientSession.GetName()) ; replyName = "reply-" + name ; topicName = "topic-" + name ; clientSession.SetAutoSync(true) ; Option[] options = new Option[] {Option.EXCLUSIVE, Option.AUTO_DELETE} ; // This queue is used for responses to messages which are sent. clientSession.QueueDeclare(replyName,options) ; clientSession.ExchangeBind(replyName,"amq.direct",replyName) ; clientSession.AttachMessageListener(this, "rdest") ; clientSession.MessageSubscribe(replyName,"rdest",MessageAcceptMode.NONE,MessageAcquireMode.PRE_ACQUIRED,null,0,null) ; clientSession.MessageSetFlowMode("rdest", MessageFlowMode.WINDOW); clientSession.MessageFlow("rdest", MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES); clientSession.MessageFlow("rdest", MessageCreditUnit.MESSAGE, ClientSession.MESSAGE_FLOW_MAX_BYTES); // This queue is used for unsolicited messages sent to this class. clientSession.QueueDeclare(topicName, options) ; clientSession.AttachMessageListener(this, "tdest") ; clientSession.MessageSubscribe(topicName,"tdest",MessageAcceptMode.NONE,MessageAcquireMode.PRE_ACQUIRED,null,0,null) ; clientSession.MessageSetFlowMode("tdest", MessageFlowMode.WINDOW); clientSession.MessageFlow("tdest", MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES); clientSession.MessageFlow("tdest", MessageCreditUnit.MESSAGE, ClientSession.MESSAGE_FLOW_MAX_BYTES); outSession = client.CreateSession(timeout) ; outSession.ExchangeBind(replyName,"amq.direct",replyName) ; connected = true ; consoleSession.HandleBrokerConnect(this) ; IEncoder encoder = CreateEncoder() ; this.SetHeader(encoder, 'B', 0) ; this.Send(encoder) ; } public void Shutdown() { if (connected) { this.WaitForStable() ; clientSession.MessageStop("rdest") ; clientSession.MessageStop("tdest") ; clientSession.Close() ; client.Close() ; this.connected = false ; } } public void UpdateAgent(QMFObject obj) { long agentBank = (long)obj.GetProperty("agentBank") ; long brokerBank = (long)obj.GetProperty("brokerBank") ; String key = Agent.AgentKey(agentBank, brokerBank) ; if (obj.IsDeleted()) { if (Agents.ContainsKey(key)) { Agent agent = Agents[key] ; Agents.Remove(key) ; consoleSession.HandleAgentRemoved(agent) ; } } else { if (! Agents.ContainsKey(key)) { Agent newAgent = new Agent(this, agentBank, (string)obj.GetProperty("label")) ; Agents.Add(key, newAgent) ; consoleSession.HandleNewAgent(newAgent) ; } } } public IEncoder CreateEncoder() { return new MSEncoder(1000) ; } public IEncoder CreateEncoder(char opcode, long sequence) { return SetHeader(this.CreateEncoder(), opcode, sequence) ; } public IEncoder SetHeader(IEncoder enc, char opcode, long sequence) { enc.WriteUint8((short)'A') ; enc.WriteUint8((short)'M') ; enc.WriteUint8((short)'2') ; enc.WriteUint8((short)opcode) ; enc.WriteUint32(sequence) ; return enc ; } public Message CreateMessage(IEncoder enc) { return this.CreateMessage(enc, "broker", -1) ; } public Message CreateMessage(IEncoder enc, string routingKey) { return this.CreateMessage(enc, routingKey, -1) ; } public Message CreateMessage(IEncoder enc, string routingKey, long ttl) { Message msg = new Message() ; msg.Body = ((MSEncoder)enc).Segment() ; msg.DeliveryProperties.SetRoutingKey(routingKey) ; if (-1 != ttl) { msg.DeliveryProperties.SetTtl(ttl) ; } msg.MessageProperties.SetContentType("x-application/qmf") ; msg.MessageProperties.SetReplyTo(new ReplyTo("amq.direct", replyName)) ; return msg ; } public void Send(IEncoder enc) { this.Send(this.CreateMessage(enc)) ; } public void Send(Message msg) { lock (lockObject) { log.Debug(String.Format("Sending message to routing key '{0}'", msg.DeliveryProperties.GetRoutingKey())) ; //log.Debug(System.Text.Encoding.UTF8.GetString(msg.Body.ToArray())) ; outSession.MessageTransfer("qpid.management", msg) ; //clientSession.sync() ; } } protected bool CheckHeader(IDecoder decoder, out char opcode, out long sequence) { bool returnValue = false ; opcode = 'x' ; sequence = -1 ; if(decoder.HasRemaining()) { char character = (char) decoder.ReadUint8() ; if (character != 'A') { return returnValue ; } character = (char) decoder.ReadUint8() ; if (character != 'M') { return returnValue ; } character = (char) decoder.ReadUint8() ; if (character != '2') { return returnValue ; } returnValue = true ; opcode = (char) decoder.ReadUint8() ; sequence = decoder.ReadUint32() ; } return returnValue ; } public void MessageTransfer(IMessage msg) { MSDecoder decoder = new MSDecoder() ; decoder.Init(msg.Body) ; RangeSet rangeSet = new RangeSet() ; rangeSet.Add(msg.Id) ; char opcode = 'x' ; long seq = -1 ; while (this.CheckHeader(decoder, out opcode, out seq)) { //log.Debug("Message recieved with opcode " + opcode + " and sequence " + seq) ; //log.Debug(System.Text.Encoding.UTF8.GetString(msg.Body.ToArray())) ; switch (opcode) { case 'b': consoleSession.HandleBrokerResponse(this, decoder, seq) ; break ; case 'p': consoleSession.HandlePackageIndicator(this, decoder, seq) ; break ; case 'z': consoleSession.HandleCommandComplete(this, decoder, seq) ; break ; case 'q': consoleSession.HandleClassIndicator(this, decoder, seq) ; break ; case 'm': consoleSession.HandleMethodResponse(this, decoder, seq) ; break ; case 'h': consoleSession.HandleHeartbeatIndicator(this, decoder, seq, msg) ; break ; case 'e': consoleSession.HandleEventIndicator(this, decoder, seq) ; break ; case 's': consoleSession.HandleSchemaResponse(this, decoder, seq) ; break ; case 'c': consoleSession.HandleContentIndicator(this, decoder, seq, true, false) ; break ; case 'i': consoleSession.HandleContentIndicator(this, decoder, seq, false, true) ; break ; case 'g': consoleSession.HandleContentIndicator(this, decoder, seq, true, true) ; break ; default: log.Error("Invalid message type recieved with opcode " + opcode) ; break ; } } lock (lockObject) { outSession.MessageAccept(rangeSet) ; } } public void IncrementOutstanding() { lock (lockObject) { this.reqsOutstanding += 1 ; } } public void DecrementOutstanding() { lock (lockObject) { this.reqsOutstanding -= 1 ; if ((reqsOutstanding == 0) & !topicBound) { foreach (string key in consoleSession.BindingKeys()) { //this.clientSession.ExchangeBind(topicName, "qpid.mannagement", key) ; log.Debug("Setting Topic Binding " + key) ; this.outSession.ExchangeBind(topicName, "qpid.management", key) ; } topicBound = true ; } if ((reqsOutstanding == 0) & syncInFlight) { syncInFlight = false ; Monitor.PulseAll(lockObject) ; } } } public void WaitForStable() { lock (lockObject) { if (connected) { DateTime start = DateTime.Now ; syncInFlight = true ; while (reqsOutstanding != 0) { log.Debug("Waiting to recieve messages") ; Monitor.Wait(lockObject,SYNC_TIME) ; TimeSpan duration = DateTime.Now - start; if (duration.TotalMilliseconds > SYNC_TIME) { throw new Exception("Timeout waiting for Broker to Sync") ; } } } } } public void SetSyncInFlight(bool inFlight) { lock(lockObject) { syncInFlight = inFlight ; Monitor.PulseAll(lockObject) ; } } public bool GetSyncInFlight() { return syncInFlight ; } public void WaitForSync(int timeout) { lock(lockObject) { DateTime start = DateTime.Now ; while (syncInFlight) { Monitor.Wait(lockObject,timeout) ; } TimeSpan duration = DateTime.Now - start; if (duration.TotalMilliseconds > timeout) { throw new Exception("Timeout waiting for Broker to Sync") ; } } } } }