diff options
Diffstat (limited to 'trunk/qpid/python/models/fedsim/fedsim.py')
-rw-r--r-- | trunk/qpid/python/models/fedsim/fedsim.py | 434 |
1 files changed, 0 insertions, 434 deletions
diff --git a/trunk/qpid/python/models/fedsim/fedsim.py b/trunk/qpid/python/models/fedsim/fedsim.py deleted file mode 100644 index edb6c4c8ed..0000000000 --- a/trunk/qpid/python/models/fedsim/fedsim.py +++ /dev/null @@ -1,434 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -class Sim: - def __init__(self): - self.brokers = {} - self.clients = {} - self.errors = 0 - self.warnings = 0 - - def error(self, text): - self.errors += 1 - print "###### Error:", text - - def warning(self, text): - self.warnings += 1 - print "###### Warning:", text - - def end(self): - print "========================" - print "Errors: %d, Warnings: %d" % (self.errors, self.warnings) - print "========================" - - def dumpState(self): - print "============================" - print "===== Federation State =====" - print "============================" - for broker in self.brokers: - for exchange in self.brokers[broker].exchanges: - print "Exchange %s.%s" % (broker, exchange) - for key in self.brokers[broker].exchanges[exchange].keys: - print " Key %s" % key - for queue in self.brokers[broker].exchanges[exchange].keys[key]: - print " Queue %s origins=%s" % \ - (queue.name, self.brokers[broker].exchanges[exchange].keys[key][queue].originList) - - def addBroker(self, name): - if name in self.brokers: - raise Exception("Broker of same name already exists") - broker = Broker(self, name) - self.brokers[name] = broker - return broker - - def addClient(self, name, broker): - if name in self.clients: - raise Exception("Client of same name already exists") - client = Client(self, name, broker) - self.clients[name] = client - return client - - def link(self, left, right, bidir=True): - print "====== link %s to %s, bidir=%s" % (left.tag, right.tag, bidir) - l1 = left.createLink(right) - l1.bridge("amq.direct") - if bidir: - l2 = right.createLink(left) - l2.bridge("amq.direct") - - def bind(self, client, key): - print "====== bind Client(%s): k=%s" % (client.name, key) - client.bind(key) - - def unbind(self, client, key): - print "====== unbind Client(%s): k=%s" % (client.name, key) - client.unbind(key) - - def sendMessage(self, key, broker, body="Message Body"): - print "====== sendMessage: broker=%s k=%s" % (broker.tag, key) - msg = Message(key, body) - exchange = broker.exchanges["amq.direct"] - for client in self.clients: - self.clients[client].expect(key); - exchange.receive(key, msg, True) - for client in self.clients: - self.clients[client].checkReception() - - -class Destination: - def receive(self, key, msg, fromUser=False): - pass - - -class Client(Destination): - def __init__(self, sim, name, broker): - self.sim = sim - self.name = name - self.broker = broker - self.broker.connect(self) - self.queue = self.broker.declare_queue(name) - self.subscription = self.broker.subscribe(self, name) - self.expected = None - self.boundKeys = [] - - def bind(self, key): - self.boundKeys.append(key) - self.broker.bind("amq.direct", self.name, key) - - def unbind(self, key): - self.boundKeys.remove(key) - self.broker.unbind("amq.direct", self.name, key) - - def receive(self, key, msg, fromUser=False): - print "Client(%s) received [%s]: %s" % (self.name, key, msg.body) - if self.expected == key: - self.expected = None - else: - self.sim.error("Client(%s) received unexpected message with key [%s]" % \ - (self.name, self.expected)) - - def expect(self, key): - if key in self.boundKeys: - self.expected = key - - def checkReception(self): - if self.expected: - self.sim.error("Client(%s) never received message with key [%s]" % \ - (self.name, self.expected)) - -class Broker(Client): - def __init__(self, sim, tag): - self.sim = sim - self.tag = tag - self.connections = {} - self.exchanges = {} - self.queues = {} - self.subscriptions = {} - self.links = {} - self.directExchange = self.declare_exchange("amq.direct") - - def connect(self, client): - if client in self.connections: - raise Exception("Client already connected") - self.connections[client] = Connection(client) - - def declare_queue(self, name, tag=None, exclude=None): - if name in self.queues: - raise Exception("Queue already exists") - self.queues[name] = Queue(self, name, tag, exclude) - - def subscribe(self, dest, queueName): - if queueName not in self.queues: - raise Exception("Queue does not exist") - self.queues[queueName].setDest(dest) - - def declare_exchange(self, name): - if name in self.exchanges: - return - exchange = Exchange(self, name) - self.exchanges[name] = exchange - return exchange - - def bind(self, exchangeName, queueName, key, tagList=[], fedOp=None, origin=None): - if exchangeName not in self.exchanges: - raise Exception("Exchange not found") - if queueName not in self.queues: - raise Exception("Queue not found") - exchange = self.exchanges[exchangeName] - queue = self.queues[queueName] - exchange.bind(queue, key, tagList, fedOp, origin) - - def unbind(self, exchangeName, queueName, key): - if exchangeName not in self.exchanges: - raise Exception("Exchange not found") - if queueName not in self.queues: - raise Exception("Queue not found") - exchange = self.exchanges[exchangeName] - queue = self.queues[queueName] - exchange.unbind(queue, key) - - def createLink(self, other): - if other in self.links: - raise Exception("Peer broker already linked") - link = Link(self, other) - self.links[other] = link - return link - - -class Connection: - def __init__(self, client): - self.client = client - - -class Exchange(Destination): - def __init__(self, broker, name): - self.broker = broker - self.sim = broker.sim - self.name = name - self.keys = {} - self.bridges = [] - - def bind(self, queue, key, tagList, fedOp, origin): - if not fedOp: fedOp = "bind" - print "Exchange(%s.%s) bind q=%s, k=%s, tags=%s, op=%s, origin=%s" % \ - (self.broker.tag, self.name, queue.name, key, tagList, fedOp, origin), - - if self.broker.tag in tagList: - print "(tag ignored)" - return - - if fedOp == "bind" or fedOp == "unbind": - if key not in self.keys: - self.keys[key] = {} - queueMap = self.keys[key] - - if fedOp == "bind": - ## - ## Add local or federation binding case - ## - if queue in queueMap: - if origin and origin in queueMap[queue].originList: - print "(dup ignored)" - elif origin: - queueMap[queue].originList.append(origin) - print "(origin added)" - else: - binding = Binding(origin) - queueMap[queue] = binding - print "(binding added)" - - elif fedOp == "unbind": - ## - ## Delete federation binding case - ## - if queue in queueMap: - binding = queueMap[queue] - if origin and origin in binding.originList: - binding.originList.remove(origin) - if len(binding.originList) == 0: - queueMap.pop(queue) - if len(queueMap) == 0: - self.keys.pop(key) - print "(last origin del)" - else: - print "(removed origin)" - else: - print "(origin not found)" - else: - print "(queue not found)" - - elif fedOp == "reorigin": - print "(ok)" - self.reorigin() - - elif fedOp == "hello": - print "(ok)" - - else: - raise Exception("Unknown fed-opcode '%s'" % fedOp) - - newTagList = [] - newTagList.append(self.broker.tag) - for tag in tagList: - newTagList.append(tag) - if origin: - propOrigin = origin - else: - propOrigin = self.broker.tag - - for bridge in self.bridges: - if bridge.isDynamic(): - bridge.propagate(key, newTagList, fedOp, propOrigin) - - def reorigin(self): - myTag = [] - myTag.append(self.broker.tag) - for key in self.keys: - queueMap = self.keys[key] - found = False - for queue in queueMap: - binding = queueMap[queue] - if binding.isLocal(): - found = True - if found: - for bridge in self.bridges: - if bridge.isDynamic(): - bridge.propagate(key, myTag, "bind", self.broker.tag) - - def unbind(self, queue, key): - print "Exchange(%s.%s) unbind q=%s, k=%s" % (self.broker.tag, self.name, queue.name, key), - if key not in self.keys: - print "(key not known)" - return - queueMap = self.keys[key] - if queue not in queueMap: - print "(queue not bound)" - return - queueMap.pop(queue) - if len(queueMap) == 0: - self.keys.pop(key) - print "(ok, remove bound-key)" - else: - print "(ok)" - - count = 0 - for queue in queueMap: - if len(queueMap[queue].originList) == 0: - count += 1 - - if count == 0: - myTag = [] - myTag.append(self.broker.tag) - for bridge in self.bridges: - if bridge.isDynamic(): - bridge.propagate(key, myTag, "unbind", self.broker.tag) - - def receive(self, key, msg, fromUser=False): - sent = False - if key in self.keys: - queueMap = self.keys[key] - for queue in queueMap: - if queue.enqueue(msg): - sent = True - if not sent and not fromUser: - self.sim.warning("Exchange(%s.%s) received unroutable message: k=%s" % \ - (self.broker.tag, self.name, key)) - - def addDynamicBridge(self, bridge): - if bridge in self.bridges: - raise Exception("Dynamic bridge already added to exchange") - self.bridges.append(bridge) - - for b in self.bridges: - if b != bridge: - b.sendReorigin() - self.reorigin() - -class Queue: - def __init__(self, broker, name, tag=None, exclude=None): - self.broker = broker - self.name = name - self.tag = tag - self.exclude = exclude - self.dest = None - - def setDest(self, dest): - self.dest = dest - - def enqueue(self, msg): - print "Queue(%s.%s) rcvd k=%s, tags=%s" % (self.broker.tag, self.name, msg.key, msg.tags), - if self.dest == None: - print "(dropped, no dest)" - return False - if self.exclude and msg.tagFound(self.exclude): - print "(dropped, tag)" - return False - if self.tag: - msg.appendTag(self.tag) - print "(ok)" - self.dest.receive(msg.key, msg) - return True - - -class Binding: - def __init__(self, origin): - self.originList = [] - if origin: - self.originList.append(origin) - - def isLocal(self): - return len(self.originList) == 0 - - -class Link: - def __init__(self, local, remote): - self.local = local - self.remote = remote - self.remote.connect(self) - self.bridges = [] - - def bridge(self, exchangeName): - bridge = Bridge(self, exchangeName) - - -class Bridge: - def __init__(self, link, exchangeName): - self.link = link - self.exchangeName = exchangeName - if self.exchangeName not in link.local.exchanges: - raise Exception("Exchange not found") - self.exchange = link.local.exchanges[self.exchangeName] - self.queueName = "bridge." + link.local.tag - self.link.remote.declare_queue(self.queueName, self.link.remote.tag, self.link.local.tag) - self.link.remote.subscribe(self.exchange, self.queueName) - self.exchange.addDynamicBridge(self) - - def isDynamic(self): - return True - - def localTag(self): - return self.link.local.tag - - def remoteTag(self): - return self.link.remote.tag - - def propagate(self, key, tagList, fedOp, origin): - if self.link.remote.tag not in tagList: - self.link.remote.bind(self.exchangeName, self.queueName, key, tagList, fedOp, origin) - - def sendReorigin(self): - myTag = [] - myTag.append(self.link.local.tag) - self.link.remote.bind(self.exchangeName, self.queueName, "", myTag, "reorigin", "") - - -class Message: - def __init__(self, key, body): - self.key = key - self.body = body - self.tags = [] - - def appendTag(self, tag): - if tag not in self.tags: - self.tags.append(tag) - - def tagFound(self, tag): - return tag in self.tags - - |