#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""Minimal server stub driven by the AMQP 0-10 preview spec.

Each incoming connection gets its own ``Server`` delegate.  The delegate
records queue declarations and bindings in memory, prints what it
receives, and acknowledges commands; nothing in this file routes or
stores message content.
"""

import qpid
from qpid.connection import Connection, listen
from qpid.delegate import Delegate
from qpid.peer import Peer
from qpid import Struct


class Server(Delegate):
    """Delegate whose handlers answer the commands this file knows about.

    Attributes:
        queues: maps a declared queue name to a list (created empty; no
            handler in this file appends to it).
        bindings: maps an exchange name to a set of
            ``(routing_key, queue)`` tuples.
    """

    def __init__(self):
        Delegate.__init__(self)
        self.queues = {}
        self.bindings = {}

    def connection_open(self, ch, msg):
        """Reply to the open request with ``open_ok``."""
        msg.open_ok()

    def session_open(self, ch, msg):
        """Log the channel id and reply with ``attached``."""
        print("session open on channel %s" % ch.id)
        msg.attached()

    def execution_flush(self, ch, msg):
        """Accept the command without doing anything."""
        pass

    def queue_declare(self, ch, msg):
        """Register ``msg.queue`` with an empty list and complete the command.

        Re-declaring an existing name replaces its list with a new empty one.
        """
        self.queues[msg.queue] = []
        print("queue declared: %s" % msg.queue)
        msg.complete()

    def queue_bind(self, ch, msg):
        """Record a ``(routing_key, queue)`` binding under ``msg.exchange``."""
        # setdefault creates the per-exchange set on first use; this replaces
        # the deprecated dict.has_key() check with identical behaviour.
        bound = self.bindings.setdefault(msg.exchange, set())
        bound.add((msg.routing_key, msg.queue))
        msg.complete()

    def queue_query(self, ch, msg):
        """Send back a ``Struct`` built from ``msg.method.result``.

        The struct is passed to ``ch.execution_result`` together with the
        command id, and then the command is completed.
        """
        st = Struct(msg.method.result)
        ch.execution_result(msg.command_id, st)
        msg.complete()

    def message_subscribe(self, ch, msg):
        """Print the subscribe command and complete it."""
        print(msg)
        msg.complete()

    def message_transfer(self, ch, msg):
        """Print the transferred content and complete the command."""
        print(msg.content)
        msg.complete()


# The spec path is relative, so it is resolved against the current working
# directory of the process.
spec = qpid.spec.load("../specs/amqp.0-10-preview.xml")

# Handle every io object produced by listen(); each one gets its own
# Connection, Peer and Server delegate.
for io in listen("0.0.0.0", 5672):
    c = Connection(io, spec)
    p = Peer(c, Server())
    # NOTE(review): `tini` is the method name as written in the original;
    # presumably the accepting-side counterpart of init -- confirm against
    # qpid.connection.
    c.tini()
    p.start()
    # Channel 0 is used to send the start and tune commands.
    ch = p.channel(0)
    ch.connection_start()
    ch.connection_tune()