Fix reading of output from subprocess

This commit is contained in:
TheAssassin 2022-07-15 17:01:48 +02:00
parent 649fc0247d
commit 4677fd9280
5 changed files with 112 additions and 89 deletions

View File

@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.2)
project(linuxdeploy C CXX) project(linuxdeploy C CXX)
set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PROJECT_SOURCE_DIR}/cmake/Modules/") set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PROJECT_SOURCE_DIR}/cmake/Modules/")

View File

@ -4,15 +4,24 @@
#include <set> #include <set>
#include <vector> #include <vector>
#include <poll.h> #include <poll.h>
#include <chrono>
/** /**
* Reads from a pipe when data is available, and hands data to registered callbacks. * Reads from a pipe when data is available, and hands data to registered callbacks.
*/ */
class pipe_reader { class pipe_reader {
private: private:
const int pipe_fd_; struct pollfd pollfd_;
public: public:
static constexpr std::chrono::milliseconds READ_TIMEOUT{50};
enum class result {
SUCCESS = 0,
TIMEOUT,
END_OF_FILE,
};
/** /**
* Construct new instance from pipe file descriptor. * Construct new instance from pipe file descriptor.
* @param pipe_fd file descriptor for pipe we will read from (e.g., a subprocess's stdout, stderr pipes) * @param pipe_fd file descriptor for pipe we will read from (e.g., a subprocess's stdout, stderr pipes)
@ -27,10 +36,12 @@ public:
* - no more data left in the pipe to be read * - no more data left in the pipe to be read
* - buffer is completely filled * - buffer is completely filled
* *
* The buffer will be resized to the number of bytes read from the pipe.
*
* On errors, a subprocess_error is thrown. * On errors, a subprocess_error is thrown.
* *
* @param buffer buffer to store read data into * @param buffer buffer to store read data into
* @returns amount of characters read from the pipe * @returns
*/ */
size_t read(std::vector<std::string::value_type>& buffer) const; result read(std::vector<std::string::value_type>& buffer, std::chrono::milliseconds read_timeout = READ_TIMEOUT);
}; };

View File

