summaryrefslogtreecommitdiff
path: root/src/components/media_manager/src/pipe_streamer_adapter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/media_manager/src/pipe_streamer_adapter.cc')
-rw-r--r--src/components/media_manager/src/pipe_streamer_adapter.cc67
1 files changed, 51 insertions, 16 deletions
diff --git a/src/components/media_manager/src/pipe_streamer_adapter.cc b/src/components/media_manager/src/pipe_streamer_adapter.cc
index 8bf14a546e..bda106721f 100644
--- a/src/components/media_manager/src/pipe_streamer_adapter.cc
+++ b/src/components/media_manager/src/pipe_streamer_adapter.cc
@@ -30,13 +30,13 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
+#include "media_manager/pipe_streamer_adapter.h"
#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
-#include "utils/logger.h"
#include "utils/file_system.h"
-#include "media_manager/pipe_streamer_adapter.h"
+#include "utils/logger.h"
namespace media_manager {
@@ -82,15 +82,15 @@ PipeStreamerAdapter::PipeStreamer::~PipeStreamer() {
bool PipeStreamerAdapter::PipeStreamer::Connect() {
LOG4CXX_AUTO_TRACE(logger_);
- pipe_fd_ = open(named_pipe_path_.c_str(), O_RDWR, 0);
+ pipe_fd_ = open(named_pipe_path_.c_str(), O_RDWR | O_NONBLOCK, 0);
if (-1 == pipe_fd_) {
LOG4CXX_ERROR(logger_, "Cannot open pipe for writing " << named_pipe_path_);
return false;
}
- LOG4CXX_INFO(logger_,
- "Pipe " << named_pipe_path_
- << " was successfuly opened for writing");
+ LOG4CXX_INFO(
+ logger_,
+ "Pipe " << named_pipe_path_ << " was successfuly opened for writing");
return true;
}
@@ -106,16 +106,51 @@ void PipeStreamerAdapter::PipeStreamer::Disconnect() {
bool PipeStreamerAdapter::PipeStreamer::Send(
protocol_handler::RawMessagePtr msg) {
LOG4CXX_AUTO_TRACE(logger_);
- ssize_t ret = write(pipe_fd_, msg->data(), msg->data_size());
- if (-1 == ret) {
- LOG4CXX_ERROR(logger_, "Failed writing data to pipe " << named_pipe_path_);
- return false;
- }
-
- if (static_cast<uint32_t>(ret) != msg->data_size()) {
- LOG4CXX_WARN(logger_,
- "Couldn't write all the data to pipe " << named_pipe_path_);
- }
+ fd_set wfds;
+ FD_ZERO(&wfds);
+ FD_SET(pipe_fd_, &wfds);
+ struct timeval tv;
+ tv.tv_sec = 10;
+ tv.tv_usec = 0;
+ ssize_t write_ret = 0;
+ bool data_remaining = false;
+ do {
+ int select_ret = select(pipe_fd_ + 1, NULL, &wfds, NULL, &tv);
+ // Most likely pipe closed, fail stream
+ if (select_ret == -1) {
+ LOG4CXX_ERROR(logger_,
+ "Failed writing data to pipe "
+ << named_pipe_path_ << ". Errno: " << strerror(errno));
+ return false;
+ // Select success, attempt to write
+ } else if (select_ret) {
+ ssize_t temp_ret = write(
+ pipe_fd_, msg->data() + write_ret, msg->data_size() - write_ret);
+ if (-1 == temp_ret) {
+ LOG4CXX_ERROR(logger_,
+ "Failed writing data to pipe "
+ << named_pipe_path_
+ << ". Errno: " << strerror(errno));
+ return false;
+ }
+ write_ret += temp_ret;
+ // Select timed out, fail stream.
+ } else {
+ LOG4CXX_ERROR(logger_,
+ "Failed writing data to pipe " << named_pipe_path_
+ << ". Error: TIMEOUT");
+ return false;
+ }
+ // Check that all data was written to the pipe.
+ data_remaining = static_cast<uint32_t>(write_ret) != msg->data_size();
+ if (data_remaining) {
+ LOG4CXX_WARN(logger_,
+ "Couldn't write all the data to pipe "
+ << named_pipe_path_ << ". "
+ << msg->data_size() - write_ret << " bytes remaining");
+ }
+ // Loop to send remaining data if there is any.
+ } while (data_remaining);
LOG4CXX_INFO(logger_, "Streamer::sent " << msg->data_size());
return true;