summaryrefslogtreecommitdiff
path: root/tools/publish.c
blob: 6bc20daab6faed68d811201dd2e903997e5ed7ff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "common.h"

/*
 * Size, in bytes, of the buffer main() allocates for --line-buffered mode and
 * the limit it passes to fgets(). The expansion is parenthesized so the macro
 * behaves as a single value inside larger expressions (e.g. x / MAX_LINE_LENGTH).
 */
#define MAX_LINE_LENGTH (1024 * 32)

/*
 * Publish a single message.
 *
 * The exchange and routing key strings are converted with cstring_bytes() and
 * handed to amqp_basic_publish() together with the caller's properties and
 * body. The literal arguments 1, 0, 0 are passed through unchanged; per the
 * rabbitmq-c API they should be the channel number and the mandatory/immediate
 * flags -- NOTE(review): confirm against amqp.h.
 *
 * The publish result is forwarded to die_amqp_error() with the context string
 * "basic.publish"; how a failure is handled is decided there.
 */
static void do_publish(amqp_connection_state_t conn, char *exchange,
                       char *routing_key, amqp_basic_properties_t *props,
                       amqp_bytes_t body) {
  amqp_bytes_t exchange_bytes = cstring_bytes(exchange);
  amqp_bytes_t routing_key_bytes = cstring_bytes(routing_key);
  int status;

  status = amqp_basic_publish(conn, 1, exchange_bytes, routing_key_bytes, 0, 0,
                              props, body);
  die_amqp_error(status, "basic.publish");
}

int main(int argc, const char **argv) {
  amqp_connection_state_t conn;
  static char *exchange = NULL;
  static char *routing_key = NULL;
  static char *content_type = NULL;
  static char *content_encoding = NULL;
  static char **headers = NULL;
  static char *reply_to = NULL;
  static char *body = NULL;
  amqp_basic_properties_t props;
  amqp_bytes_t body_bytes;
  static int delivery = 1; /* non-persistent by default */
  static int line_buffered = 0;
  static char **pos;

  struct poptOption options[] = {
      INCLUDE_OPTIONS(connect_options),
      {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
       "the exchange to publish to", "exchange"},
      {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
       "the routing key to publish with", "routing key"},
      {"persistent", 'p', POPT_ARG_VAL, &delivery, 2,
       "use the persistent delivery mode", NULL},
      {"content-type", 'C', POPT_ARG_STRING, &content_type, 0,
       "the content-type for the message", "content type"},
      {"reply-to", 't', POPT_ARG_STRING, &reply_to, 0,
       "the replyTo to use for the message", "reply to"},
      {"line-buffered", 'l', POPT_ARG_VAL, &line_buffered, 2,
       "treat each line from standard in as a separate message", NULL},
      {"content-encoding", 'E', POPT_ARG_STRING, &content_encoding, 0,
       "the content-encoding for the message", "content encoding"},
      {"header", 'H', POPT_ARG_ARGV, &headers, 0,
       "set a message header (may be specified multiple times)",
       "\"key: value\""},
      {"body", 'b', POPT_ARG_STRING, &body, 0, "specify the message body",
       "body"},
      POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}};

  process_all_options(argc, argv, options);

  if (!exchange && !routing_key) {
    fprintf(stderr, "neither exchange nor routing key specified\n");
    return 1;
  }

  memset(&props, 0, sizeof props);
  props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
  props.delivery_mode = delivery;

  if (content_type) {
    props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
    props.content_type = amqp_cstring_bytes(content_type);
  }

  if (content_encoding) {
    props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
    props.content_encoding = amqp_cstring_bytes(content_encoding);
  }

  if (reply_to) {
    props._flags |= AMQP_BASIC_REPLY_TO_FLAG;
    props.reply_to = amqp_cstring_bytes(reply_to);
  }

  if (headers) {
    int num = 0;
    for (pos = headers; *pos; pos++) {
      num++;
    }

    if (num > 0) {
      amqp_table_t *table = &props.headers;
      table->num_entries = num;
      table->entries = calloc(num, sizeof(amqp_table_entry_t));
      int i = 0;
      for (pos = headers; *pos; pos++) {
        char *colon = strchr(*pos, ':');
        if (colon) {
          *colon++ = '\0';
          while (*colon == ' ') colon++;
          table->entries[i].key = amqp_cstring_bytes(*pos);
          table->entries[i].value.kind = AMQP_FIELD_KIND_UTF8;
          table->entries[i].value.value.bytes = amqp_cstring_bytes(colon);
          i++;
        } else {
          fprintf(stderr,
                  "Ignored header definition missing ':' delimiter in \"%s\"\n",
                  *pos);
        }
      }
      props._flags |= AMQP_BASIC_HEADERS_FLAG;
    }
  }

  conn = make_connection();

  if (body) {
    body_bytes = amqp_cstring_bytes(body);
  } else {
    if (line_buffered) {
      body_bytes.bytes = (char *)malloc(MAX_LINE_LENGTH);
      while (fgets(body_bytes.bytes, MAX_LINE_LENGTH, stdin)) {
        body_bytes.len = strlen(body_bytes.bytes);
        do_publish(conn, exchange, routing_key, &props, body_bytes);
      }
    } else {
      body_bytes = read_all(0);
    }
  }

  if (!line_buffered) {
    do_publish(conn, exchange, routing_key, &props, body_bytes);
  }

  if (props.headers.num_entries > 0) {
    free(props.headers.entries);
  }

  if (!body) {
    free(body_bytes.bytes);
  }

  close_connection(conn);
  return 0;
}