From e9bbbc80f45d15bbb2fcd741b693d08887fdd04c Mon Sep 17 00:00:00 2001 From: TheAssassin Date: Mon, 31 Aug 2020 08:39:43 +0200 Subject: [PATCH] Switch to new subprocess lib in type 0 plugins, mk. 2 Apparently fixes all buffer-related crashes, but there's a few minor bugs left. CC #143 --- include/linuxdeploy/plugin/base_impl.h | 136 +----------------- .../plugin/plugin_process_handler.h | 19 +++ src/plugin/CMakeLists.txt | 20 ++- src/plugin/plugin_process_handler.cpp | 111 ++++++++++++++ 4 files changed, 151 insertions(+), 135 deletions(-) create mode 100644 include/linuxdeploy/plugin/plugin_process_handler.h create mode 100644 src/plugin/plugin_process_handler.cpp diff --git a/include/linuxdeploy/plugin/base_impl.h b/include/linuxdeploy/plugin/base_impl.h index 0c8c221..e4507a8 100644 --- a/include/linuxdeploy/plugin/base_impl.h +++ b/include/linuxdeploy/plugin/base_impl.h @@ -14,6 +14,7 @@ #include "linuxdeploy/core/log.h" #include "linuxdeploy/util/util.h" #include "linuxdeploy/subprocess/process.h" +#include "linuxdeploy/plugin/plugin_process_handler.h" #pragma once @@ -135,139 +136,8 @@ namespace linuxdeploy { template int PluginBase::run(const boost::filesystem::path& appDirPath) { - const auto pluginPath = path(); - const std::initializer_list args = {pluginPath.string(), "--appdir", appDirPath.string()}; - - auto log = ldLog(); - log << "Running process:"; - for (const auto& arg : args) { - log << "" << arg; - } - log << std::endl; - - subprocess::subprocess_env_map_t environmentVariables{}; - - if (this->pluginType() == PLUGIN_TYPE::INPUT_TYPE) { - // add $LINUXDEPLOY, which points to the current binary - // we do not need to pass $APPIMAGE or alike, since while linuxdeploy is running, the path in the - // temporary mountpoint of its AppImage will be valid anyway - environmentVariables["LINUXDEPLOY"] = linuxdeploy::util::getOwnExecutablePath(); - } - - subprocess::process process(args, environmentVariables); - - std::vector pfds(2); - auto* opfd = &pfds[0]; - auto* epfd = &pfds[1]; - - opfd->fd = process.stdout_fd(); - opfd->events = POLLIN; - - epfd->fd = process.stderr_fd(); - epfd->events = POLLIN; - - for (const auto& fd : {process.stdout_fd(), process.stderr_fd()}) { - auto flags = fcntl(fd, F_GETFL, 0); - flags |= O_NONBLOCK; - fcntl(fd, F_SETFL, flags); - } - - auto printOutput = [&pfds, opfd, epfd, this, &process]() { - poll(pfds.data(), pfds.size(), -1); - - auto printUntilLastLine = [this](std::vector& buf, size_t& bufSize, const std::string& streamType) { - std::ostringstream oss; - - while (true) { - const auto firstLineFeed = std::find(buf.begin(), buf.end(), '\n'); - const auto firstCarriageReturn = std::find(buf.begin(), buf.end(), '\r'); - - if (firstLineFeed == buf.end() && firstCarriageReturn == buf.end()) - break; - - const auto endOfLine = std::min(firstLineFeed, firstCarriageReturn); - - std::string line(buf.begin(), endOfLine+1); - - oss << "[" << d->name << "/" << streamType << "] " << line; - - bufSize -= std::distance(buf.begin(), endOfLine+1); - buf.erase(buf.begin(), endOfLine+1); - } - - auto messages = oss.str(); - if (!messages.empty()) - ldLog() << messages; - }; - - std::vector stdoutBuf(16 * 1024); - std::vector stderrBuf(16 * 1024); - size_t currentStdoutBufSize = 0; - size_t currentStderrBufSize = 0; - - if ((opfd->revents & POLLIN) != 0) { - if (currentStdoutBufSize >= stdoutBuf.size()) - throw std::runtime_error("Buffer overflow"); - - while (true) { - auto bytesRead = read( - process.stdout_fd(), - reinterpret_cast(stdoutBuf.data() + currentStdoutBufSize), - static_cast(stdoutBuf.size() - currentStdoutBufSize) - ); - - if (bytesRead == 0) - break; - - currentStdoutBufSize += bytesRead; - - printUntilLastLine(stdoutBuf, currentStdoutBufSize, "stdout"); - } - } - - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - if ((epfd->revents & POLLIN) != 0) { - if (currentStderrBufSize >= stderrBuf.size()) - throw std::runtime_error("Buffer overflow"); - - while (true) { - auto bytesRead = read( - process.stderr_fd(), - reinterpret_cast(stderrBuf.data() + currentStderrBufSize), - static_cast(stderrBuf.size() - currentStderrBufSize) - ); - - if (bytesRead == 0) - break; - - currentStderrBufSize += bytesRead; - - printUntilLastLine(stderrBuf, currentStderrBufSize, "stderr"); - } - } - - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - return true; - }; - - do { - if (!printOutput()) { - ldLog() << LD_ERROR << "Failed to communicate with process" << std::endl; - process.kill(); - return -1; - } - } while (process.is_running()); - - if (!printOutput()) { - ldLog() << LD_ERROR << "Failed to communicate with process" << std::endl; - process.kill(); - return -1; - } - - const auto retcode = process.close(); - return retcode; + plugin_process_handler handler(d->name, path()); + return handler.run(appDirPath); } } } diff --git a/include/linuxdeploy/plugin/plugin_process_handler.h b/include/linuxdeploy/plugin/plugin_process_handler.h new file mode 100644 index 0000000..dcef96b --- /dev/null +++ b/include/linuxdeploy/plugin/plugin_process_handler.h @@ -0,0 +1,19 @@ +#pragma once + +// library headers +#include + +namespace linuxdeploy { + namespace plugin { + class plugin_process_handler { + private: + std::string name_; + boost::filesystem::path path_; + + public: + plugin_process_handler(std::string name, boost::filesystem::path path); + + int run(const boost::filesystem::path& appDir) const; + }; + } +} diff --git a/src/plugin/CMakeLists.txt b/src/plugin/CMakeLists.txt index 7836f1e..0ccd583 100644 --- a/src/plugin/CMakeLists.txt +++ b/src/plugin/CMakeLists.txt @@ -1,4 +1,20 @@ -file(GLOB PLUGIN_HEADERS ${PROJECT_SOURCE_DIR}/include/linuxdeploy/plugin/*.h) +set(headers_dir ${PROJECT_SOURCE_DIR}/include/linuxdeploy/plugin/) -add_library(linuxdeploy_plugin STATIC plugin_type0.cpp plugin.cpp ${PLUGIN_HEADERS}) +set(headers + ${headers_dir}/plugin.h + ${headers_dir}/base.h + ${headers_dir}/base_impl.h + ${headers_dir}/exceptions.h + ${headers_dir}/plugin_process_handler.h +) + +add_library(linuxdeploy_plugin STATIC + plugin.cpp + plugin_type0.cpp + plugin_process_handler.cpp + ${headers} +) target_link_libraries(linuxdeploy_plugin PUBLIC linuxdeploy_core ${BOOST_LIBS} linuxdeploy_subprocess) + +unset(headers) +unset(headers_dir) diff --git a/src/plugin/plugin_process_handler.cpp b/src/plugin/plugin_process_handler.cpp new file mode 100644 index 0000000..4825ca4 --- /dev/null +++ b/src/plugin/plugin_process_handler.cpp @@ -0,0 +1,111 @@ +// system headers +#include +#include + +// local headers +#include +#include +#include +#include +#include + +namespace bf = boost::filesystem; + +namespace linuxdeploy { + namespace plugin { + using namespace core::log; + + plugin_process_handler::plugin_process_handler(std::string name, bf::path path) : name_(std::move(name)), path_(std::move(path)) {} + + int plugin_process_handler::run(const bf::path& appDir) const { + // prepare arguments and environment variables + const std::initializer_list args = {path_.string(), "--appdir", appDir.string()}; + + subprocess::subprocess_env_map_t environmentVariables{}; + + // add $LINUXDEPLOY, which points to the current binary + // we do not need to pass $APPIMAGE or alike, since while linuxdeploy is running, the path in the + // temporary mountpoint of its AppImage will be valid anyway + environmentVariables["LINUXDEPLOY"] = linuxdeploy::util::getOwnExecutablePath(); + + linuxdeploy::subprocess::process proc{args, environmentVariables}; + + // we want to insert a custom log prefix whenever a CR or LF is written into either buffer + // like in subprocess's check_output, we use pipe readers to read from the subprocess's stdout/stderr pipes + // however, we just dump everything we receive directly in the log, using our ĺogging framework + // we store an ldLog instance per stream so we can just send all data into those, which allows us to get away + // with relatively small buffers (we don't have to cache complete lines or alike) + // parameter order: pipe reader, log type (used in prefix), ldLog instance, first message + std::array, 2> pipes_to_be_logged{ + std::make_tuple(pipe_reader(proc.stdout_fd()), "stdout", ldLog{}, true), + std::make_tuple(pipe_reader(proc.stderr_fd()), "stderr", ldLog{}, true), + }; + + for (;;) { + for (auto& tuple : pipes_to_be_logged) { + // make code in this loop more readable + auto& reader = std::get<0>(tuple); + const auto& stream_name = std::get<1>(tuple); + auto& log = std::get<2>(tuple); + auto& is_first_message = std::get<3>(tuple); + + const auto log_prefix = "[" + name_ + "/" + stream_name + "] "; + + // since we have our own ldLog instance for every pipe, we can get away with this rather small read buffer + subprocess::subprocess_result_buffer_t intermediate_buffer(4096); + + // (try to) read from pipe + const auto bytes_read = reader.read(intermediate_buffer); + + // no action required in case we have not read anything from the pipe + if (bytes_read > 0) { + // special handling for the first message + if (is_first_message) { + log << log_prefix; + is_first_message = false; + } + + // we just trim the buffer to the bytes we read (makes the code below easier) + intermediate_buffer.resize(bytes_read); + + // all we have to do now is to look for CR or LF, send everything up to that location into the ldLog instance, + // write our prefix and then repeat + for (auto it = intermediate_buffer.begin(); it != intermediate_buffer.end(); ++it) { + const auto next_lf = std::find(it, intermediate_buffer.end(), '\n'); + const auto next_cr = std::find(it, intermediate_buffer.end(), '\r'); + + // we don't care which one goes first -- we pick the closest one, write everything up to it into our ldLog, + // then print our prefix and repeat that until there's nothing left in our buffer + auto next_control_char = std::min({next_lf, next_cr}); + + if (next_control_char == intermediate_buffer.end()) { + break; + } + + // need to make sure we include the control char in the write + log.write( + intermediate_buffer.data() + std::distance(intermediate_buffer.begin(), it), + std::distance(it, next_control_char) + 1 + ); + + log << log_prefix; + + it = next_control_char; + } + } + } + + // do-while might be a little more elegant, but we can save this one unnecessary sleep, so... + if (proc.is_running()) { + // reduce load on CPU + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } else { + break; + } + } + + return proc.close(); + } + + } +}