summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SubscriptionManager.cpp
blob: bf5191e8a0ccf9477e91f09e51489e2a75aa9a72 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#ifndef _Subscription_
#define _Subscription_

#include "SubscriptionManager.h"
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Session_0_10.h>
#include <qpid/client/MessageListener.h>
#include <set>
#include <sstream>


namespace qpid {
namespace client {

/**
 * Build a subscription manager on top of an existing session.
 *
 * Both the dispatcher and the stored session are initialised from @p s.
 * The default flow limits used for new subscriptions start out as one
 * message and UNLIMITED bytes, and auto-stop is enabled.
 *
 * @param s session used for subscribing, flow control and cancellation.
 */
SubscriptionManager::SubscriptionManager(Session_0_10& s)
    : dispatcher(s),
      session(s),
      messages(1),          // default message limit handed to flowLimits()
      bytes(UNLIMITED),     // default byte limit handed to flowLimits()
      autoStop(true)
{
}

/**
 * Reserve a subscription tag that is not already registered.
 *
 * If @p tag is free it is used unchanged. Otherwise the candidates
 * "<tag>-1", "<tag>-2", ... are tried in order until one is found that is
 * not present in the subscriptions collection. The chosen tag is recorded
 * in that collection before returning.
 *
 * @param tag preferred tag requested by the caller.
 * @return the tag that was actually reserved (equal to @p tag when it was
 *         not already in use).
 */
std::string SubscriptionManager::uniqueTag(const std::string& tag) {
    int count=1;
    std::string unique=tag;
    // Probe with the current candidate, not the original tag: testing the
    // original would never terminate once that tag is taken.
    while (subscriptions.find(unique) != subscriptions.end()) {
        std::ostringstream s;
        s << tag << "-" << count++;
        unique=s.str();
    }
    subscriptions.insert(unique);
    // Hand back the tag that was registered so callers subscribe and
    // cancel using the same identifier that is stored here.
    return unique;
}

/**
 * Subscribe a listener to a queue.
 *
 * Obtains a tag from uniqueTag(), issues the subscribe command on the
 * session with that tag as the destination, applies the manager's current
 * default flow limits to it, and registers the listener with the
 * dispatcher under the same tag.
 *
 * @param listener receiver of dispatched messages; its address is handed
 *                 to the dispatcher, so it presumably has to outlive the
 *                 subscription (TODO confirm against Dispatcher).
 * @param q        name of the queue to subscribe to.
 * @param t        requested tag, passed through uniqueTag().
 * @return the tag obtained from uniqueTag().
 */
std::string SubscriptionManager::subscribe(
    MessageListener& listener, const std::string& q, const std::string& t)
{
    using namespace arg;
    std::string destinationTag = uniqueTag(t);

    session.messageSubscribe(arg::queue=q, arg::destination=destinationTag);
    // Use the defaults stored in this manager (see the two-argument
    // flowLimits overload) for the new subscription.
    flowLimits(destinationTag, messages, bytes);
    dispatcher.listen(destinationTag, &listener);

    return destinationTag;
}

/**
 * Send flow limits for a single subscription.
 *
 * Issues two flow commands on the session for @p tag: one carrying the
 * message limit and one carrying the byte limit.
 *
 * Note: the parameters deliberately shadow the members of the same names;
 * the stored defaults are neither read nor modified here.
 *
 * @param tag      destination tag of the subscription.
 * @param messages message limit to send.
 * @param bytes    byte limit to send.
 */
void SubscriptionManager::flowLimits(
    const std::string& tag,
    uint32_t messages,
    uint32_t bytes)
{
    // The second argument selects the unit: 0 is used with the message
    // limit and 1 with the byte limit (presumably the AMQP 0-10
    // message.flow unit codes -- confirm against the session API).
    session.messageFlow(tag, 0, messages);
    session.messageFlow(tag, 1, bytes);
}

void SubscriptionManager::flowLimits(uint32_t m,  uint32_t b) {
    messages=m;
    bytes=b;
}

/**
 * Cancel the subscription registered under @p tag.
 *
 * Does nothing when the tag is not in the subscriptions collection.
 * Otherwise the tag is removed, the dispatcher and the session are told
 * to cancel it, and, if auto-stop is enabled and no subscriptions remain,
 * stop() is called.
 *
 * @param tag tag of the subscription to cancel.
 */
void SubscriptionManager::cancel(const std::string tag)
{
    // erase() reports how many entries were removed; zero means the tag
    // was never registered here, so there is nothing to cancel.
    if (subscriptions.erase(tag) == 0)
        return;

    dispatcher.cancel(tag);
    session.messageCancel(tag);

    const bool nothingLeft = subscriptions.empty();
    if (autoStop && nothingLeft)
        stop();
}

/**
 * Run the dispatcher.
 *
 * Records the requested auto-stop setting first. When auto-stop is on and
 * there are no subscriptions, returns immediately without running the
 * dispatcher; in every other case dispatcher.run() is invoked.
 *
 * @param autoStop_ new value for the auto-stop flag (also consulted by
 *                  cancel()).
 */
void SubscriptionManager::run(bool autoStop_)
{
    autoStop = autoStop_;

    // Run unless auto-stop is enabled with nothing subscribed.
    if (!autoStop || !subscriptions.empty()) {
        dispatcher.run();
    }
}

void SubscriptionManager::stop()
{
    dispatcher.stop();
}

}} // namespace qpid::client

#endif