#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Test script for validating the behaviour of a ring queue.
#
# The script invokes the external commands qpid-config, datagen, sender and
# receiver by bare name, so they must be resolvable through PATH.
#
# Environment:
#   QPID_BROKER  broker host passed to qpid-config (default: localhost)
#   QPID_PORT    broker port passed to qpid-config (default: 5672)

# Defaults; each can be overridden by a command line option (see usage).
QUEUE_NAME=ring-queue
LIMIT=100
DURABLE=0
MESSAGES=10000
SENDERS=1
RECEIVERS=1
CONCURRENT=0

# Explicitly initialised so that values inherited from the caller's
# environment can neither force a failure nor inject extra arguments.
FAILED=
EXTRA_ARGS=()

# Broker address arguments for qpid-config, kept as an array so that the
# option and its value stay separate words without relying on word splitting.
BROKER_ARGS=(-b "${QPID_BROKER:-localhost}:${QPID_PORT:-5672}")

# Creates the ring queue with a maximum message count of $LIMIT.
# Globals: DURABLE, QUEUE_NAME, LIMIT, BROKER_ARGS (read); EXTRA_ARGS (written)
setup() {
    if [[ "$DURABLE" -gt 0 ]]; then
        EXTRA_ARGS=(--durable)
    fi
    qpid-config "${BROKER_ARGS[@]}" add queue "$QUEUE_NAME" \
        --max-queue-count "$LIMIT" --limit-policy ring "${EXTRA_ARGS[@]}"
}

# Generates $MESSAGES messages, records them in sender_<queue>_<id> and
# sends them to the queue.
# Arguments: $1 - sender id, used as the suffix of the record file
send() {
    datagen --count "$MESSAGES" \
        | tee "sender_${QUEUE_NAME}_${1}" \
        | sender --durable "$DURABLE" --routing-key "$QUEUE_NAME"
}

# Receives from the queue, writing the output to receiver_<queue>_<id>.
# Arguments: $1 - receiver id, used as the suffix of the output file
receive() {
    # TODO: allow a variety of receiver options to be passed in
    # (ack-frequency, credit-window etc)
    receiver --queue "$QUEUE_NAME" > "receiver_${QUEUE_NAME}_${1}"
}

# Removes the sender/receiver record files and deletes the queue.
cleanup() {
    rm -f -- "sender_${QUEUE_NAME}_"* "receiver_${QUEUE_NAME}_"*
    qpid-config "${BROKER_ARGS[@]}" del queue "$QUEUE_NAME" --force
}

# Prints an informational message.
# Arguments: $1 - message
log() {
    printf '%s\n' "$1"
}

# Prints a failure message and marks the test run as failed.
# Arguments: $1 - message
fail() {
    printf '%s\n' "$1"
    FAILED=1
}