@ -68,65 +68,59 @@ namespace linuxdeploy {
subprocess::subprocess_result_buffer_t intermediate_buffer(4096); subprocess::subprocess_result_buffer_t intermediate_buffer(4096);
// (try to) read from pipe // (try to) read from pipe
const auto bytes_read = pipe_to_be_logged.reader_.read(intermediate_buffer); switch (pipe_to_be_logged.reader_.read(intermediate_buffer)) {
case pipe_reader::result::SUCCESS: {
// all we have to do now is to look for CR or LF, send everything up to that location into the ldLog instance,
// write our prefix and then repeat
for (auto it = intermediate_buffer.begin(); it != intermediate_buffer.end(); ++it) {
if (pipe_to_be_logged.print_prefix_in_next_iteration_) {
pipe_to_be_logged.log_ << log_prefix;
}
// 0 means EOF const auto next_lf = std::find(it, intermediate_buffer.end(), '\n');
if (bytes_read == 0) { const auto next_cr = std::find(it, intermediate_buffer.end(), '\r');
pipe_to_be_logged.eof = true;
break;
}
// we just trim the buffer to the bytes we read (makes the code below easier) // we don't care which one goes first -- we pick the closest one, write everything up to it into our ldLog,
intermediate_buffer.resize(bytes_read); // then print our prefix and repeat that until there's nothing left in our buffer
auto next_control_char = std::min({next_lf, next_cr});
// all we have to do now is to look for CR or LF, send everything up to that location into the ldLog instance, // if there is a control char, we remember this for the next iteration, where we print our
// write our prefix and then repeat // log prefix
for (auto it = intermediate_buffer.begin(); it != intermediate_buffer.end(); ++it) { // in any case, we can write the remaining buffer contents into the ldLog object
if (pipe_to_be_logged.print_prefix_in_next_iteration_) { pipe_to_be_logged.print_prefix_in_next_iteration_ = (next_control_char !=
pipe_to_be_logged.log_ << log_prefix; intermediate_buffer.end());
}
const auto next_lf = std::find(it, intermediate_buffer.end(), '\n'); const auto distance_from_begin_to_it = std::distance(intermediate_buffer.begin(), it);
const auto next_cr = std::find(it, intermediate_buffer.end(), '\r'); auto distance_from_it_to_next_cc = std::distance(it, next_control_char);
// we don't care which one goes first -- we pick the closest one, write everything up to it into our ldLog, if (pipe_to_be_logged.print_prefix_in_next_iteration_) {
// then print our prefix and repeat that until there's nothing left in our buffer distance_from_it_to_next_cc++;
auto next_control_char = std::min({next_lf, next_cr}); }
// if there is a control char, we remember this for the next iteration, where we print our // need to make sure we include the control char in the write
// log prefix pipe_to_be_logged.log_.write(
// in any case, we can write the remaining buffer contents into the ldLog object intermediate_buffer.data() + distance_from_begin_to_it,
pipe_to_be_logged.print_prefix_in_next_iteration_ = (next_control_char != distance_from_it_to_next_cc
intermediate_buffer.end()); );
const auto distance_from_begin_to_it = std::distance(intermediate_buffer.begin(), it); it = next_control_char;
auto distance_from_it_to_next_cc = std::distance(it, next_control_char);
if (pipe_to_be_logged.print_prefix_in_next_iteration_) { // TODO: should not be necessary, should be fixed in for loop
distance_from_it_to_next_cc++; if (!pipe_to_be_logged.print_prefix_in_next_iteration_) {
} break;
}
// need to make sure we include the control char in the write }
pipe_to_be_logged.log_.write(
intermediate_buffer.data() + distance_from_begin_to_it,
distance_from_it_to_next_cc
);
it = next_control_char;
// TODO: should not be necessary, should be fixed in for loop
if (!pipe_to_be_logged.print_prefix_in_next_iteration_) {
break; break;
} }
case pipe_reader::result::END_OF_FILE: {
pipe_to_be_logged.eof = true;
break;
}
case pipe_reader::result::TIMEOUT:
break;
} }
} }
// do-while might be a little more elegant, but we can save this one unnecessary sleep, so...
if (proc.is_running()) {
// reduce load on CPU
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
// once all buffers are EOF, we can stop reading // once all buffers are EOF, we can stop reading
if (std::all_of(pipes_to_be_logged.begin(), pipes_to_be_logged.end(), [](const pipe_to_be_logged& pipe_state) { if (std::all_of(pipes_to_be_logged.begin(), pipes_to_be_logged.end(), [](const pipe_to_be_logged& pipe_state) {
return pipe_state.eof; return pipe_state.eof;

View File

@ -1,37 +1,59 @@
// system headers // system headers
#include <algorithm> #include <algorithm>
#include <fcntl.h>
#include <unistd.h>
#include <functional>
#include <cstring> #include <cstring>
#include <poll.h>
#include <unistd.h>
#include <stdexcept> #include <stdexcept>
// local headers // local headers
#include "linuxdeploy/subprocess/pipe_reader.h" #include "linuxdeploy/subprocess/pipe_reader.h"
pipe_reader::pipe_reader(int pipe_fd) : pipe_fd_(pipe_fd) { pipe_reader::pipe_reader(int pipe_fd) : pollfd_(pollfd{pipe_fd, POLLIN | POLLHUP}) {}
// add O_NONBLOCK TO fd's flags to be able to read
auto flags = fcntl(pipe_fd_, F_GETFL, 0);
flags |= O_NONBLOCK;
fcntl(pipe_fd_, F_SETFL, flags);
}
size_t pipe_reader::read(std::vector<std::string::value_type>& buffer) const { pipe_reader::result pipe_reader::read(std::vector<std::string::value_type>& buffer, std::chrono::milliseconds read_timeout) {
for (;;) { const auto timeout_msec = std::chrono::duration_cast<std::chrono::milliseconds>(read_timeout).count();
ssize_t rv = ::read(pipe_fd_, buffer.data(), buffer.size());
if (rv == -1) { // we could (and probably should) be using poll on multiple fds at once
switch (errno) { // however given the low bandwidth of data to handle, this should be fine, given we use a small-enough timeout
// retry in case data is currently not available // also the read buffer sizes could be further increased to improve the overall performance
case EINTR: switch (poll(&pollfd_, 1, static_cast<int>(timeout_msec))) {
case EAGAIN: case -1:
continue; // TODO: introduce custom subprocess_error
default: throw std::runtime_error{"unexpected error reading from pipe: " + std::string(strerror(errno))};
// TODO: introduce custom subprocess_error case 0:
throw std::runtime_error{"unexpected error reading from pipe: " + std::string(strerror(errno))}; return result::TIMEOUT;
case 1: {
if ((pollfd_.revents & POLLIN) != 0) {
ssize_t rv = ::read(pollfd_.fd, buffer.data(), buffer.size());
switch (rv) {
case -1: {
throw std::runtime_error{"unexpected error reading from pipe: " + std::string(strerror(errno))};
}
case 0: {
return result::END_OF_FILE;
}
default: {
// set the size correctly so the caller can just query the vector's size if the number of read chars is needed
buffer.resize(rv);
return result::SUCCESS;
}
}
} }
}
return rv; if ((pollfd_.revents & POLLHUP) != 0) {
// appears like this can be considered eof
return result::END_OF_FILE;
}
if ((pollfd_.revents & POLLERR) != 0 || (pollfd_.revents & POLLNVAL) != 0) {
throw std::runtime_error{"poll() failed unexpectedly"};
}
break;
}
default:
// this is a should-never-ever-happen case, a return value not handled by the lines above is actually not possible
throw std::runtime_error{"unexpected return value from pollfd"};
} }
} }

View File

@ -51,23 +51,19 @@ namespace linuxdeploy {
subprocess_result_buffer_t intermediate_buffer(4096); subprocess_result_buffer_t intermediate_buffer(4096);
// (try to) read all available data from pipe // (try to) read all available data from pipe
for (;;) { for (; !pipe_state.eof; ) {
if (pipe_state.eof) { switch (pipe_state.reader.read(intermediate_buffer)) {
break; case pipe_reader::result::SUCCESS: {
// append to main buffer
pipe_state.buffer.reserve(pipe_state.buffer.size() + intermediate_buffer.size());
std::copy(intermediate_buffer.begin(), intermediate_buffer.end(), std::back_inserter(pipe_state.buffer));
}
case pipe_reader::result::END_OF_FILE: {
pipe_state.eof = true;
}
default:
break;
} }
const auto bytes_read = pipe_state.reader.read(intermediate_buffer);
// 0 means EOF
if (bytes_read == 0) {
pipe_state.eof = true;
break;
}
// append to main buffer
pipe_state.buffer.reserve(pipe_state.buffer.size() + bytes_read);
std::copy(intermediate_buffer.begin(), (intermediate_buffer.begin() + bytes_read),
std::back_inserter(pipe_state.buffer));
} }
} }