From 485507a2d19b8760355a03f7d2e4cf76d157799b Mon Sep 17 00:00:00 2001 From: TheAssassin Date: Sun, 7 Oct 2018 00:05:59 +0200 Subject: [PATCH] Buffer subprocess communication --- include/linuxdeploy/plugin/base_impl.h | 94 +++++++++++++++++++------- 1 file changed, 69 insertions(+), 25 deletions(-) diff --git a/include/linuxdeploy/plugin/base_impl.h b/include/linuxdeploy/plugin/base_impl.h index b4fc13d..15a10d0 100644 --- a/include/linuxdeploy/plugin/base_impl.h +++ b/include/linuxdeploy/plugin/base_impl.h @@ -158,50 +158,94 @@ namespace linuxdeploy { auto printOutput = [&pfds, opfd, epfd, this, &process]() { poll(pfds.data(), pfds.size(), -1); - if (opfd->revents & POLLIN) { - std::vector lineBuf(16384); - fgets(lineBuf.data(), static_cast(lineBuf.size()), process.output()); - - std::stringstream ss; - ss << lineBuf.data(); - lineBuf.clear(); - - std::string currentLine; - + auto printUntilLastLine = [this](std::vector& buf, size_t& bufSize, const std::string& streamType) { std::ostringstream oss; - while (std::getline(ss, currentLine)) { - oss << "[" << d->name << "/stdout] " << currentLine << std::endl; + + while (true) { + const auto firstLineFeed = std::find(buf.begin(), buf.end(), '\n'); + const auto firstCarriageReturn = std::find(buf.begin(), buf.end(), '\r'); + + if (firstLineFeed == buf.end() && firstCarriageReturn == buf.end()) + break; + + const auto endOfLine = std::min(firstLineFeed, firstCarriageReturn); + + std::string line(buf.begin(), endOfLine+1); + + oss << "[" << d->name << "/" << streamType << "] " << line; + + bufSize -= std::distance(buf.begin(), endOfLine+1); + buf.erase(buf.begin(), endOfLine+1); + } + + auto messages = oss.str(); + if (!messages.empty()) + ldLog() << messages; + }; + + std::vector stdoutBuf(16 * 1024); + std::vector stderrBuf(16 * 1024); + size_t stdoutBufSize = 0; + size_t stderrBufSize = 0; + + if (opfd->revents & POLLIN) { + if (stdoutBufSize >= stdoutBuf.size()) + throw std::runtime_error("Buffer overflow"); + + while (true) { + auto bytesRead = fread( + reinterpret_cast(stdoutBuf.data() + stdoutBufSize), + sizeof(char), + static_cast(stdoutBuf.size() - stdoutBufSize), + process.output() + ); + + if (bytesRead == 0) + break; + + stdoutBufSize += bytesRead; + + printUntilLastLine(stdoutBuf, stdoutBufSize, "stdout"); } - linuxdeploy::core::log::ldLog() << oss.str(); } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + if (epfd->revents & POLLIN) { - std::vector lineBuf(16384); - fgets(lineBuf.data(), static_cast(lineBuf.size()), process.error()); + if (stderrBufSize >= stderrBuf.size()) + throw std::runtime_error("Buffer overflow"); - std::stringstream ss; - ss << lineBuf.data(); - lineBuf.clear(); + while (true) { + auto bytesRead = fread( + reinterpret_cast(stderrBuf.data() + stderrBufSize), + sizeof(char), + static_cast(stderrBuf.size() - stderrBufSize), + process.error() + ); - std::string currentLine; + if (bytesRead == 0) + break; - std::ostringstream oss; - while (std::getline(ss, currentLine)) { - oss << "[" << d->name << "/stderr] " << currentLine << std::endl; + stderrBufSize += bytesRead; + + printUntilLastLine(stderrBuf, stderrBufSize, "stderr"); } - linuxdeploy::core::log::ldLog() << oss.str(); } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + return true; }; + int retcode; + do { if (!printOutput()) { ldLog() << LD_ERROR << "Failed to communicate with process" << std::endl; process.kill(); return -1; } - } while (process.poll() < 0); + } while ((retcode = process.poll()) < 0); if (!printOutput()) { ldLog() << LD_ERROR << "Failed to communicate with process" << std::endl; @@ -209,7 +253,7 @@ namespace linuxdeploy { return -1; } - return process.retcode(); + return retcode; } } }