summaryrefslogtreecommitdiff
path: root/trunk/qpid/python/models/fedsim/fedsim.py
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/python/models/fedsim/fedsim.py')
-rw-r--r--trunk/qpid/python/models/fedsim/fedsim.py434
1 files changed, 0 insertions, 434 deletions
diff --git a/trunk/qpid/python/models/fedsim/fedsim.py b/trunk/qpid/python/models/fedsim/fedsim.py
deleted file mode 100644
index edb6c4c8ed..0000000000
--- a/trunk/qpid/python/models/fedsim/fedsim.py
+++ /dev/null
@@ -1,434 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-class Sim:
- def __init__(self):
- self.brokers = {}
- self.clients = {}
- self.errors = 0
- self.warnings = 0
-
- def error(self, text):
- self.errors += 1
- print "###### Error:", text
-
- def warning(self, text):
- self.warnings += 1
- print "###### Warning:", text
-
- def end(self):
- print "========================"
- print "Errors: %d, Warnings: %d" % (self.errors, self.warnings)
- print "========================"
-
- def dumpState(self):
- print "============================"
- print "===== Federation State ====="
- print "============================"
- for broker in self.brokers:
- for exchange in self.brokers[broker].exchanges:
- print "Exchange %s.%s" % (broker, exchange)
- for key in self.brokers[broker].exchanges[exchange].keys:
- print " Key %s" % key
- for queue in self.brokers[broker].exchanges[exchange].keys[key]:
- print " Queue %s origins=%s" % \
- (queue.name, self.brokers[broker].exchanges[exchange].keys[key][queue].originList)
-
- def addBroker(self, name):
- if name in self.brokers:
- raise Exception("Broker of same name already exists")
- broker = Broker(self, name)
- self.brokers[name] = broker
- return broker
-
- def addClient(self, name, broker):
- if name in self.clients:
- raise Exception("Client of same name already exists")
- client = Client(self, name, broker)
- self.clients[name] = client
- return client
-
- def link(self, left, right, bidir=True):
- print "====== link %s to %s, bidir=%s" % (left.tag, right.tag, bidir)
- l1 = left.createLink(right)
- l1.bridge("amq.direct")
- if bidir:
- l2 = right.createLink(left)
- l2.bridge("amq.direct")
-
- def bind(self, client, key):
- print "====== bind Client(%s): k=%s" % (client.name, key)
- client.bind(key)
-
- def unbind(self, client, key):
- print "====== unbind Client(%s): k=%s" % (client.name, key)
- client.unbind(key)
-
- def sendMessage(self, key, broker, body="Message Body"):
- print "====== sendMessage: broker=%s k=%s" % (broker.tag, key)
- msg = Message(key, body)
- exchange = broker.exchanges["amq.direct"]
- for client in self.clients:
- self.clients[client].expect(key);
- exchange.receive(key, msg, True)
- for client in self.clients:
- self.clients[client].checkReception()
-
-
-class Destination:
- def receive(self, key, msg, fromUser=False):
- pass
-
-
-class Client(Destination):
- def __init__(self, sim, name, broker):
- self.sim = sim
- self.name = name
- self.broker = broker
- self.broker.connect(self)
- self.queue = self.broker.declare_queue(name)
- self.subscription = self.broker.subscribe(self, name)
- self.expected = None
- self.boundKeys = []
-
- def bind(self, key):
- self.boundKeys.append(key)
- self.broker.bind("amq.direct", self.name, key)
-
- def unbind(self, key):
- self.boundKeys.remove(key)
- self.broker.unbind("amq.direct", self.name, key)
-
- def receive(self, key, msg, fromUser=False):
- print "Client(%s) received [%s]: %s" % (self.name, key, msg.body)
- if self.expected == key:
- self.expected = None
- else:
- self.sim.error("Client(%s) received unexpected message with key [%s]" % \
- (self.name, self.expected))
-
- def expect(self, key):
- if key in self.boundKeys:
- self.expected = key
-
- def checkReception(self):
- if self.expected:
- self.sim.error("Client(%s) never received message with key [%s]" % \
- (self.name, self.expected))
-
-class Broker(Client):
- def __init__(self, sim, tag):
- self.sim = sim
- self.tag = tag
- self.connections = {}
- self.exchanges = {}
- self.queues = {}
- self.subscriptions = {}
- self.links = {}
- self.directExchange = self.declare_exchange("amq.direct")
-
- def connect(self, client):
- if client in self.connections:
- raise Exception("Client already connected")
- self.connections[client] = Connection(client)
-
- def declare_queue(self, name, tag=None, exclude=None):
- if name in self.queues:
- raise Exception("Queue already exists")
- self.queues[name] = Queue(self, name, tag, exclude)
-
- def subscribe(self, dest, queueName):
- if queueName not in self.queues:
- raise Exception("Queue does not exist")
- self.queues[queueName].setDest(dest)
-
- def declare_exchange(self, name):
- if name in self.exchanges:
- return
- exchange = Exchange(self, name)
- self.exchanges[name] = exchange
- return exchange
-
- def bind(self, exchangeName, queueName, key, tagList=[], fedOp=None, origin=None):
- if exchangeName not in self.exchanges:
- raise Exception("Exchange not found")
- if queueName not in self.queues:
- raise Exception("Queue not found")
- exchange = self.exchanges[exchangeName]
- queue = self.queues[queueName]
- exchange.bind(queue, key, tagList, fedOp, origin)
-
- def unbind(self, exchangeName, queueName, key):
- if exchangeName not in self.exchanges:
- raise Exception("Exchange not found")
- if queueName not in self.queues:
- raise Exception("Queue not found")
- exchange = self.exchanges[exchangeName]
- queue = self.queues[queueName]
- exchange.unbind(queue, key)
-
- def createLink(self, other):
- if other in self.links:
- raise Exception("Peer broker already linked")
- link = Link(self, other)
- self.links[other] = link
- return link
-
-
-class Connection:
- def __init__(self, client):
- self.client = client
-
-
-class Exchange(Destination):
- def __init__(self, broker, name):
- self.broker = broker
- self.sim = broker.sim
- self.name = name
- self.keys = {}
- self.bridges = []
-
- def bind(self, queue, key, tagList, fedOp, origin):
- if not fedOp: fedOp = "bind"
- print "Exchange(%s.%s) bind q=%s, k=%s, tags=%s, op=%s, origin=%s" % \
- (self.broker.tag, self.name, queue.name, key, tagList, fedOp, origin),
-
- if self.broker.tag in tagList:
- print "(tag ignored)"
- return
-
- if fedOp == "bind" or fedOp == "unbind":
- if key not in self.keys:
- self.keys[key] = {}
- queueMap = self.keys[key]
-
- if fedOp == "bind":
- ##
- ## Add local or federation binding case
- ##
- if queue in queueMap:
- if origin and origin in queueMap[queue].originList:
- print "(dup ignored)"
- elif origin:
- queueMap[queue].originList.append(origin)
- print "(origin added)"
- else:
- binding = Binding(origin)
- queueMap[queue] = binding
- print "(binding added)"
-
- elif fedOp == "unbind":
- ##
- ## Delete federation binding case
- ##
- if queue in queueMap:
- binding = queueMap[queue]
- if origin and origin in binding.originList:
- binding.originList.remove(origin)
- if len(binding.originList) == 0:
- queueMap.pop(queue)
- if len(queueMap) == 0:
- self.keys.pop(key)
- print "(last origin del)"
- else:
- print "(removed origin)"
- else:
- print "(origin not found)"
- else:
- print "(queue not found)"
-
- elif fedOp == "reorigin":
- print "(ok)"
- self.reorigin()
-
- elif fedOp == "hello":
- print "(ok)"
-
- else:
- raise Exception("Unknown fed-opcode '%s'" % fedOp)
-
- newTagList = []
- newTagList.append(self.broker.tag)
- for tag in tagList:
- newTagList.append(tag)
- if origin:
- propOrigin = origin
- else:
- propOrigin = self.broker.tag
-
- for bridge in self.bridges:
- if bridge.isDynamic():
- bridge.propagate(key, newTagList, fedOp, propOrigin)
-
- def reorigin(self):
- myTag = []
- myTag.append(self.broker.tag)
- for key in self.keys:
- queueMap = self.keys[key]
- found = False
- for queue in queueMap:
- binding = queueMap[queue]
- if binding.isLocal():
- found = True
- if found:
- for bridge in self.bridges:
- if bridge.isDynamic():
- bridge.propagate(key, myTag, "bind", self.broker.tag)
-
- def unbind(self, queue, key):
- print "Exchange(%s.%s) unbind q=%s, k=%s" % (self.broker.tag, self.name, queue.name, key),
- if key not in self.keys:
- print "(key not known)"
- return
- queueMap = self.keys[key]
- if queue not in queueMap:
- print "(queue not bound)"
- return
- queueMap.pop(queue)
- if len(queueMap) == 0:
- self.keys.pop(key)
- print "(ok, remove bound-key)"
- else:
- print "(ok)"
-
- count = 0
- for queue in queueMap:
- if len(queueMap[queue].originList) == 0:
- count += 1
-
- if count == 0:
- myTag = []
- myTag.append(self.broker.tag)
- for bridge in self.bridges:
- if bridge.isDynamic():
- bridge.propagate(key, myTag, "unbind", self.broker.tag)
-
- def receive(self, key, msg, fromUser=False):
- sent = False
- if key in self.keys:
- queueMap = self.keys[key]
- for queue in queueMap:
- if queue.enqueue(msg):
- sent = True
- if not sent and not fromUser:
- self.sim.warning("Exchange(%s.%s) received unroutable message: k=%s" % \
- (self.broker.tag, self.name, key))
-
- def addDynamicBridge(self, bridge):
- if bridge in self.bridges:
- raise Exception("Dynamic bridge already added to exchange")
- self.bridges.append(bridge)
-
- for b in self.bridges:
- if b != bridge:
- b.sendReorigin()
- self.reorigin()
-
-class Queue:
- def __init__(self, broker, name, tag=None, exclude=None):
- self.broker = broker
- self.name = name
- self.tag = tag
- self.exclude = exclude
- self.dest = None
-
- def setDest(self, dest):
- self.dest = dest
-
- def enqueue(self, msg):
- print "Queue(%s.%s) rcvd k=%s, tags=%s" % (self.broker.tag, self.name, msg.key, msg.tags),
- if self.dest == None:
- print "(dropped, no dest)"
- return False
- if self.exclude and msg.tagFound(self.exclude):
- print "(dropped, tag)"
- return False
- if self.tag:
- msg.appendTag(self.tag)
- print "(ok)"
- self.dest.receive(msg.key, msg)
- return True
-
-
-class Binding:
- def __init__(self, origin):
- self.originList = []
- if origin:
- self.originList.append(origin)
-
- def isLocal(self):
- return len(self.originList) == 0
-
-
-class Link:
- def __init__(self, local, remote):
- self.local = local
- self.remote = remote
- self.remote.connect(self)
- self.bridges = []
-
- def bridge(self, exchangeName):
- bridge = Bridge(self, exchangeName)
-
-
-class Bridge:
- def __init__(self, link, exchangeName):
- self.link = link
- self.exchangeName = exchangeName
- if self.exchangeName not in link.local.exchanges:
- raise Exception("Exchange not found")
- self.exchange = link.local.exchanges[self.exchangeName]
- self.queueName = "bridge." + link.local.tag
- self.link.remote.declare_queue(self.queueName, self.link.remote.tag, self.link.local.tag)
- self.link.remote.subscribe(self.exchange, self.queueName)
- self.exchange.addDynamicBridge(self)
-
- def isDynamic(self):
- return True
-
- def localTag(self):
- return self.link.local.tag
-
- def remoteTag(self):
- return self.link.remote.tag
-
- def propagate(self, key, tagList, fedOp, origin):
- if self.link.remote.tag not in tagList:
- self.link.remote.bind(self.exchangeName, self.queueName, key, tagList, fedOp, origin)
-
- def sendReorigin(self):
- myTag = []
- myTag.append(self.link.local.tag)
- self.link.remote.bind(self.exchangeName, self.queueName, "", myTag, "reorigin", "")
-
-
-class Message:
- def __init__(self, key, body):
- self.key = key
- self.body = body
- self.tags = []
-
- def appendTag(self, tag):
- if tag not in self.tags:
- self.tags.append(tag)
-
- def tagFound(self, tag):
- return tag in self.tags
-
-