Buffer subprocess communication

This commit is contained in:
TheAssassin 2018-10-07 00:05:59 +02:00
parent cdee469e60
commit 485507a2d1

View File

@ -158,50 +158,94 @@ namespace linuxdeploy {
auto printOutput = [&pfds, opfd, epfd, this, &process]() {
poll(pfds.data(), pfds.size(), -1);
if (opfd->revents & POLLIN) {
std::vector<char> lineBuf(16384);
fgets(lineBuf.data(), static_cast<int>(lineBuf.size()), process.output());
std::stringstream ss;
ss << lineBuf.data();
lineBuf.clear();
std::string currentLine;
auto printUntilLastLine = [this](std::vector<char>& buf, size_t& bufSize, const std::string& streamType) {
std::ostringstream oss;
while (std::getline(ss, currentLine)) {
oss << "[" << d->name << "/stdout] " << currentLine << std::endl;
while (true) {
const auto firstLineFeed = std::find(buf.begin(), buf.end(), '\n');
const auto firstCarriageReturn = std::find(buf.begin(), buf.end(), '\r');
if (firstLineFeed == buf.end() && firstCarriageReturn == buf.end())
break;
const auto endOfLine = std::min(firstLineFeed, firstCarriageReturn);
std::string line(buf.begin(), endOfLine+1);
oss << "[" << d->name << "/" << streamType << "] " << line;
bufSize -= std::distance(buf.begin(), endOfLine+1);
buf.erase(buf.begin(), endOfLine+1);
}
auto messages = oss.str();
if (!messages.empty())
ldLog() << messages;
};
std::vector<char> stdoutBuf(16 * 1024);
std::vector<char> stderrBuf(16 * 1024);
size_t stdoutBufSize = 0;
size_t stderrBufSize = 0;
if (opfd->revents & POLLIN) {
if (stdoutBufSize >= stdoutBuf.size())
throw std::runtime_error("Buffer overflow");
while (true) {
auto bytesRead = fread(
reinterpret_cast<void*>(stdoutBuf.data() + stdoutBufSize),
sizeof(char),
static_cast<size_t>(stdoutBuf.size() - stdoutBufSize),
process.output()
);
if (bytesRead == 0)
break;
stdoutBufSize += bytesRead;
printUntilLastLine(stdoutBuf, stdoutBufSize, "stdout");
}
linuxdeploy::core::log::ldLog() << oss.str();
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (epfd->revents & POLLIN) {
std::vector<char> lineBuf(16384);
fgets(lineBuf.data(), static_cast<int>(lineBuf.size()), process.error());
if (stderrBufSize >= stderrBuf.size())
throw std::runtime_error("Buffer overflow");
std::stringstream ss;
ss << lineBuf.data();
lineBuf.clear();
while (true) {
auto bytesRead = fread(
reinterpret_cast<void*>(stderrBuf.data() + stderrBufSize),
sizeof(char),
static_cast<size_t>(stderrBuf.size() - stderrBufSize),
process.error()
);
std::string currentLine;
if (bytesRead == 0)
break;
std::ostringstream oss;
while (std::getline(ss, currentLine)) {
oss << "[" << d->name << "/stderr] " << currentLine << std::endl;
stderrBufSize += bytesRead;
printUntilLastLine(stderrBuf, stderrBufSize, "stderr");
}
linuxdeploy::core::log::ldLog() << oss.str();
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return true;
};
int retcode;
do {
if (!printOutput()) {
ldLog() << LD_ERROR << "Failed to communicate with process" << std::endl;
process.kill();
return -1;
}
} while (process.poll() < 0);
} while ((retcode = process.poll()) < 0);
if (!printOutput()) {
ldLog() << LD_ERROR << "Failed to communicate with process" << std::endl;
@ -209,7 +253,7 @@ namespace linuxdeploy {
return -1;
}
return process.retcode();
return retcode;
}
}
}