1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
def publisher(n):
    """Publish ``n`` messages plus a terminator, then wait for a sync message.

    Connects to the broker at 127.0.0.1:5672 as guest/guest, sends ``n``
    copies of the body "message" followed by one "That's done" message, all
    through the ``amq.direct`` exchange with routing key ``message_queue``.
    It then subscribes to ``sync_queue`` and blocks until one message
    arrives there (the ``consumer`` function in this file sends it once it
    has seen the terminator).

    Args:
        n: Number of ordinary messages to send.  If the process was started
            with a second command-line argument, that value replaces ``n``.

    Raises:
        ValueError: If ``sys.argv[2]`` is present but is not an integer.
    """
    import sys
    from qpid.client import Client
    from qpid.content import Content

    # The command line wins over the parameter; the __main__ driver of this
    # script depends on that, so the override is kept.
    if len(sys.argv) >= 3:
        n = int(sys.argv[2])

    client = Client("127.0.0.1", 5672)
    client.start({"LOGIN": "guest", "PASSWORD": "guest"})
    channel = client.channel(1)
    channel.session_open()
    try:
        message = Content("message")
        message["routing_key"] = "message_queue"
        # Double spaces reproduce the output of the former
        # `print "producing ", n, " messages"` statement exactly.
        print("producing  %s  messages" % (n,))
        for _ in range(n):
            channel.message_transfer(destination="amq.direct", content=message)

        print("producing final message")
        message = Content("That's done")
        message["routing_key"] = "message_queue"
        channel.message_transfer(destination="amq.direct", content=message)

        print("consuming sync message")
        # Named `subscription` rather than `consumer` so the local does not
        # shadow the module-level consumer() function.
        subscription = "consumer"
        queue = client.queue(subscription)
        channel.message_subscribe(queue="sync_queue", destination=subscription)
        # NOTE(review): presumably unit 0 = messages and unit 1 = bytes, with
        # 0xFFFFFFFF meaning unlimited credit (AMQP 0-10 message.flow) --
        # confirm against the client/spec in use.
        channel.message_flow(subscription, 0, 0xFFFFFFFF)
        channel.message_flow(subscription, 1, 0xFFFFFFFF)
        queue.get(block=True)
        print("done")
    finally:
        # Close the session even if publishing fails or the blocking get is
        # interrupted, so it is not left open on the broker.
        channel.session_close()
def consumer():
    """Consume from ``message_queue`` until the terminator, then signal back.

    Connects to the broker at 127.0.0.1:5672 as guest/guest, subscribes to
    ``message_queue`` and reads messages until one whose body equals
    "That's done" arrives.  It then publishes a single message with routing
    key ``sync_queue`` through ``amq.direct`` (the ``publisher`` function in
    this file blocks waiting for it) and closes the session.
    """
    from qpid.client import Client
    from qpid.content import Content

    client = Client("127.0.0.1", 5672)
    client.start({"LOGIN": "guest", "PASSWORD": "guest"})
    channel = client.channel(1)
    channel.session_open()
    try:
        # Named `subscription` rather than `consumer` so the local does not
        # shadow this function's own name.
        subscription = "consumer"
        queue = client.queue(subscription)
        channel.message_subscribe(queue="message_queue", destination=subscription)
        # NOTE(review): presumably unit 0 = messages and unit 1 = bytes, with
        # 0xFFFFFFFF meaning unlimited credit (AMQP 0-10 message.flow) --
        # confirm against the client/spec in use.
        channel.message_flow(subscription, 0, 0xFFFFFFFF)
        channel.message_flow(subscription, 1, 0xFFFFFFFF)

        final = "That's done"
        content = ""
        print("getting messages")
        # content starts as "" so the loop body always runs at least once and
        # `message` is bound by the time it is completed below.
        while content != final:
            message = queue.get(block=True)
            content = message.content.body
        # NOTE(review): the source's indentation was lost, so it is unclear
        # whether this call sat inside the loop.  It is issued once here on
        # the last message with cumulative=True, which presumably completes
        # every earlier message as well -- confirm.
        message.complete(cumulative=True)

        print("consumed all messages")
        message = Content("message")
        message["routing_key"] = "sync_queue"
        channel.message_transfer(destination="amq.direct", content=message)
        print("done")
    finally:
        # Close the session even if consuming fails or is interrupted, so it
        # is not left open on the broker.
        channel.session_close()
if __name__ == '__main__':
    # Usage: <script> prod|cons [message_count]
    #
    # Declares and binds the two queues used by publisher() and consumer(),
    # then times a single run of the selected role with timeit.
    import sys
    import qpid
    from timeit import Timer
    from qpid.client import Client
    from qpid.content import Content

    # Queue setup happens before argument checking, as it always has, so both
    # queues exist on the broker regardless of which role runs first.
    client = Client("127.0.0.1", 5672)
    client.start({"LOGIN": "guest", "PASSWORD": "guest"})
    channel = client.channel(1)
    channel.session_open()
    try:
        channel.queue_declare(queue="message_queue")
        channel.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="message_queue")
        channel.queue_declare(queue="sync_queue")
        channel.queue_bind(exchange="amq.direct", queue="sync_queue", routing_key="sync_queue")
    finally:
        # Do not leave the setup session open if a declare/bind fails.
        channel.session_close()

    # Message count: second command-line argument, defaulting to 100.
    numMess = 100
    if len(sys.argv) >= 3:
        numMess = int(sys.argv[2])

    # The format strings below keep the exact spacing produced by the former
    # multi-argument print statements.
    if len(sys.argv) == 1:
        print("error: please specify prod or cons")
    elif sys.argv[1] == 'prod':
        # Pass the requested count instead of a hard-coded 100 so the timed
        # call matches the figure reported below.
        tprod = Timer("publisher(%d)" % numMess, "from __main__ import publisher")
        tp = tprod.timeit(1)
        # +2 accounts for the terminator sent and the sync message received.
        print("produced and consumed %s messages in:  %s" % (numMess + 2, tp))
    elif sys.argv[1] == 'cons':
        tcons = Timer("consumer()", "from __main__ import consumer")
        tc = tcons.timeit(1)
        print("consumed  %s  in:  %s" % (numMess, tc))
    else:
        print("please specify prod or cons")
|