summaryrefslogtreecommitdiff
path: root/qpid/dotnet/client-010/management/console/Broker.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/client-010/management/console/Broker.cs')
-rw-r--r--qpid/dotnet/client-010/management/console/Broker.cs351
1 files changed, 351 insertions, 0 deletions
diff --git a/qpid/dotnet/client-010/management/console/Broker.cs b/qpid/dotnet/client-010/management/console/Broker.cs
new file mode 100644
index 0000000000..7684da9e12
--- /dev/null
+++ b/qpid/dotnet/client-010/management/console/Broker.cs
@@ -0,0 +1,351 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Collections.Generic ;
+using System.Threading ;
+using org.apache.qpid.client ;
+using org.apache.qpid.transport ;
+using org.apache.qpid.transport.codec ;
+using log4net ;
+
+namespace org.apache.qpid.console
+{
+
+ /**
+ * Controls all communication with a broker. Works with the session to provide
+ * synhchronous method calls across the asynchronous QMF bus.
+ */
+ public class Broker : IMessageListener
+ {
+ public static ILog log = LogManager.GetLogger(typeof(Broker)) ;
+ public static int SYNC_TIME = 60000 ;
+
+ public BrokerURL url ;
+ public Dictionary<string, Agent> Agents = new Dictionary<string, Agent>() ;
+
+ private IClient client ;
+ private IClientSession clientSession ;
+ //FIXME This second session should not be needed. There is a bug in the underlieing code.
+ private IClientSession outSession ;
+ private int timeout = 50000 ;
+ private string replyName ;
+ private string topicName ;
+ private bool connected = false ;
+ private bool syncInFlight = false ;
+ private bool topicBound = false ;
+ private int reqsOutstanding = 0 ;
+ private org.apache.qpid.console.Session consoleSession ;
+ private object lockObject = new Object() ;
+
+
+ public Broker(org.apache.qpid.console.Session session, BrokerURL url)
+ {
+ log.Debug("Creating a new Broker for url " + url) ;
+ this.url = url;
+ consoleSession = session ;
+ this.TryToConnect() ;
+ }
+
+ ~Broker() {
+ if (connected) {
+ this.Shutdown() ;
+ }
+ }
+
+ public int BrokerBank() {
+ return 1 ;
+ }
+
+ public bool IsConnected() {
+ return connected ;
+ }
+
+ protected void TryToConnect() {
+ reqsOutstanding = 1 ;
+ Agent newAgent = new Agent(this,0,"BrokerAgent") ;
+ Agents.Add(newAgent.AgentKey(), newAgent) ;
+ client = new Client() ;
+ client.Connect(url.Hostname, url.Port, null, url.AuthName, url.AuthPassword) ;
+ clientSession = client.CreateSession(timeout) ;
+ //clientSession.SetAutoSync(false) ;
+ string name = System.Text.Encoding.UTF8.GetString(clientSession.GetName()) ;
+ replyName = "reply-" + name ;
+ topicName = "topic-" + name ;
+ clientSession.SetAutoSync(true) ;
+ Option[] options = new Option[] {Option.EXCLUSIVE, Option.AUTO_DELETE} ;
+
+ // This queue is used for responses to messages which are sent.
+ clientSession.QueueDeclare(replyName,options) ;
+ clientSession.ExchangeBind(replyName,"amq.direct",replyName) ;
+ clientSession.AttachMessageListener(this, "rdest") ;
+ clientSession.MessageSubscribe(replyName,"rdest",MessageAcceptMode.NONE,MessageAcquireMode.PRE_ACQUIRED,null,0,null) ;
+ clientSession.MessageSetFlowMode("rdest", MessageFlowMode.WINDOW);
+ clientSession.MessageFlow("rdest", MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES);
+ clientSession.MessageFlow("rdest", MessageCreditUnit.MESSAGE, ClientSession.MESSAGE_FLOW_MAX_BYTES);
+
+ // This queue is used for unsolicited messages sent to this class.
+ clientSession.QueueDeclare(topicName, options) ;
+ clientSession.AttachMessageListener(this, "tdest") ;
+ clientSession.MessageSubscribe(topicName,"tdest",MessageAcceptMode.NONE,MessageAcquireMode.PRE_ACQUIRED,null,0,null) ;
+ clientSession.MessageSetFlowMode("tdest", MessageFlowMode.WINDOW);
+ clientSession.MessageFlow("tdest", MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES);
+ clientSession.MessageFlow("tdest", MessageCreditUnit.MESSAGE, ClientSession.MESSAGE_FLOW_MAX_BYTES);
+
+ outSession = client.CreateSession(timeout) ;
+ outSession.ExchangeBind(replyName,"amq.direct",replyName) ;
+
+ connected = true ;
+ consoleSession.HandleBrokerConnect(this) ;
+
+
+ IEncoder encoder = CreateEncoder() ;
+ this.SetHeader(encoder, 'B', 0) ;
+ this.Send(encoder) ;
+ }
+
+ public void Shutdown() {
+ if (connected) {
+ this.WaitForStable() ;
+ clientSession.MessageStop("rdest") ;
+ clientSession.MessageStop("tdest") ;
+ clientSession.Close() ;
+ client.Close() ;
+ this.connected = false ;
+ }
+ }
+
+ public void UpdateAgent(QMFObject obj) {
+ long agentBank = (long)obj.GetProperty("agentBank") ;
+ long brokerBank = (long)obj.GetProperty("brokerBank") ;
+ String key = Agent.AgentKey(agentBank, brokerBank) ;
+ if (obj.IsDeleted()) {
+ if (Agents.ContainsKey(key)) {
+ Agent agent = Agents[key] ;
+ Agents.Remove(key) ;
+ consoleSession.HandleAgentRemoved(agent) ;
+ }
+ }
+ else {
+ if (! Agents.ContainsKey(key)) {
+ Agent newAgent = new Agent(this, agentBank, (string)obj.GetProperty("label")) ;
+ Agents.Add(key, newAgent) ;
+ consoleSession.HandleNewAgent(newAgent) ;
+ }
+ }
+ }
+
+ public IEncoder CreateEncoder() {
+ return new MSEncoder(1000) ;
+ }
+
+
+ public IEncoder CreateEncoder(char opcode, long sequence) {
+ return SetHeader(this.CreateEncoder(), opcode, sequence) ;
+ }
+
+ public IEncoder SetHeader(IEncoder enc, char opcode, long sequence) {
+ enc.WriteUint8((short)'A') ;
+ enc.WriteUint8((short)'M') ;
+ enc.WriteUint8((short)'2') ;
+ enc.WriteUint8((short)opcode) ;
+ enc.WriteUint32(sequence) ;
+ return enc ;
+ }
+
+ public Message CreateMessage(IEncoder enc) {
+ return this.CreateMessage(enc, "broker", -1) ;
+ }
+
+ public Message CreateMessage(IEncoder enc, string routingKey) {
+ return this.CreateMessage(enc, routingKey, -1) ;
+ }
+
+ public Message CreateMessage(IEncoder enc, string routingKey, long ttl) {
+ Message msg = new Message() ;
+ msg.Body = ((MSEncoder)enc).Segment() ;
+ msg.DeliveryProperties.SetRoutingKey(routingKey) ;
+ if (-1 != ttl) {
+ msg.DeliveryProperties.SetTtl(ttl) ;
+ }
+ msg.MessageProperties.SetContentType("x-application/qmf") ;
+ msg.MessageProperties.SetReplyTo(new ReplyTo("amq.direct", replyName)) ;
+ return msg ;
+ }
+
+ public void Send(IEncoder enc) {
+ this.Send(this.CreateMessage(enc)) ;
+ }
+
+ public void Send(Message msg) {
+
+ lock (lockObject) {
+ log.Debug(String.Format("Sending message to routing key '{0}'", msg.DeliveryProperties.GetRoutingKey())) ;
+ //log.Debug(System.Text.Encoding.UTF8.GetString(msg.Body.ToArray())) ;
+ outSession.MessageTransfer("qpid.management", msg) ;
+ //clientSession.sync() ;
+ }
+ }
+
+ protected bool CheckHeader(IDecoder decoder, out char opcode, out long sequence) {
+ bool returnValue = false ;
+ opcode = 'x' ;
+ sequence = -1 ;
+ if(decoder.HasRemaining()) {
+ char character = (char) decoder.ReadUint8() ;
+ if (character != 'A') {
+ return returnValue ;
+ }
+ character = (char) decoder.ReadUint8() ;
+ if (character != 'M') {
+ return returnValue ;
+ }
+ character = (char) decoder.ReadUint8() ;
+ if (character != '2') {
+ return returnValue ;
+ }
+ returnValue = true ;
+ opcode = (char) decoder.ReadUint8() ;
+ sequence = decoder.ReadUint32() ;
+ }
+ return returnValue ;
+ }
+
+ public void MessageTransfer(IMessage msg) {
+ MSDecoder decoder = new MSDecoder() ;
+ decoder.Init(msg.Body) ;
+ RangeSet rangeSet = new RangeSet() ;
+ rangeSet.Add(msg.Id) ;
+ char opcode = 'x' ;
+ long seq = -1 ;
+ while (this.CheckHeader(decoder, out opcode, out seq)) {
+ //log.Debug("Message recieved with opcode " + opcode + " and sequence " + seq) ;
+ //log.Debug(System.Text.Encoding.UTF8.GetString(msg.Body.ToArray())) ;
+ switch (opcode) {
+ case 'b':
+ consoleSession.HandleBrokerResponse(this, decoder, seq) ;
+ break ;
+ case 'p':
+ consoleSession.HandlePackageIndicator(this, decoder, seq) ;
+ break ;
+ case 'z':
+ consoleSession.HandleCommandComplete(this, decoder, seq) ;
+ break ;
+ case 'q':
+ consoleSession.HandleClassIndicator(this, decoder, seq) ;
+ break ;
+ case 'm':
+ consoleSession.HandleMethodResponse(this, decoder, seq) ;
+ break ;
+ case 'h':
+ consoleSession.HandleHeartbeatIndicator(this, decoder, seq, msg) ;
+ break ;
+ case 'e':
+ consoleSession.HandleEventIndicator(this, decoder, seq) ;
+ break ;
+ case 's':
+ consoleSession.HandleSchemaResponse(this, decoder, seq) ;
+ break ;
+ case 'c':
+ consoleSession.HandleContentIndicator(this, decoder, seq, true, false) ;
+ break ;
+ case 'i':
+ consoleSession.HandleContentIndicator(this, decoder, seq, false, true) ;
+ break ;
+ case 'g':
+ consoleSession.HandleContentIndicator(this, decoder, seq, true, true) ;
+ break ;
+ default:
+ log.Error("Invalid message type recieved with opcode " + opcode) ;
+ break ;
+ }
+ }
+ lock (lockObject) {
+ outSession.MessageAccept(rangeSet) ;
+ }
+ }
+
+ public void IncrementOutstanding() {
+ lock (lockObject) {
+ this.reqsOutstanding += 1 ;
+ }
+ }
+
+ public void DecrementOutstanding() {
+ lock (lockObject) {
+ this.reqsOutstanding -= 1 ;
+ if ((reqsOutstanding == 0) & !topicBound) {
+ foreach (string key in consoleSession.BindingKeys()) {
+ //this.clientSession.ExchangeBind(topicName, "qpid.mannagement", key) ;
+ log.Debug("Setting Topic Binding " + key) ;
+ this.outSession.ExchangeBind(topicName, "qpid.management", key) ;
+ }
+ topicBound = true ;
+ }
+ if ((reqsOutstanding == 0) & syncInFlight) {
+ syncInFlight = false ;
+ Monitor.PulseAll(lockObject) ;
+ }
+ }
+ }
+
+ public void WaitForStable() {
+ lock (lockObject) {
+ if (connected) {
+ DateTime start = DateTime.Now ;
+ syncInFlight = true ;
+ while (reqsOutstanding != 0) {
+ log.Debug("Waiting to recieve messages") ;
+ Monitor.Wait(lockObject,SYNC_TIME) ;
+ TimeSpan duration = DateTime.Now - start;
+ if (duration.TotalMilliseconds > SYNC_TIME) {
+ throw new Exception("Timeout waiting for Broker to Sync") ;
+ }
+ }
+ }
+ }
+ }
+
+ public void SetSyncInFlight(bool inFlight) {
+ lock(lockObject) {
+ syncInFlight = inFlight ;
+ Monitor.PulseAll(lockObject) ;
+ }
+ }
+
+ public bool GetSyncInFlight() {
+ return syncInFlight ;
+ }
+
+ public void WaitForSync(int timeout) {
+ lock(lockObject) {
+ DateTime start = DateTime.Now ;
+ while (syncInFlight) {
+ Monitor.Wait(lockObject,timeout) ;
+ }
+ TimeSpan duration = DateTime.Now - start;
+ if (duration.TotalMilliseconds > timeout) {
+ throw new Exception("Timeout waiting for Broker to Sync") ;
+ }
+ }
+ }
+ }
+}