summaryrefslogtreecommitdiff
path: root/qpid/sandbox/models/fedsim/fedsim.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/sandbox/models/fedsim/fedsim.py')
-rw-r--r--qpid/sandbox/models/fedsim/fedsim.py434
1 files changed, 434 insertions, 0 deletions
diff --git a/qpid/sandbox/models/fedsim/fedsim.py b/qpid/sandbox/models/fedsim/fedsim.py
new file mode 100644
index 0000000000..edb6c4c8ed
--- /dev/null
+++ b/qpid/sandbox/models/fedsim/fedsim.py
@@ -0,0 +1,434 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Sim:
+ def __init__(self):
+ self.brokers = {}
+ self.clients = {}
+ self.errors = 0
+ self.warnings = 0
+
+ def error(self, text):
+ self.errors += 1
+ print "###### Error:", text
+
+ def warning(self, text):
+ self.warnings += 1
+ print "###### Warning:", text
+
+ def end(self):
+ print "========================"
+ print "Errors: %d, Warnings: %d" % (self.errors, self.warnings)
+ print "========================"
+
+ def dumpState(self):
+ print "============================"
+ print "===== Federation State ====="
+ print "============================"
+ for broker in self.brokers:
+ for exchange in self.brokers[broker].exchanges:
+ print "Exchange %s.%s" % (broker, exchange)
+ for key in self.brokers[broker].exchanges[exchange].keys:
+ print " Key %s" % key
+ for queue in self.brokers[broker].exchanges[exchange].keys[key]:
+ print " Queue %s origins=%s" % \
+ (queue.name, self.brokers[broker].exchanges[exchange].keys[key][queue].originList)
+
+ def addBroker(self, name):
+ if name in self.brokers:
+ raise Exception("Broker of same name already exists")
+ broker = Broker(self, name)
+ self.brokers[name] = broker
+ return broker
+
+ def addClient(self, name, broker):
+ if name in self.clients:
+ raise Exception("Client of same name already exists")
+ client = Client(self, name, broker)
+ self.clients[name] = client
+ return client
+
+ def link(self, left, right, bidir=True):
+ print "====== link %s to %s, bidir=%s" % (left.tag, right.tag, bidir)
+ l1 = left.createLink(right)
+ l1.bridge("amq.direct")
+ if bidir:
+ l2 = right.createLink(left)
+ l2.bridge("amq.direct")
+
+ def bind(self, client, key):
+ print "====== bind Client(%s): k=%s" % (client.name, key)
+ client.bind(key)
+
+ def unbind(self, client, key):
+ print "====== unbind Client(%s): k=%s" % (client.name, key)
+ client.unbind(key)
+
+ def sendMessage(self, key, broker, body="Message Body"):
+ print "====== sendMessage: broker=%s k=%s" % (broker.tag, key)
+ msg = Message(key, body)
+ exchange = broker.exchanges["amq.direct"]
+ for client in self.clients:
+ self.clients[client].expect(key);
+ exchange.receive(key, msg, True)
+ for client in self.clients:
+ self.clients[client].checkReception()
+
+
+class Destination:
+ def receive(self, key, msg, fromUser=False):
+ pass
+
+
+class Client(Destination):
+ def __init__(self, sim, name, broker):
+ self.sim = sim
+ self.name = name
+ self.broker = broker
+ self.broker.connect(self)
+ self.queue = self.broker.declare_queue(name)
+ self.subscription = self.broker.subscribe(self, name)
+ self.expected = None
+ self.boundKeys = []
+
+ def bind(self, key):
+ self.boundKeys.append(key)
+ self.broker.bind("amq.direct", self.name, key)
+
+ def unbind(self, key):
+ self.boundKeys.remove(key)
+ self.broker.unbind("amq.direct", self.name, key)
+
+ def receive(self, key, msg, fromUser=False):
+ print "Client(%s) received [%s]: %s" % (self.name, key, msg.body)
+ if self.expected == key:
+ self.expected = None
+ else:
+ self.sim.error("Client(%s) received unexpected message with key [%s]" % \
+ (self.name, self.expected))
+
+ def expect(self, key):
+ if key in self.boundKeys:
+ self.expected = key
+
+ def checkReception(self):
+ if self.expected:
+ self.sim.error("Client(%s) never received message with key [%s]" % \
+ (self.name, self.expected))
+
+class Broker(Client):
+ def __init__(self, sim, tag):
+ self.sim = sim
+ self.tag = tag
+ self.connections = {}
+ self.exchanges = {}
+ self.queues = {}
+ self.subscriptions = {}
+ self.links = {}
+ self.directExchange = self.declare_exchange("amq.direct")
+
+ def connect(self, client):
+ if client in self.connections:
+ raise Exception("Client already connected")
+ self.connections[client] = Connection(client)
+
+ def declare_queue(self, name, tag=None, exclude=None):
+ if name in self.queues:
+ raise Exception("Queue already exists")
+ self.queues[name] = Queue(self, name, tag, exclude)
+
+ def subscribe(self, dest, queueName):
+ if queueName not in self.queues:
+ raise Exception("Queue does not exist")
+ self.queues[queueName].setDest(dest)
+
+ def declare_exchange(self, name):
+ if name in self.exchanges:
+ return
+ exchange = Exchange(self, name)
+ self.exchanges[name] = exchange
+ return exchange
+
+ def bind(self, exchangeName, queueName, key, tagList=[], fedOp=None, origin=None):
+ if exchangeName not in self.exchanges:
+ raise Exception("Exchange not found")
+ if queueName not in self.queues:
+ raise Exception("Queue not found")
+ exchange = self.exchanges[exchangeName]
+ queue = self.queues[queueName]
+ exchange.bind(queue, key, tagList, fedOp, origin)
+
+ def unbind(self, exchangeName, queueName, key):
+ if exchangeName not in self.exchanges:
+ raise Exception("Exchange not found")
+ if queueName not in self.queues:
+ raise Exception("Queue not found")
+ exchange = self.exchanges[exchangeName]
+ queue = self.queues[queueName]
+ exchange.unbind(queue, key)
+
+ def createLink(self, other):
+ if other in self.links:
+ raise Exception("Peer broker already linked")
+ link = Link(self, other)
+ self.links[other] = link
+ return link
+
+
+class Connection:
+ def __init__(self, client):
+ self.client = client
+
+
+class Exchange(Destination):
+ def __init__(self, broker, name):
+ self.broker = broker
+ self.sim = broker.sim
+ self.name = name
+ self.keys = {}
+ self.bridges = []
+
+ def bind(self, queue, key, tagList, fedOp, origin):
+ if not fedOp: fedOp = "bind"
+ print "Exchange(%s.%s) bind q=%s, k=%s, tags=%s, op=%s, origin=%s" % \
+ (self.broker.tag, self.name, queue.name, key, tagList, fedOp, origin),
+
+ if self.broker.tag in tagList:
+ print "(tag ignored)"
+ return
+
+ if fedOp == "bind" or fedOp == "unbind":
+ if key not in self.keys:
+ self.keys[key] = {}
+ queueMap = self.keys[key]
+
+ if fedOp == "bind":
+ ##
+ ## Add local or federation binding case
+ ##
+ if queue in queueMap:
+ if origin and origin in queueMap[queue].originList:
+ print "(dup ignored)"
+ elif origin:
+ queueMap[queue].originList.append(origin)
+ print "(origin added)"
+ else:
+ binding = Binding(origin)
+ queueMap[queue] = binding
+ print "(binding added)"
+
+ elif fedOp == "unbind":
+ ##
+ ## Delete federation binding case
+ ##
+ if queue in queueMap:
+ binding = queueMap[queue]
+ if origin and origin in binding.originList:
+ binding.originList.remove(origin)
+ if len(binding.originList) == 0:
+ queueMap.pop(queue)
+ if len(queueMap) == 0:
+ self.keys.pop(key)
+ print "(last origin del)"
+ else:
+ print "(removed origin)"
+ else:
+ print "(origin not found)"
+ else:
+ print "(queue not found)"
+
+ elif fedOp == "reorigin":
+ print "(ok)"
+ self.reorigin()
+
+ elif fedOp == "hello":
+ print "(ok)"
+
+ else:
+ raise Exception("Unknown fed-opcode '%s'" % fedOp)
+
+ newTagList = []
+ newTagList.append(self.broker.tag)
+ for tag in tagList:
+ newTagList.append(tag)
+ if origin:
+ propOrigin = origin
+ else:
+ propOrigin = self.broker.tag
+
+ for bridge in self.bridges:
+ if bridge.isDynamic():
+ bridge.propagate(key, newTagList, fedOp, propOrigin)
+
+ def reorigin(self):
+ myTag = []
+ myTag.append(self.broker.tag)
+ for key in self.keys:
+ queueMap = self.keys[key]
+ found = False
+ for queue in queueMap:
+ binding = queueMap[queue]
+ if binding.isLocal():
+ found = True
+ if found:
+ for bridge in self.bridges:
+ if bridge.isDynamic():
+ bridge.propagate(key, myTag, "bind", self.broker.tag)
+
+ def unbind(self, queue, key):
+ print "Exchange(%s.%s) unbind q=%s, k=%s" % (self.broker.tag, self.name, queue.name, key),
+ if key not in self.keys:
+ print "(key not known)"
+ return
+ queueMap = self.keys[key]
+ if queue not in queueMap:
+ print "(queue not bound)"
+ return
+ queueMap.pop(queue)
+ if len(queueMap) == 0:
+ self.keys.pop(key)
+ print "(ok, remove bound-key)"
+ else:
+ print "(ok)"
+
+ count = 0
+ for queue in queueMap:
+ if len(queueMap[queue].originList) == 0:
+ count += 1
+
+ if count == 0:
+ myTag = []
+ myTag.append(self.broker.tag)
+ for bridge in self.bridges:
+ if bridge.isDynamic():
+ bridge.propagate(key, myTag, "unbind", self.broker.tag)
+
+ def receive(self, key, msg, fromUser=False):
+ sent = False
+ if key in self.keys:
+ queueMap = self.keys[key]
+ for queue in queueMap:
+ if queue.enqueue(msg):
+ sent = True
+ if not sent and not fromUser:
+ self.sim.warning("Exchange(%s.%s) received unroutable message: k=%s" % \
+ (self.broker.tag, self.name, key))
+
+ def addDynamicBridge(self, bridge):
+ if bridge in self.bridges:
+ raise Exception("Dynamic bridge already added to exchange")
+ self.bridges.append(bridge)
+
+ for b in self.bridges:
+ if b != bridge:
+ b.sendReorigin()
+ self.reorigin()
+
+class Queue:
+ def __init__(self, broker, name, tag=None, exclude=None):
+ self.broker = broker
+ self.name = name
+ self.tag = tag
+ self.exclude = exclude
+ self.dest = None
+
+ def setDest(self, dest):
+ self.dest = dest
+
+ def enqueue(self, msg):
+ print "Queue(%s.%s) rcvd k=%s, tags=%s" % (self.broker.tag, self.name, msg.key, msg.tags),
+ if self.dest == None:
+ print "(dropped, no dest)"
+ return False
+ if self.exclude and msg.tagFound(self.exclude):
+ print "(dropped, tag)"
+ return False
+ if self.tag:
+ msg.appendTag(self.tag)
+ print "(ok)"
+ self.dest.receive(msg.key, msg)
+ return True
+
+
+class Binding:
+ def __init__(self, origin):
+ self.originList = []
+ if origin:
+ self.originList.append(origin)
+
+ def isLocal(self):
+ return len(self.originList) == 0
+
+
+class Link:
+ def __init__(self, local, remote):
+ self.local = local
+ self.remote = remote
+ self.remote.connect(self)
+ self.bridges = []
+
+ def bridge(self, exchangeName):
+ bridge = Bridge(self, exchangeName)
+
+
+class Bridge:
+ def __init__(self, link, exchangeName):
+ self.link = link
+ self.exchangeName = exchangeName
+ if self.exchangeName not in link.local.exchanges:
+ raise Exception("Exchange not found")
+ self.exchange = link.local.exchanges[self.exchangeName]
+ self.queueName = "bridge." + link.local.tag
+ self.link.remote.declare_queue(self.queueName, self.link.remote.tag, self.link.local.tag)
+ self.link.remote.subscribe(self.exchange, self.queueName)
+ self.exchange.addDynamicBridge(self)
+
+ def isDynamic(self):
+ return True
+
+ def localTag(self):
+ return self.link.local.tag
+
+ def remoteTag(self):
+ return self.link.remote.tag
+
+ def propagate(self, key, tagList, fedOp, origin):
+ if self.link.remote.tag not in tagList:
+ self.link.remote.bind(self.exchangeName, self.queueName, key, tagList, fedOp, origin)
+
+ def sendReorigin(self):
+ myTag = []
+ myTag.append(self.link.local.tag)
+ self.link.remote.bind(self.exchangeName, self.queueName, "", myTag, "reorigin", "")
+
+
+class Message:
+ def __init__(self, key, body):
+ self.key = key
+ self.body = body
+ self.tags = []
+
+ def appendTag(self, tag):
+ if tag not in self.tags:
+ self.tags.append(tag)
+
+ def tagFound(self, tag):
+ return tag in self.tags
+
+