diff options
author | Robert Gemmell <robbie@apache.org> | 2015-06-25 10:22:51 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2015-06-25 10:22:51 +0000 |
commit | 32ae758bc2e8fd962b66a4ab6341b14009f1907e (patch) | |
tree | 2f4d8174813284a6ea58bb6b7f6520aa92287476 /qpid/sandbox/models/fedsim/fedsim.py | |
parent | 116d91ad7825a98af36a869fc751206fbce0c59f (diff) | |
parent | f7e896076143de4572b4f1f67ef0765125f2498d (diff) | |
download | qpid-python-32ae758bc2e8fd962b66a4ab6341b14009f1907e.tar.gz |
NO-JIRA: create branch for qpid-cpp 0.34 RC process
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-cpp-0.34-rc@1687469 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/sandbox/models/fedsim/fedsim.py')
-rw-r--r-- | qpid/sandbox/models/fedsim/fedsim.py | 434 |
1 files changed, 434 insertions, 0 deletions
diff --git a/qpid/sandbox/models/fedsim/fedsim.py b/qpid/sandbox/models/fedsim/fedsim.py new file mode 100644 index 0000000000..edb6c4c8ed --- /dev/null +++ b/qpid/sandbox/models/fedsim/fedsim.py @@ -0,0 +1,434 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Sim: + def __init__(self): + self.brokers = {} + self.clients = {} + self.errors = 0 + self.warnings = 0 + + def error(self, text): + self.errors += 1 + print "###### Error:", text + + def warning(self, text): + self.warnings += 1 + print "###### Warning:", text + + def end(self): + print "========================" + print "Errors: %d, Warnings: %d" % (self.errors, self.warnings) + print "========================" + + def dumpState(self): + print "============================" + print "===== Federation State =====" + print "============================" + for broker in self.brokers: + for exchange in self.brokers[broker].exchanges: + print "Exchange %s.%s" % (broker, exchange) + for key in self.brokers[broker].exchanges[exchange].keys: + print " Key %s" % key + for queue in self.brokers[broker].exchanges[exchange].keys[key]: + print " Queue %s origins=%s" % \ + (queue.name, self.brokers[broker].exchanges[exchange].keys[key][queue].originList) + + def addBroker(self, name): + if name in self.brokers: + raise Exception("Broker of same name already exists") + broker = Broker(self, name) + self.brokers[name] = broker + return broker + + def addClient(self, name, broker): + if name in self.clients: + raise Exception("Client of same name already exists") + client = Client(self, name, broker) + self.clients[name] = client + return client + + def link(self, left, right, bidir=True): + print "====== link %s to %s, bidir=%s" % (left.tag, right.tag, bidir) + l1 = left.createLink(right) + l1.bridge("amq.direct") + if bidir: + l2 = right.createLink(left) + l2.bridge("amq.direct") + + def bind(self, client, key): + print "====== bind Client(%s): k=%s" % (client.name, key) + client.bind(key) + + def unbind(self, client, key): + print "====== unbind Client(%s): k=%s" % (client.name, key) + client.unbind(key) + + def sendMessage(self, key, broker, body="Message Body"): + print "====== sendMessage: broker=%s k=%s" % (broker.tag, key) + msg = Message(key, body) + exchange = broker.exchanges["amq.direct"] + for client in self.clients: + self.clients[client].expect(key); + exchange.receive(key, msg, True) + for client in self.clients: + self.clients[client].checkReception() + + +class Destination: + def receive(self, key, msg, fromUser=False): + pass + + +class Client(Destination): + def __init__(self, sim, name, broker): + self.sim = sim + self.name = name + self.broker = broker + self.broker.connect(self) + self.queue = self.broker.declare_queue(name) + self.subscription = self.broker.subscribe(self, name) + self.expected = None + self.boundKeys = [] + + def bind(self, key): + self.boundKeys.append(key) + self.broker.bind("amq.direct", self.name, key) + + def unbind(self, key): + self.boundKeys.remove(key) + self.broker.unbind("amq.direct", self.name, key) + + def receive(self, key, msg, fromUser=False): + print "Client(%s) received [%s]: %s" % (self.name, key, msg.body) + if self.expected == key: + self.expected = None + else: + self.sim.error("Client(%s) received unexpected message with key [%s]" % \ + (self.name, self.expected)) + + def expect(self, key): + if key in self.boundKeys: + self.expected = key + + def checkReception(self): + if self.expected: + self.sim.error("Client(%s) never received message with key [%s]" % \ + (self.name, self.expected)) + +class Broker(Client): + def __init__(self, sim, tag): + self.sim = sim + self.tag = tag + self.connections = {} + self.exchanges = {} + self.queues = {} + self.subscriptions = {} + self.links = {} + self.directExchange = self.declare_exchange("amq.direct") + + def connect(self, client): + if client in self.connections: + raise Exception("Client already connected") + self.connections[client] = Connection(client) + + def declare_queue(self, name, tag=None, exclude=None): + if name in self.queues: + raise Exception("Queue already exists") + self.queues[name] = Queue(self, name, tag, exclude) + + def subscribe(self, dest, queueName): + if queueName not in self.queues: + raise Exception("Queue does not exist") + self.queues[queueName].setDest(dest) + + def declare_exchange(self, name): + if name in self.exchanges: + return + exchange = Exchange(self, name) + self.exchanges[name] = exchange + return exchange + + def bind(self, exchangeName, queueName, key, tagList=[], fedOp=None, origin=None): + if exchangeName not in self.exchanges: + raise Exception("Exchange not found") + if queueName not in self.queues: + raise Exception("Queue not found") + exchange = self.exchanges[exchangeName] + queue = self.queues[queueName] + exchange.bind(queue, key, tagList, fedOp, origin) + + def unbind(self, exchangeName, queueName, key): + if exchangeName not in self.exchanges: + raise Exception("Exchange not found") + if queueName not in self.queues: + raise Exception("Queue not found") + exchange = self.exchanges[exchangeName] + queue = self.queues[queueName] + exchange.unbind(queue, key) + + def createLink(self, other): + if other in self.links: + raise Exception("Peer broker already linked") + link = Link(self, other) + self.links[other] = link + return link + + +class Connection: + def __init__(self, client): + self.client = client + + +class Exchange(Destination): + def __init__(self, broker, name): + self.broker = broker + self.sim = broker.sim + self.name = name + self.keys = {} + self.bridges = [] + + def bind(self, queue, key, tagList, fedOp, origin): + if not fedOp: fedOp = "bind" + print "Exchange(%s.%s) bind q=%s, k=%s, tags=%s, op=%s, origin=%s" % \ + (self.broker.tag, self.name, queue.name, key, tagList, fedOp, origin), + + if self.broker.tag in tagList: + print "(tag ignored)" + return + + if fedOp == "bind" or fedOp == "unbind": + if key not in self.keys: + self.keys[key] = {} + queueMap = self.keys[key] + + if fedOp == "bind": + ## + ## Add local or federation binding case + ## + if queue in queueMap: + if origin and origin in queueMap[queue].originList: + print "(dup ignored)" + elif origin: + queueMap[queue].originList.append(origin) + print "(origin added)" + else: + binding = Binding(origin) + queueMap[queue] = binding + print "(binding added)" + + elif fedOp == "unbind": + ## + ## Delete federation binding case + ## + if queue in queueMap: + binding = queueMap[queue] + if origin and origin in binding.originList: + binding.originList.remove(origin) + if len(binding.originList) == 0: + queueMap.pop(queue) + if len(queueMap) == 0: + self.keys.pop(key) + print "(last origin del)" + else: + print "(removed origin)" + else: + print "(origin not found)" + else: + print "(queue not found)" + + elif fedOp == "reorigin": + print "(ok)" + self.reorigin() + + elif fedOp == "hello": + print "(ok)" + + else: + raise Exception("Unknown fed-opcode '%s'" % fedOp) + + newTagList = [] + newTagList.append(self.broker.tag) + for tag in tagList: + newTagList.append(tag) + if origin: + propOrigin = origin + else: + propOrigin = self.broker.tag + + for bridge in self.bridges: + if bridge.isDynamic(): + bridge.propagate(key, newTagList, fedOp, propOrigin) + + def reorigin(self): + myTag = [] + myTag.append(self.broker.tag) + for key in self.keys: + queueMap = self.keys[key] + found = False + for queue in queueMap: + binding = queueMap[queue] + if binding.isLocal(): + found = True + if found: + for bridge in self.bridges: + if bridge.isDynamic(): + bridge.propagate(key, myTag, "bind", self.broker.tag) + + def unbind(self, queue, key): + print "Exchange(%s.%s) unbind q=%s, k=%s" % (self.broker.tag, self.name, queue.name, key), + if key not in self.keys: + print "(key not known)" + return + queueMap = self.keys[key] + if queue not in queueMap: + print "(queue not bound)" + return + queueMap.pop(queue) + if len(queueMap) == 0: + self.keys.pop(key) + print "(ok, remove bound-key)" + else: + print "(ok)" + + count = 0 + for queue in queueMap: + if len(queueMap[queue].originList) == 0: + count += 1 + + if count == 0: + myTag = [] + myTag.append(self.broker.tag) + for bridge in self.bridges: + if bridge.isDynamic(): + bridge.propagate(key, myTag, "unbind", self.broker.tag) + + def receive(self, key, msg, fromUser=False): + sent = False + if key in self.keys: + queueMap = self.keys[key] + for queue in queueMap: + if queue.enqueue(msg): + sent = True + if not sent and not fromUser: + self.sim.warning("Exchange(%s.%s) received unroutable message: k=%s" % \ + (self.broker.tag, self.name, key)) + + def addDynamicBridge(self, bridge): + if bridge in self.bridges: + raise Exception("Dynamic bridge already added to exchange") + self.bridges.append(bridge) + + for b in self.bridges: + if b != bridge: + b.sendReorigin() + self.reorigin() + +class Queue: + def __init__(self, broker, name, tag=None, exclude=None): + self.broker = broker + self.name = name + self.tag = tag + self.exclude = exclude + self.dest = None + + def setDest(self, dest): + self.dest = dest + + def enqueue(self, msg): + print "Queue(%s.%s) rcvd k=%s, tags=%s" % (self.broker.tag, self.name, msg.key, msg.tags), + if self.dest == None: + print "(dropped, no dest)" + return False + if self.exclude and msg.tagFound(self.exclude): + print "(dropped, tag)" + return False + if self.tag: + msg.appendTag(self.tag) + print "(ok)" + self.dest.receive(msg.key, msg) + return True + + +class Binding: + def __init__(self, origin): + self.originList = [] + if origin: + self.originList.append(origin) + + def isLocal(self): + return len(self.originList) == 0 + + +class Link: + def __init__(self, local, remote): + self.local = local + self.remote = remote + self.remote.connect(self) + self.bridges = [] + + def bridge(self, exchangeName): + bridge = Bridge(self, exchangeName) + + +class Bridge: + def __init__(self, link, exchangeName): + self.link = link + self.exchangeName = exchangeName + if self.exchangeName not in link.local.exchanges: + raise Exception("Exchange not found") + self.exchange = link.local.exchanges[self.exchangeName] + self.queueName = "bridge." + link.local.tag + self.link.remote.declare_queue(self.queueName, self.link.remote.tag, self.link.local.tag) + self.link.remote.subscribe(self.exchange, self.queueName) + self.exchange.addDynamicBridge(self) + + def isDynamic(self): + return True + + def localTag(self): + return self.link.local.tag + + def remoteTag(self): + return self.link.remote.tag + + def propagate(self, key, tagList, fedOp, origin): + if self.link.remote.tag not in tagList: + self.link.remote.bind(self.exchangeName, self.queueName, key, tagList, fedOp, origin) + + def sendReorigin(self): + myTag = [] + myTag.append(self.link.local.tag) + self.link.remote.bind(self.exchangeName, self.queueName, "", myTag, "reorigin", "") + + +class Message: + def __init__(self, key, body): + self.key = key + self.body = body + self.tags = [] + + def appendTag(self, tag): + if tag not in self.tags: + self.tags.append(tag) + + def tagFound(self, tag): + return tag in self.tags + + |