1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
#!/usr/bin/env python
"""
server.py
Server for a client/server example
"""
import qpid
import sys
import os
from random import randint
from qpid.util import connect
from qpid.connection import Connection
from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty
#----- Functions -------------------------------------------
def getProperty(msg, name):
for h in msg.headers:
if hasattr(h, name): return getattr(h, name)
return None
def respond(session, request):
# The routing key for the response is the request's reply-to
# property. The body for the response is the request's body,
# converted to upper case.
reply_to = getProperty(request,"reply_to")
if reply_to == None:
raise Exception("reply to property needs to be there")
props = session.delivery_properties(routing_key=reply_to["routing_key"])
session.message_transfer(reply_to["exchange"],None, None, Message(props,request.body.upper()))
#----- Initialization --------------------------------------
# Set parameters for login
host=len(sys.argv) > 1 and sys.argv[1] or "127.0.0.1"
port=len(sys.argv) > 2 and int(sys.argv[2]) or 5672
user="guest"
password="guest"
amqp_spec=""
try:
amqp_spec = os.environ["AMQP_SPEC"]
except KeyError:
amqp_spec="/usr/share/amqp/amqp.0-10.xml"
# Create a connection.
conn = Connection (connect (host,port), qpid.spec.load(amqp_spec))
conn.start()
session_id = str(uuid4())
session = conn.session(session_id)
#----- Main Body -- ----------------------------------------
# Create a request queue and subscribe to it
session.queue_declare(queue="request", exclusive=True)
session.exchange_bind(exchange="amq.direct", queue="request", binding_key="request")
dest = "request_destination"
session.message_subscribe(queue="request", destination=dest)
session.message_flow(dest, 0, 0xFFFFFFFF)
session.message_flow(dest, 1, 0xFFFFFFFF)
# Remind the user to start the client program
print "Request server running - run your client now."
print "(Times out after 100 seconds ...)"
sys.stdout.flush()
# Respond to each request
queue = session.incoming(dest)
# If we get a message, send it back to the user (as indicated in the
# ReplyTo property)
while True:
try:
request = queue.get(timeout=100)
respond(session, request)
session.message_accept(RangedSet(request.id))
except Empty:
print "No more messages!"
break;
#----- Cleanup ------------------------------------------------
# Clean up before exiting so there are no open threads.
session.close(timeout=10)
|