summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_client.cpp
blob: f6b3a80c97c2ee5f018988b6947ee8e9eff2e1bc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
 *
 * Copyright (c) 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "unit_test.h"
#include "BrokerFixture.h"
#include "qpid/client/Session.h"

#include <fstream>
#include <vector>
#include <functional>

QPID_AUTO_TEST_SUITE(cluster_clientTestSuite)

using namespace qpid;
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::client::arg;
using framing::TransferContent;
using std::vector;
using std::string;
using std::ifstream;
using std::ws;

struct ClusterConnections : public vector<shared_ptr<Connection> > {
    ClusterConnections() {
        ifstream portfile("cluster.ports");
        BOOST_REQUIRE(portfile.good());
        portfile >> ws;
        while (portfile.good()) {
            uint16_t port;
            portfile >> port >> ws;
            push_back(make_shared_ptr(new Connection(port)));
            back()->open("localhost", port);
        }
        BOOST_REQUIRE(size() > 1);
    }

    ~ClusterConnections() {
        for (iterator i = begin(); i != end(); ++i ){
            (*i)->close();
        }
    }
};

QPID_AUTO_TEST_CASE(testWiringReplication) {
    // Declare on one broker, use on others.
    ClusterConnections cluster;
    BOOST_REQUIRE(cluster.size() > 1);

    Session broker0 = cluster[0]->newSession(ASYNC);
    broker0.exchangeDeclare(exchange="ex");
    broker0.queueDeclare(queue="q");
    broker0.queueBind(exchange="ex", queue="q", routingKey="key");
    broker0.close();
    
    for (size_t i = 1; i < cluster.size(); ++i) {
        Session s = cluster[i]->newSession(ASYNC);
        s.messageTransfer(content=TransferContent("data", "key", "ex"));
        s.messageSubscribe(queue="q", destination="q");
        s.messageFlow(destination="q", unit=0, value=1);//messages
        FrameSet::shared_ptr msg = s.get();
        BOOST_CHECK(msg->isA<MessageTransferBody>());
        BOOST_CHECK_EQUAL(string("data"), msg->getContent());
        s.getExecution().completed(msg->getId(), true, true);
        cluster[i]->close();
    }    
}

QPID_AUTO_TEST_SUITE_END()