Switch to new subprocess lib in type 0 plugins, mk. 2

Apparently fixes all buffer-related crashes, but there's a few minor bugs left.

CC #143
This commit is contained in:
TheAssassin
2020-08-31 08:39:43 +02:00
parent 68f4655fc4
commit e9bbbc80f4
4 changed files with 151 additions and 135 deletions

View File

@@ -14,6 +14,7 @@
#include "linuxdeploy/core/log.h"
#include "linuxdeploy/util/util.h"
#include "linuxdeploy/subprocess/process.h"
#include "linuxdeploy/plugin/plugin_process_handler.h"
#pragma once
@@ -135,139 +136,8 @@ namespace linuxdeploy {
template<int API_LEVEL>
int PluginBase<API_LEVEL>::run(const boost::filesystem::path& appDirPath) {
const auto pluginPath = path();
const std::initializer_list<std::string> args = {pluginPath.string(), "--appdir", appDirPath.string()};
auto log = ldLog();
log << "Running process:";
for (const auto& arg : args) {
log << "" << arg;
}
log << std::endl;
subprocess::subprocess_env_map_t environmentVariables{};
if (this->pluginType() == PLUGIN_TYPE::INPUT_TYPE) {
// add $LINUXDEPLOY, which points to the current binary
// we do not need to pass $APPIMAGE or alike, since while linuxdeploy is running, the path in the
// temporary mountpoint of its AppImage will be valid anyway
environmentVariables["LINUXDEPLOY"] = linuxdeploy::util::getOwnExecutablePath();
}
subprocess::process process(args, environmentVariables);
std::vector<pollfd> pfds(2);
auto* opfd = &pfds[0];
auto* epfd = &pfds[1];
opfd->fd = process.stdout_fd();
opfd->events = POLLIN;
epfd->fd = process.stderr_fd();
epfd->events = POLLIN;
for (const auto& fd : {process.stdout_fd(), process.stderr_fd()}) {
auto flags = fcntl(fd, F_GETFL, 0);
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}
auto printOutput = [&pfds, opfd, epfd, this, &process]() {
poll(pfds.data(), pfds.size(), -1);
auto printUntilLastLine = [this](std::vector<char>& buf, size_t& bufSize, const std::string& streamType) {
std::ostringstream oss;
while (true) {
const auto firstLineFeed = std::find(buf.begin(), buf.end(), '\n');
const auto firstCarriageReturn = std::find(buf.begin(), buf.end(), '\r');
if (firstLineFeed == buf.end() && firstCarriageReturn == buf.end())
break;
const auto endOfLine = std::min(firstLineFeed, firstCarriageReturn);
std::string line(buf.begin(), endOfLine+1);
oss << "[" << d->name << "/" << streamType << "] " << line;
bufSize -= std::distance(buf.begin(), endOfLine+1);
buf.erase(buf.begin(), endOfLine+1);
}
auto messages = oss.str();
if (!messages.empty())
ldLog() << messages;
};
std::vector<char> stdoutBuf(16 * 1024);
std::vector<char> stderrBuf(16 * 1024);
size_t currentStdoutBufSize = 0;
size_t currentStderrBufSize = 0;
if ((opfd->revents & POLLIN) != 0) {
if (currentStdoutBufSize >= stdoutBuf.size())
throw std::runtime_error("Buffer overflow");
while (true) {
auto bytesRead = read(
process.stdout_fd(),
reinterpret_cast<void*>(stdoutBuf.data() + currentStdoutBufSize),
static_cast<size_t>(stdoutBuf.size() - currentStdoutBufSize)
);
if (bytesRead == 0)
break;
currentStdoutBufSize += bytesRead;
printUntilLastLine(stdoutBuf, currentStdoutBufSize, "stdout");
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if ((epfd->revents & POLLIN) != 0) {
if (currentStderrBufSize >= stderrBuf.size())
throw std::runtime_error("Buffer overflow");
while (true) {
auto bytesRead = read(
process.stderr_fd(),
reinterpret_cast<void*>(stderrBuf.data() + currentStderrBufSize),
static_cast<size_t>(stderrBuf.size() - currentStderrBufSize)
);
if (bytesRead == 0)
break;
currentStderrBufSize += bytesRead;
printUntilLastLine(stderrBuf, currentStderrBufSize, "stderr");
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return true;
};
do {
if (!printOutput()) {
ldLog() << LD_ERROR << "Failed to communicate with process" << std::endl;
process.kill();
return -1;
}
} while (process.is_running());
if (!printOutput()) {
ldLog() << LD_ERROR << "Failed to communicate with process" << std::endl;
process.kill();
return -1;
}
const auto retcode = process.close();
return retcode;
plugin_process_handler handler(d->name, path());
return handler.run(appDirPath);
}
}
}

View File

@@ -0,0 +1,19 @@
#pragma once
// library headers
#include <boost/filesystem/path.hpp>
namespace linuxdeploy {
namespace plugin {
class plugin_process_handler {
private:
std::string name_;
boost::filesystem::path path_;
public:
plugin_process_handler(std::string name, boost::filesystem::path path);
int run(const boost::filesystem::path& appDir) const;
};
}
}

View File

@@ -1,4 +1,20 @@
file(GLOB PLUGIN_HEADERS ${PROJECT_SOURCE_DIR}/include/linuxdeploy/plugin/*.h)
set(headers_dir ${PROJECT_SOURCE_DIR}/include/linuxdeploy/plugin/)
add_library(linuxdeploy_plugin STATIC plugin_type0.cpp plugin.cpp ${PLUGIN_HEADERS})
set(headers
${headers_dir}/plugin.h
${headers_dir}/base.h
${headers_dir}/base_impl.h
${headers_dir}/exceptions.h
${headers_dir}/plugin_process_handler.h
)
add_library(linuxdeploy_plugin STATIC
plugin.cpp
plugin_type0.cpp
plugin_process_handler.cpp
${headers}
)
target_link_libraries(linuxdeploy_plugin PUBLIC linuxdeploy_core ${BOOST_LIBS} linuxdeploy_subprocess)
unset(headers)
unset(headers_dir)

View File

@@ -0,0 +1,111 @@
// system headers
#include <tuple>
#include <thread>
// local headers
#include <linuxdeploy/plugin/plugin_process_handler.h>
#include <linuxdeploy/subprocess/process.h>
#include <linuxdeploy/util/util.h>
#include <linuxdeploy/core/log.h>
#include <linuxdeploy/subprocess/pipe_reader.h>
namespace bf = boost::filesystem;
namespace linuxdeploy {
namespace plugin {
using namespace core::log;
plugin_process_handler::plugin_process_handler(std::string name, bf::path path) : name_(std::move(name)), path_(std::move(path)) {}
int plugin_process_handler::run(const bf::path& appDir) const {
// prepare arguments and environment variables
const std::initializer_list<std::string> args = {path_.string(), "--appdir", appDir.string()};
subprocess::subprocess_env_map_t environmentVariables{};
// add $LINUXDEPLOY, which points to the current binary
// we do not need to pass $APPIMAGE or alike, since while linuxdeploy is running, the path in the
// temporary mountpoint of its AppImage will be valid anyway
environmentVariables["LINUXDEPLOY"] = linuxdeploy::util::getOwnExecutablePath();
linuxdeploy::subprocess::process proc{args, environmentVariables};
// we want to insert a custom log prefix whenever a CR or LF is written into either buffer
// like in subprocess's check_output, we use pipe readers to read from the subprocess's stdout/stderr pipes
// however, we just dump everything we receive directly in the log, using our ĺogging framework
// we store an ldLog instance per stream so we can just send all data into those, which allows us to get away
// with relatively small buffers (we don't have to cache complete lines or alike)
// parameter order: pipe reader, log type (used in prefix), ldLog instance, first message
std::array<std::tuple<pipe_reader, std::string, ldLog, bool>, 2> pipes_to_be_logged{
std::make_tuple(pipe_reader(proc.stdout_fd()), "stdout", ldLog{}, true),
std::make_tuple(pipe_reader(proc.stderr_fd()), "stderr", ldLog{}, true),
};
for (;;) {
for (auto& tuple : pipes_to_be_logged) {
// make code in this loop more readable
auto& reader = std::get<0>(tuple);
const auto& stream_name = std::get<1>(tuple);
auto& log = std::get<2>(tuple);
auto& is_first_message = std::get<3>(tuple);
const auto log_prefix = "[" + name_ + "/" + stream_name + "] ";
// since we have our own ldLog instance for every pipe, we can get away with this rather small read buffer
subprocess::subprocess_result_buffer_t intermediate_buffer(4096);
// (try to) read from pipe
const auto bytes_read = reader.read(intermediate_buffer);
// no action required in case we have not read anything from the pipe
if (bytes_read > 0) {
// special handling for the first message
if (is_first_message) {
log << log_prefix;
is_first_message = false;
}
// we just trim the buffer to the bytes we read (makes the code below easier)
intermediate_buffer.resize(bytes_read);
// all we have to do now is to look for CR or LF, send everything up to that location into the ldLog instance,
// write our prefix and then repeat
for (auto it = intermediate_buffer.begin(); it != intermediate_buffer.end(); ++it) {
const auto next_lf = std::find(it, intermediate_buffer.end(), '\n');
const auto next_cr = std::find(it, intermediate_buffer.end(), '\r');
// we don't care which one goes first -- we pick the closest one, write everything up to it into our ldLog,
// then print our prefix and repeat that until there's nothing left in our buffer
auto next_control_char = std::min({next_lf, next_cr});
if (next_control_char == intermediate_buffer.end()) {
break;
}
// need to make sure we include the control char in the write
log.write(
intermediate_buffer.data() + std::distance(intermediate_buffer.begin(), it),
std::distance(it, next_control_char) + 1
);
log << log_prefix;
it = next_control_char;
}
}
}
// do-while might be a little more elegant, but we can save this one unnecessary sleep, so...
if (proc.is_running()) {
// reduce load on CPU
std::this_thread::sleep_for(std::chrono::milliseconds(50));
} else {
break;
}
}
return proc.close();
}
}
}