summaryrefslogtreecommitdiff
path: root/src/components/media_manager/src/socket_streamer_adapter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/media_manager/src/socket_streamer_adapter.cc')
-rw-r--r--src/components/media_manager/src/socket_streamer_adapter.cc299
1 files changed, 70 insertions, 229 deletions
diff --git a/src/components/media_manager/src/socket_streamer_adapter.cc b/src/components/media_manager/src/socket_streamer_adapter.cc
index 45b1f63a17..ecf6535958 100644
--- a/src/components/media_manager/src/socket_streamer_adapter.cc
+++ b/src/components/media_manager/src/socket_streamer_adapter.cc
@@ -33,282 +33,123 @@
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/select.h>
-#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
-#include <string.h>
-#include <errno.h>
-#include "config_profile/profile.h"
-#include "media_manager/video/socket_video_streamer_adapter.h"
#include "utils/logger.h"
+#include "media_manager/socket_streamer_adapter.h"
namespace media_manager {
CREATE_LOGGERPTR_GLOBAL(logger, "SocketStreamerAdapter")
-SocketStreamerAdapter::SocketStreamerAdapter()
- : socket_fd_(0),
- is_ready_(false),
- streamer_(new Streamer(this)),
- thread_(threads::CreateThread("SocketStreamer", streamer_)),
- messages_() {
+SocketStreamerAdapter::SocketStreamerAdapter(
+ const std::string& ip,
+ int32_t port,
+ const std::string& header)
+ : StreamerAdapter(new SocketStreamer(this, ip, port, header)) {
}
SocketStreamerAdapter::~SocketStreamerAdapter() {
- LOG4CXX_AUTO_TRACE(logger);
- thread_->join();
- delete streamer_;
- threads::DeleteThread(thread_);
-}
-
-void SocketStreamerAdapter::StartActivity(int32_t application_key) {
- LOG4CXX_TRACE(logger, "enter " << application_key);
-
- if (application_key == current_application_) {
- LOG4CXX_INFO(logger, "Already running for app " << application_key);
- } else {
- is_ready_ = true;
- current_application_ = application_key;
-
- messages_.Reset();
-
- for (std::set<MediaListenerPtr>::iterator it = media_listeners_.begin();
- media_listeners_.end() != it;
- ++it) {
- (*it)->OnActivityStarted(application_key);
- }
- }
- LOG4CXX_TRACE(logger, "exit");
-}
-
-void SocketStreamerAdapter::StopActivity(int32_t application_key) {
- LOG4CXX_TRACE(logger, "enter " << application_key);
-
- if (application_key != current_application_) {
- LOG4CXX_WARN(logger, "Streaming is not active for " << application_key);
- } else {
- is_ready_ = false;
- current_application_ = 0;
-
- if (streamer_) {
- streamer_->stop();
- messages_.Shutdown();
- }
-
- for (std::set<MediaListenerPtr>::iterator it = media_listeners_.begin();
- media_listeners_.end() != it;
- ++it) {
- (*it)->OnActivityEnded(application_key);
- }
- }
- LOG4CXX_TRACE(logger, "exit");
}
-bool SocketStreamerAdapter::is_app_performing_activity(
- int32_t application_key) {
- return (application_key == current_application_);
+SocketStreamerAdapter::SocketStreamer::SocketStreamer(
+ SocketStreamerAdapter* const adapter,
+ const std::string& ip,
+ int32_t port,
+ const std::string& header)
+ : Streamer(adapter),
+ ip_(ip),
+ port_(port),
+ header_(header),
+ socket_fd_(0),
+ send_socket_fd_(0),
+ is_first_frame_(true) {
}
-void SocketStreamerAdapter::Init() {
- LOG4CXX_DEBUG(logger, "Start sending thread");
- const size_t kStackSize = 16384;
- thread_->start(threads::ThreadOptions(kStackSize));
+SocketStreamerAdapter::SocketStreamer::~SocketStreamer() {
}
-void SocketStreamerAdapter::SendData(
- int32_t application_key,
- const ::protocol_handler::RawMessagePtr message) {
- LOG4CXX_INFO(logger, "SendData(application_key = " << application_key << ")");
-
-
- if (application_key != current_application_) {
- LOG4CXX_WARN(logger, "Currently working with other app "
- << current_application_);
- return;
- }
-
- if (is_ready_) {
- messages_.push(message);
- }
-}
-
-SocketStreamerAdapter::Streamer::Streamer(
- SocketStreamerAdapter* const server)
- : server_(server),
- new_socket_fd_(0),
- is_first_loop_(true),
- is_client_connected_(false),
- stop_flag_(false) {
-}
-
-SocketStreamerAdapter::Streamer::~Streamer() {
- stop();
-}
-
-void SocketStreamerAdapter::Streamer::threadMain() {
- LOG4CXX_TRACE(logger,"enter " << this);
- sync_primitives::AutoLock auto_lock(thread_lock);
- start();
-
- while (!stop_flag_) {
- new_socket_fd_ = accept(server_->socket_fd_, NULL, NULL);
- LOG4CXX_INFO(logger, "Client connectd " << new_socket_fd_);
- if (0 > new_socket_fd_) {
- LOG4CXX_ERROR(logger, "Socket is closed " << strerror(errno));
- sleep(1);
- continue;
- }
-
- is_client_connected_ = true;
- is_first_loop_ = true;
- while (is_client_connected_) {
- while (!server_->messages_.empty()) {
- ::protocol_handler::RawMessagePtr msg = server_->messages_.pop();
- if (!msg) {
- LOG4CXX_ERROR(logger, "Null pointer message");
- continue;
- }
-
- is_client_connected_ = send(msg);
- static int32_t messages_for_session = 0;
- ++messages_for_session;
-
- LOG4CXX_INFO(logger, "Handling map streaming message. This is "
- << messages_for_session << " the message for "
- << server_->current_application_);
- std::set<MediaListenerPtr>::iterator it = server_->media_listeners_
- .begin();
- for (; server_->media_listeners_.end() != it; ++it) {
- (*it)->OnDataReceived(server_->current_application_,
- messages_for_session);
- }
- }
-
- if (!is_ready()) {
- LOG4CXX_INFO(logger, "Client disconnected.");
- stop();
- break;
- }
- server_->messages_.wait();
- }
- }
- LOG4CXX_TRACE(logger,"exit " << this);
-}
-
-void SocketStreamerAdapter::Streamer::exitThreadMain() {
- LOG4CXX_TRACE(logger,"enter " << this);
- stop_flag_ = true;
- stop();
- server_->messages_.Shutdown();
- if (server_->socket_fd_ != -1) {
- shutdown(server_->socket_fd_, SHUT_RDWR);
- close(server_->socket_fd_);
- }
- LOG4CXX_TRACE(logger,"exit " << this);
-}
-
-void SocketStreamerAdapter::Streamer::start() {
- server_->socket_fd_ = socket(AF_INET, SOCK_STREAM, 0);
-
- if (0 >= server_->socket_fd_) {
- LOG4CXX_ERROR_EXT(logger, "Server open error");
- return;
+bool SocketStreamerAdapter::SocketStreamer::Connect() {
+ LOG4CXX_AUTO_TRACE(logger);
+ socket_fd_ = socket(AF_INET, SOCK_STREAM, 0);
+ if (0 >= socket_fd_) {
+ LOG4CXX_ERROR(logger, "Unable to create socket");
+ return false;
}
int32_t optval = 1;
- if (-1 == setsockopt(server_->socket_fd_, SOL_SOCKET, SO_REUSEADDR,
+ if (-1 == setsockopt(socket_fd_, SOL_SOCKET, SO_REUSEADDR,
&optval, sizeof optval)) {
- LOG4CXX_ERROR_EXT(logger, "Unable to set sockopt");
- return;
+ LOG4CXX_ERROR(logger, "Unable to set sockopt");
+ return false;
}
struct sockaddr_in serv_addr_ = { 0 };
- serv_addr_.sin_addr.s_addr = inet_addr(server_->ip_.c_str());
+ serv_addr_.sin_addr.s_addr = inet_addr(ip_.c_str());
serv_addr_.sin_family = AF_INET;
- serv_addr_.sin_port = htons(server_->port_);
-
- if (-1 == bind(server_->socket_fd_,
+ serv_addr_.sin_port = htons(port_);
+ if (-1 == bind(socket_fd_,
reinterpret_cast<struct sockaddr*>(&serv_addr_),
sizeof(serv_addr_))) {
- LOG4CXX_ERROR_EXT(logger, "Unable to bind");
- return;
+ LOG4CXX_ERROR(logger, "Unable to bind");
+ return false;
}
- LOG4CXX_INFO(logger, "SocketStreamerAdapter::listen for connections");
- if (-1 == listen(server_->socket_fd_, 5)) {
- LOG4CXX_ERROR_EXT(logger, "Unable to listen");
- return;
+ if (-1 == listen(socket_fd_, 5)) {
+ LOG4CXX_ERROR(logger, "Unable to listen");
+ return false;
}
-}
-void SocketStreamerAdapter::Streamer::stop() {
- LOG4CXX_TRACE(logger,"enter " << this);
- if (0 == new_socket_fd_) {
- LOG4CXX_ERROR(logger, "Client Socket does not exist: ");
- } else if (-1 == shutdown(new_socket_fd_, SHUT_RDWR)) {
- LOG4CXX_ERROR(logger, "Unable to shutdown socket " << strerror(errno));
- } else if (-1 == ::close(new_socket_fd_)) {
- LOG4CXX_ERROR(logger, "Unable to close socket " << strerror(errno));
+ send_socket_fd_ = accept(socket_fd_, NULL, NULL);
+ if (0 >= send_socket_fd_) {
+ LOG4CXX_ERROR(logger, "Unable to accept");
+ return false;
}
- new_socket_fd_ = 0;
- is_client_connected_ = false;
- LOG4CXX_TRACE(logger,"exit" << this);
+ is_first_frame_ = true;
+ LOG4CXX_INFO(logger, "Client connected: " << send_socket_fd_);
+ return true;
}
-bool SocketStreamerAdapter::Streamer::is_ready() const {
- bool result = true;
- fd_set fds;
- FD_ZERO(&fds);
- FD_SET(new_socket_fd_, &fds);
- struct timeval tv;
- tv.tv_sec = 5; // set a 5 second timeout
- tv.tv_usec = 0;
-
- int32_t retval = 0;
- retval = select(new_socket_fd_ + 1, 0, &fds, 0, &tv);
-
- if (-1 == retval) {
- LOG4CXX_ERROR_EXT(logger, "An error occurred");
- result = false;
- } else if (0 == retval) {
- LOG4CXX_ERROR_EXT(logger, "The timeout expired");
- result = false;
+void SocketStreamerAdapter::SocketStreamer::Disconnect() {
+ LOG4CXX_AUTO_TRACE(logger);
+ if (0 < send_socket_fd_) {
+ close(send_socket_fd_);
}
- return result;
-}
-
-bool SocketStreamerAdapter::Streamer::send(
- const ::protocol_handler::RawMessagePtr msg) {
- if (!is_ready()) {
- LOG4CXX_ERROR_EXT(logger, " Socket is not ready");
- return false;
+ if (0 < socket_fd_) {
+ close(socket_fd_);
}
+}
- if (is_first_loop_) {
- is_first_loop_ = false;
- char hdr[] = {"HTTP/1.1 200 OK\r\n "
- "Connection: Keep-Alive\r\n"
- "Keep-Alive: timeout=15, max=300\r\n"
- "Server: SDL\r\n"
- "Content-Type: video/mp4\r\n\r\n"
- };
-
- if (-1 == ::send(new_socket_fd_, hdr, strlen(hdr), MSG_NOSIGNAL)) {
- LOG4CXX_ERROR_EXT(logger, " Unable to send");
+bool SocketStreamerAdapter::SocketStreamer::Send(
+ protocol_handler::RawMessagePtr msg) {
+ LOG4CXX_AUTO_TRACE(logger);
+ ssize_t ret;
+ if (is_first_frame_) {
+ ret = send(send_socket_fd_, header_.c_str(),
+ header_.size(), MSG_NOSIGNAL);
+ if (static_cast<uint32_t>(ret) != header_.size()) {
+ LOG4CXX_ERROR(logger, "Unable to send data to socket");
return false;
}
+ is_first_frame_ = false;
}
- if (-1 == ::send(new_socket_fd_, (*msg).data(),
- (*msg).data_size(), MSG_NOSIGNAL)) {
- LOG4CXX_ERROR_EXT(logger, " Unable to send");
+ ret = send(send_socket_fd_, msg->data(),
+ msg->data_size(), MSG_NOSIGNAL);
+ if (-1 == ret) {
+ LOG4CXX_ERROR(logger, "Unable to send data to socket");
return false;
}
- LOG4CXX_INFO(logger, "Streamer::sent " << (*msg).data_size());
+ if (static_cast<uint32_t>(ret) != msg->data_size()) {
+ LOG4CXX_WARN(logger, "Couldn't send all the data to socket "
+ << send_socket_fd_);
+ }
+
+ LOG4CXX_INFO(logger, "Streamer::sent " << msg->data_size());
return true;
}