#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import optparse, time, qpid.messaging
from threading import Thread
from subprocess import Popen, PIPE, STDOUT

# Command-line interface for the benchmark driver.  optparse substitutes the
# literal tag "%default" in help strings with the option's default value and
# "%prog" in the usage string with the program name.
op = optparse.OptionParser(usage="usage: %prog [options]",
                           description="simple performance benchmarks")
op.add_option("-b", "--broker", default="127.0.0.1",
              help="url of broker to connect to")
op.add_option("-q", "--queues", default=1, type="int", metavar="N",
              help="create N queues (default %default)")
op.add_option("-s", "--senders", default=1, type="int", metavar="N",
              help="start N senders per queue (default %default)")
op.add_option("-r", "--receivers", default=1, type="int", metavar="N",
              help="start N receivers per queue (default %default)")
op.add_option("-m", "--messages", default=100000, type="int", metavar="N",
              help="send N messages per sender (default %default)")
op.add_option("--queue-name", default="benchmark",
              help="base name for queues (default %default)")
# No type= is given here, so a value supplied on the command line is kept as
# a string; it is only ever forwarded to qpid-send via str().
op.add_option("--send-rate", default=0, metavar="R",
              help="send rate limited to R messages/second, 0 means no limit (default %default)")
op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
              help="message size in bytes (default %default)")
# The help text previously ended in "%d", which optparse does not expand; it
# must be "%default" for the default value to appear in --help output.
op.add_option("--ack-frequency", default=0, metavar="N", type="int",
              help="receiver ack's every N messages, 0 means unconfirmed (default %default)")
op.add_option("--no-report-header", dest="report_header", default=True,
              action="store_false", help="don't print header on report")
op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int")
# "append" options accumulate one list entry per occurrence on the command line.
op.add_option("--send-option", default=[], action="append", type="str",
              help="Additional option for sending addresses")
op.add_option("--receive-option", default=[], action="append", type="str",
              help="Additional option for receiving addresses")
op.add_option("--no-timestamp", dest="timestamp", default=True,
              action="store_false", help="don't add a timestamp, no latency results")

def start_receive(queue, opts, ready_queue):
    """Spawn a qpid-receive child process reading from `queue`.

    The receive address is `queue` followed by a brace-enclosed,
    comma-separated option list: "create:always" plus every entry of
    opts.receive_option.  The broker URL and ack frequency are taken from
    `opts`, and `ready_queue` is passed as the --ready-address argument.

    Returns the Popen object; the child's stdout is a pipe and its stderr is
    redirected into that same pipe.
    """
    address_options = ",".join(["create:always"] + opts.receive_option)
    address = "%s;{%s}" % (queue, address_options)
    command = ["qpid-receive"]
    command += ["-b", opts.broker]
    command += ["-a", address]
    command += ["--forever", "--print-content=no", "--report-total"]
    command += ["--ack-frequency", str(opts.ack_frequency)]
    command += ["--ready-address", ready_queue]
    command += ["--report-header=no"]
    return Popen(command, stdout=PIPE, stderr=STDOUT)

def start_send(queue, opts):
    """Spawn a qpid-send child process writing to `queue`.

    The send address is `queue` followed by a brace-enclosed, comma-separated
    list of the entries in opts.send_option.  Message count, content size,
    send rate and broker URL come from `opts`; --send-eos is given the
    configured number of receivers, and --timestamp is "yes" or "no"
    depending on opts.timestamp.

    Returns the Popen object; the child's stdout is a pipe and its stderr is
    redirected into that same pipe.
    """
    address = "%s;{%s}" % (queue, ",".join(opts.send_option))
    if opts.timestamp:
        timestamp_flag = "yes"
    else:
        timestamp_flag = "no"
    command = ["qpid-send"]
    command += ["-b", opts.broker]
    command += ["-a", address]
    command += ["--messages", str(opts.messages)]
    command += ["--send-eos", str(opts.receivers)]
    command += ["--content-size", str(opts.content_size)]
    command += ["--send-rate", str(opts.send_rate)]
    command += ["--report-total", "--report-header=no"]
    command += ["--timestamp=%s" % (timestamp_flag)]
    command += ["--sequence=no"]
    return Popen(command, stdout=PIPE, stderr=STDOUT)

def wait_for_output(p):
    """Wait for process `p` to finish and return what it wrote to stdout.

    Raises Exception, with the captured output in the message, when the
    process exits with a non-zero return code.
    """
    # communicate() returns (stdout, stderr); only the first item is used.
    output = p.communicate()[0]
    if p.returncode == 0:
        return output
    raise Exception("ERROR:\n%s" % output)

