summaryrefslogtreecommitdiff
path: root/src/components/transport_manager/src/android/local_socket_sender.cc
blob: 409f84479ec90a11180282ed1443386ede08d959 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include "transport_manager/android/local_socket_sender.h"

#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>

#include <atomic>
#include <thread>
#include <utility>

#include "utils/logger.h"

SDL_CREATE_LOG_VARIABLE("local_socket_sender")

namespace transport_manager {
namespace transport_adapter {

/**
 * @brief Creates a sender that is not connected yet.
 *
 * The socket descriptor starts as -1 and the connected flag as false;
 * Init() has to be called to actually open and connect the socket.
 *
 * @param sent_callback invoked from Run() after each attempt to write a
 *        queued message (with the message on success, nullptr on failure)
 * @param connected_callback invoked from Init() with the connection result
 */
LocalSocketSender::LocalSocketSender(OnDataSentCallback&& sent_callback, OnConnectedCallback&& connected_callback)
    : socket_id_(-1)
    , connected_(false)
    , message_queue_()
    // A named rvalue-reference parameter is an lvalue: without std::move
    // the callbacks would be copied instead of moved into the members.
    , connected_callback_(std::move(connected_callback))
    , sent_callback_(std::move(sent_callback))
{}

void LocalSocketSender::Init(const std::string& socket_name)
{
    SDL_LOG_AUTO_TRACE();

    struct sockaddr_un addr{};
    addr.sun_family = AF_LOCAL;
    addr.sun_path[0] = '\0';
    strcpy(&addr.sun_path[1], socket_name.c_str() );
    socklen_t len = offsetof(struct sockaddr_un, sun_path) + 1 + strlen(&addr.sun_path[1]);
    int err;

    SDL_LOG_DEBUG("Creating local socket...");
    socket_id_ = socket(PF_LOCAL, SOCK_STREAM, 0);
    if (socket_id_ < 0) {
        err = errno;
        SDL_LOG_ERROR("Cannot open socket: " << strerror(err));
        return;
    }

    if (!TryToConnect(addr, len)) {
        SDL_LOG_ERROR("Failed to connect");
        close(socket_id_);
    } else {
        connected_ = true;
    }

    connected_callback_(connected_);
}

void LocalSocketSender::Run()
{
    SDL_LOG_AUTO_TRACE();
    while(connected_ && !message_queue_.IsShuttingDown()) {
        if (message_queue_.empty()) {
            message_queue_.wait();
        }

        protocol_handler::RawMessagePtr message;
        if (message_queue_.pop(message)) {
            int n = write(socket_id_, message->data(), message->data_size());
            if (n > 0) {
                sent_callback_(message);
            } else {
                sent_callback_(nullptr);
            }
        }
    }
}

/**
 * @brief Closes the socket descriptor if the sender is marked as connected.
 *
 * When the connected flag is not set there is nothing to release here.
 */
LocalSocketSender::~LocalSocketSender()
{
    if (!connected_) {
        return;
    }

    close(socket_id_);
}

/**
 * @brief Tries to connect socket_id_ to the given address, with retries.
 *
 * Makes up to 10 attempts and waits 500 ms after each failed one.
 *
 * @param addr address of the local socket to connect to
 * @param len number of bytes of @p addr that are passed to connect()
 * @return true as soon as one attempt succeeds, false when all attempts fail
 */
bool LocalSocketSender::TryToConnect(sockaddr_un& addr, socklen_t len) {
    const int connect_attempts = 10;
    const int attempt_interval_ms = 500;
    const int microseconds_per_millisecond = 1000;

    for (int i = 0; i < connect_attempts; ++i) {
        SDL_LOG_DEBUG("Attempt #" << i + 1 << " to connect to socket...");

        if (connect(socket_id_, reinterpret_cast<struct sockaddr*>(&addr), len) < 0) {
            SDL_LOG_ERROR("Connect() failed: " << strerror(errno)
              << ". Retry in " << attempt_interval_ms << "ms");
            // usleep() takes microseconds; passing the millisecond value
            // directly would make the pause 1000 times shorter than logged.
            usleep(attempt_interval_ms * microseconds_per_millisecond);
            continue;
        }

        SDL_LOG_DEBUG("Successfully connected to socket");
        return true;
    }

    SDL_LOG_ERROR("Connection attempts exceeded. Can't connect to socket");
    return false;
}

/**
 * @brief Asks the sender to stop by shutting down its message queue.
 *
 * The loop in Run() checks the queue's shutting-down state on every
 * iteration and leaves once it is set.
 */
void LocalSocketSender::Stop()
{
    SDL_LOG_DEBUG("Requesting client to stop");

    message_queue_.Shutdown();
}

/**
 * @brief Queues a message for sending.
 *
 * The message is only pushed to message_queue_ here; the actual write to
 * the socket is done by Run().
 *
 * @param message message to be queued
 */
void LocalSocketSender::Send(::protocol_handler::RawMessagePtr message)
{
    message_queue_.push(message);
}

}  // namespace transport_adapter
}  // namespace transport_manager