summaryrefslogtreecommitdiff
path: root/qpid/cpp/examples/examples/direct/listener.cpp
blob: 3f92d189deb8145575f4f98c8118d673c8cef37f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

/**
 *  listener.cpp:
 *
 *  This program is one of three programs designed to be used
 *  together. These programs do not specify the exchange type - the
 *  default exchange type is the direct exchange.
 *  
 *    declare_queues.cpp:
 *
 *      Creates a queue on a broker, binding a routing key to route
 *      messages to that queue.
 *
 *    direct_producer.cpp:
 *
 *      Publishes to a broker, specifying a routing key.
 *
 *    listener.cpp (this program):
 *
 *      Reads from a queue on the broker using a message listener.
 *
 */

#include <qpid/client/Dispatcher.h>
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>

#include <unistd.h>
#include <cstdlib>
#include <iostream>

using namespace qpid::client;
using namespace qpid::framing;

      
/**
 * Message listener for a single client destination.
 *
 * Owns a Dispatcher built on the supplied session. listen() registers this
 * object with that dispatcher under the destination name and runs it;
 * received() is the callback that handles each incoming message.
 */
class Listener : public MessageListener {
private:
  std::string destination_name;  // destination this listener is registered for
  Dispatcher dispatcher;         // constructed from the session passed to the constructor
public:
  /**
   * @param session          session handed to the Dispatcher constructor
   * @param destination_name name of the destination to listen on; copied
   *                         into the listener
   */
  Listener(Session& session, const std::string& destination_name) :
    destination_name(destination_name),
    dispatcher(session)
  {}

  /** Registers with the dispatcher for the destination and runs it. */
  virtual void listen();

  /** Callback for one received message. */
  virtual void received(Message& message);

  ~Listener() {}
};


/**
 * Announces the destination on standard output, registers this object with
 * the dispatcher as the listener for destination_name, and then runs the
 * dispatcher.
 */
void Listener::listen() {
  std::cout << "Activating listener for: " << this->destination_name << std::endl;

  this->dispatcher.listen(this->destination_name, this);

  // run() gives up control to the dispatcher; nothing after this line
  // executes until it returns.
  this->dispatcher.run();
}


/**
 * Prints the data of a received message and stops the dispatcher when the
 * data equals the end-of-stream sentinel "That's all, folks!".
 *
 * @param message the message to print and inspect
 */
void Listener::received(Message& message) {
  // NOTE(review): assumes getData() yields a std::string (or a reference
  // to one) - confirm against the Message header.
  const std::string& body = message.getData();
  std::cout << "Message: " << body << std::endl;

  // Anything other than the sentinel needs no further handling.
  if (body != "That's all, folks!") {
    return;
  }

  std::cout << "Shutting down listener for " << destination_name << std::endl;
  dispatcher.stop();
}



/**
 * Entry point: opens a connection to a broker, subscribes to the queue
 * "message_queue" and runs a Listener on the resulting destination.
 *
 * Usage: listener [host [port]]
 *   host  broker address; defaults to "127.0.0.1"
 *   port  broker port, parsed with std::atoi; defaults to 5672
 *
 * @return 0 once the listener has returned and the connection is closed;
 *         1 if a std::exception was caught (its what() text is printed to
 *         standard output).
 */
int main(int argc, char** argv) {
    const char* host = argc>1 ? argv[1] : "127.0.0.1";
    int port = argc>2 ? std::atoi(argv[2]) : 5672;

    // Flow-control settings, named instead of repeated as bare literals.
    // The units are the ones labelled "messages" (0) and "bytes" (1) at
    // the messageFlow() call sites.
    const int FLOW_UNIT_MESSAGES = 0;
    const int FLOW_UNIT_BYTES = 1;
    const int MESSAGE_CREDIT = 1;
    const unsigned int BYTE_CREDIT = 0xFFFFFFFF;

    // The destination name merely identifies the destination in the
    // listener; any name works as long as the subscription, the flow
    // commands and the listener all use the same one.
    const std::string destination("listener_destination");

    Connection connection;
    try {
        connection.open(host, port);
        Session session = connection.newSession();

        //--------- Main body of program --------------------------------------

        // Subscribe to the queue and route it to the client destination
        // used by the listener.
        session.messageSubscribe(arg::queue="message_queue", arg::destination=destination);

        // Grant credit on the destination in both units.
        session.messageFlow(arg::destination=destination, arg::unit=FLOW_UNIT_MESSAGES, arg::value=MESSAGE_CREDIT);
        session.messageFlow(arg::destination=destination, arg::unit=FLOW_UNIT_BYTES, arg::value=BYTE_CREDIT);

        // Tell the listener to listen to the destination we just created
        // above.
        Listener listener(session, destination);
        listener.listen();

        //---------------------------------------------------------------------

        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
    }
    return 1;
}