def delete_queues(queues, broker):
    """Request deletion of each queue in `queues` on `broker`.

    Opens one connection to `broker` and, for every queue name, creates a
    sender on a fresh session using the address "<queue>;{delete:always}".
    A qpid.messaging.exceptions.NotFound error (no such queue) is ignored;
    any other error propagates to the caller.

    The connection is closed on every exit path, including when an
    unexpected error is raised part-way through the list.
    """
    c = qpid.messaging.Connection(broker)
    c.open()
    try:
        for q in queues:
            try:
                # The sender object itself is not needed, only the effect of
                # attaching with the delete:always policy.
                c.session().sender("%s;{delete:always}" % (q))
            except qpid.messaging.exceptions.NotFound:
                pass  # Ignore "no such queue"
    finally:
        c.close()

def print_output(senders, receivers, want_header):
    """Collect the output of all sender and receiver processes and print it.

    Every sender is waited on first, then every receiver.  Each process's
    output is split on newlines and the final fragment (whatever follows the
    last newline) is dropped.  Sender line i and receiver line i are printed
    on one row separated by two tabs; if one side has fewer lines the missing
    side is printed as an empty string.  When `want_header` is true a header
    row is printed before the data rows.
    """
    def collect_lines(processes):
        lines = []
        for proc in processes:
            lines.extend(wait_for_output(proc).split("\n")[:-1])
        return lines

    def line_at(lines, index):
        # Missing or empty entries both render as the empty string.
        if index < len(lines) and lines[index]:
            return lines[index]
        return ""

    send_lines = collect_lines(senders)
    recv_lines = collect_lines(receivers)
    row_count = max(len(send_lines), len(recv_lines))
    rows = [line_at(send_lines, k) + "\t\t" + line_at(recv_lines, k)
            for k in range(row_count)]
    # A parenthesised single argument prints the same text as the bare
    # print statement.
    if want_header:
        print("send-tp\t\trecv-tp\tl-min\tl-max\tl-avg")
    for row in rows:
        print(row)

class ReadyReceiver:
    """A receiver for ready messages.

    On construction any existing queue named `queue` is deleted, a connection
    to `broker` is opened, and a receiver is created on the address
    "<queue>;{create:always,delete:always}".  wait() then fetches one message
    per started receiver process from that queue.
    """
    def __init__(self, queue, broker):
        # Remove any queue of this name first, before re-creating it below.
        delete_queues([queue], broker)
        self.connection = qpid.messaging.Connection(broker)
        self.connection.open()
        self.receiver = self.connection.session().receiver(
            "%s;{create:always,delete:always}"%(queue))
        # Timeout passed to each fetch() call in wait().
        self.timeout=2

    def wait(self, receivers):
        """Block until one ready message per entry in `receivers` has arrived.

        `receivers` is the list of receiver Popen objects.  Each fetch uses
        self.timeout.  The connection is closed before returning or raising.

        Raises Exception if a fetch times out: the message carries the output
        of a receiver process that has already exited with a non-zero code,
        or otherwise reports a plain timeout.
        """
        try:
            try:
                for _ in receivers:
                    self.receiver.fetch(self.timeout)
            except qpid.messaging.Empty:
                for r in receivers:
                    # poll() is None while the child is still running and the
                    # exit code once it has terminated, so this is true only
                    # for a child that exited with a non-zero code.
                    if r.poll():
                        # String exceptions are not valid on Python 2.6+, so a
                        # real exception object must be raised.
                        raise Exception("Receiver error: %s"%(wait_for_output(r)))
                raise Exception("Timed out waiting for receivers to be ready")
        finally:
            # Release the broker connection on the failure path as well.
            self.connection.close()

def main():
    """Parse the command line and run the benchmark opts.repeat times.

    Each repetition deletes the benchmark queues, starts the receiver
    processes (opts.receivers per queue), waits for them to report ready,
    starts the sender processes (opts.senders per queue), prints the
    collected results, and deletes the queues again.  The report header is
    printed only on the first repetition, and only if it was not disabled.
    """
    opts, args = op.parse_args()
    ready_queue = "%s-ready" % (opts.queue_name)
    queues = []
    for index in xrange(opts.queues):
        queues.append("%s-%s" % (opts.queue_name, index))

    for iteration in xrange(opts.repeat):
        delete_queues(queues, opts.broker)
        ready_receiver = ReadyReceiver(ready_queue, opts.broker)

        receivers = []
        for q in queues:
            for _ in xrange(opts.receivers):
                receivers.append(start_receive(q, opts, ready_queue))
        # Senders are started only after every receiver has reported ready.
        ready_receiver.wait(receivers)

        senders = []
        for q in queues:
            for _ in xrange(opts.senders):
                senders.append(start_send(q, opts))

        want_header = opts.report_header and iteration == 0
        print_output(senders, receivers, want_header)
        delete_queues(queues, opts.broker)


if __name__ == "__main__":
    main()