# Checks the outcome of the run. On failure prints "<script>: FAILED" and
# exits with status 1 (record files and queue are left in place); on
# success calls cleanup.
validate() {
    local received expected sent

    if [[ "$RECEIVERS" -eq 0 ]]; then
        # queue should have $LIMIT messages on it, but need to send an eos also
        sender --routing-key "$QUEUE_NAME" --send-eos 1 < /dev/null
        received=$(receiver --queue "$QUEUE_NAME" --browse | wc -l)
        # some wc implementations pad their output with blanks
        received=${received//[[:space:]]/}
        if [[ "$received" -eq $((LIMIT - 1)) ]]; then
            log "queue contains $LIMIT messages as expected"
        else
            fail "queue does not contain the expected $LIMIT messages (received $received)"
        fi
    elif [[ "$CONCURRENT" -eq 0 ]]; then
        # sum of length of all output files should be equal to
        # $LIMIT - $RECEIVERS (1 eos message each)
        received=$(cat "receiver_${QUEUE_NAME}_"* | wc -l)
        received=${received//[[:space:]]/}
        expected=$((LIMIT - RECEIVERS))
        if [[ "$received" -eq "$expected" ]]; then
            log "received $LIMIT messages as expected"
        else
            fail "received $received messages, expected $expected"
        fi

        # if there were only a single sender and receiver (executed serially)
        # we can check the actual received contents
        if [[ "$RECEIVERS" -eq 1 ]] && [[ "$SENDERS" -eq 1 ]]; then
            # Branch on diff's own status so that an earlier count failure
            # is not misreported as a content mismatch.
            if tail -n "$((LIMIT - 1))" "sender_${QUEUE_NAME}_1" \
                | diff - "receiver_${QUEUE_NAME}_1"; then
                log "received messages matched expectations"
            else
                fail "did not receive expected messages"
            fi
        fi
    else
        # multiple receivers, concurrent with senders; ring queue
        # functionality cannot be validated in this case
        received=$(cat "receiver_${QUEUE_NAME}_"* | wc -l)
        sent=$(cat "sender_${QUEUE_NAME}_"* | wc -l)
        if [[ "$received" -le "$sent" ]]; then
            log "sent at least as many messages as were received"
        else
            # Note: if any receiver was browsing, this would be valid (would
            # need to add 'sort | uniq' to the receiver pipeline above)
            fail "received more messages than were sent"
        fi
    fi

    if [[ -n "$FAILED" ]]; then
        echo "$(basename "$0"): FAILED"
        exit 1
    else
        cleanup
    fi
}

# Starts the senders and receivers, either serially (all senders finish and
# the eos markers are sent before any receiver starts) or concurrently, and
# waits for all of them to complete.
run_test() {
    local i
    local -a sender_pids=()

    if [[ "$CONCURRENT" -eq 0 ]]; then
        echo "Starting $SENDERS senders followed by $RECEIVERS receivers "
    else
        echo "Starting $SENDERS senders concurrently with $RECEIVERS receivers"
    fi

    for ((i = 1; i <= SENDERS; i++)); do
        send "$i" &
        sender_pids[i]=$!
    done

    if [[ "$CONCURRENT" -eq 0 ]] && [[ "$RECEIVERS" -gt 0 ]]; then
        # serial mode: let every sender finish, then queue one eos per receiver
        wait
        sender --routing-key "$QUEUE_NAME" --send-eos "$RECEIVERS" < /dev/null
    fi

    for ((i = 1; i <= RECEIVERS; i++)); do
        receive "$i" &
    done

    if [[ "$CONCURRENT" -gt 0 ]]; then
        # concurrent mode: wait only for the senders (receivers are still
        # running) before queueing one eos per receiver
        for ((i = 1; i <= SENDERS; i++)); do
            wait "${sender_pids[i]}"
        done
        sender --routing-key "$QUEUE_NAME" --send-eos "$RECEIVERS" < /dev/null
    fi

    wait
}

# Prints the option summary and exits with status 1.
usage() {
    cat <<EOF
$(basename "$0"): Test script for validating the behaviour of a ring queue

Options:
    -q <queue>      the name of the queue to use
    -s <senders>    the number of senders to start
    -r <receivers>  the number of receivers to start
    -l <limit>      the limit for the ring queue
    -m <messages>   the number of messages to send
    -c              if specified, receivers will run concurrently with senders
    -d              if specified the queue and messages will be durable
EOF
    exit 1
}

# Parses the command line and runs the test.
# Arguments: the script's command line arguments
main() {
    local opt

    # 'q:' and 'l:' must be listed here for the -q and -l arms to be reachable.
    while getopts "q:l:s:r:m:u:dch" opt; do
        case "$opt" in
            q) QUEUE_NAME=$OPTARG ;;
            l) LIMIT=$OPTARG ;;
            s) SENDERS=$OPTARG ;;
            r) RECEIVERS=$OPTARG ;;
            m) MESSAGES=$OPTARG ;;
            d) DURABLE=1 ;;
            c) CONCURRENT=1 ;;
            # NOTE(review): '-u <arg>' was accepted but never handled; it is
            # still accepted and ignored so existing invocations keep working.
            u) ;;
            h) usage ;;
            *) usage ;;
        esac
    done

    if [[ "$SENDERS" -gt 0 ]]; then
        setup
        run_test
        validate
    else
        echo "Nothing can be done if there are no senders"
    fi
}

main "$@"