fprime/Svc/BufferAccumulator/BufferAccumulator.cpp
Thomas Boyer-Chammard c69ff72110
Format Svc and add to CI (#3978)
* Format Svc and add to CI

* Fix comlogger include

* fix assert UTs

* Fix static analysis warning

* formatting
2025-08-04 16:21:47 -07:00

225 lines
8.5 KiB
C++

// ======================================================================
// \title BufferAccumulator.cpp
// \author bocchino
// \brief BufferAccumulator implementation
//
// \copyright
// Copyright (C) 2017 California Institute of Technology.
// ALL RIGHTS RESERVED. United States Government Sponsorship
// acknowledged.
//
// ======================================================================
#include "Svc/BufferAccumulator/BufferAccumulator.hpp"
#include <sys/time.h>
#include <limits>
#include "Fw/Types/BasicTypes.hpp"
namespace Svc {
// ----------------------------------------------------------------------
// Construction, initialization, and destruction
// ----------------------------------------------------------------------
BufferAccumulator ::BufferAccumulator(const char* const compName)
: BufferAccumulatorComponentBase(compName), //!< The component name
m_mode(BufferAccumulator_OpState::ACCUMULATE),
m_bufferMemory(nullptr),
m_bufferQueue(),
m_send(false),
m_waitForBuffer(false),
m_numWarnings(0u),
m_numDrained(0u),
m_numToDrain(0u),
m_opCode(),
m_cmdSeq(0u),
m_allocatorId(0) {}
BufferAccumulator ::~BufferAccumulator() {}
// ----------------------------------------------------------------------
// Public methods
// ----------------------------------------------------------------------
void BufferAccumulator ::allocateQueue(FwEnumStoreType identifier,
Fw::MemAllocator& allocator,
FwSizeType maxNumBuffers //!< The maximum number of buffers
) {
this->m_allocatorId = identifier;
// Overflow protection
FW_ASSERT((std::numeric_limits<FwSizeType>::max() / maxNumBuffers) >= sizeof(Fw::Buffer));
FwSizeType memSize = static_cast<FwSizeType>(sizeof(Fw::Buffer) * maxNumBuffers);
bool recoverable = false;
this->m_bufferMemory = static_cast<Fw::Buffer*>(allocator.allocate(identifier, memSize, recoverable));
// TODO: Fail gracefully here
m_bufferQueue.init(this->m_bufferMemory, maxNumBuffers);
}
void BufferAccumulator ::deallocateQueue(Fw::MemAllocator& allocator) {
allocator.deallocate(static_cast<FwEnumStoreType>(this->m_allocatorId), this->m_bufferMemory);
}
// ----------------------------------------------------------------------
// Handler implementations for user-defined typed input ports
// ----------------------------------------------------------------------
void BufferAccumulator ::bufferSendInFill_handler(const FwIndexType portNum, Fw::Buffer& buffer) {
const bool status = this->m_bufferQueue.enqueue(buffer);
if (status) {
if (this->m_numWarnings > 0) {
this->log_ACTIVITY_HI_BA_BufferAccepted();
}
this->m_numWarnings = 0;
} else {
if (this->m_numWarnings == 0) {
this->log_WARNING_HI_BA_QueueFull();
}
m_numWarnings++;
}
if (this->m_send) {
this->sendStoredBuffer();
}
this->tlmWrite_BA_NumQueuedBuffers(static_cast<U32>(this->m_bufferQueue.getSize()));
}
void BufferAccumulator ::bufferSendInReturn_handler(const FwIndexType portNum, Fw::Buffer& buffer) {
this->bufferSendOutReturn_out(0, buffer);
this->m_waitForBuffer = false;
if ((this->m_mode == BufferAccumulator_OpState::DRAIN) || // we are draining ALL buffers
(this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers
// in a partial drain
this->m_send = true;
this->sendStoredBuffer();
}
}
void BufferAccumulator ::pingIn_handler(const FwIndexType portNum, U32 key) {
this->pingOut_out(0, key);
}
// ----------------------------------------------------------------------
// Command handler implementations
// ----------------------------------------------------------------------
void BufferAccumulator ::BA_SetMode_cmdHandler(const FwOpcodeType opCode,
const U32 cmdSeq,
BufferAccumulator_OpState mode) {
// cancel an in-progress partial drain
if (this->m_numToDrain > 0) {
// reset counters for partial buffer drain
this->m_numToDrain = 0;
this->m_numDrained = 0;
// respond to the original command
this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
}
this->m_mode = mode;
if (mode == BufferAccumulator_OpState::DRAIN) {
if (!this->m_waitForBuffer) {
this->m_send = true;
this->sendStoredBuffer();
}
} else {
this->m_send = false;
}
this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
}
void BufferAccumulator ::BA_DrainBuffers_cmdHandler(const FwOpcodeType opCode,
const U32 cmdSeq,
U32 numToDrain,
BufferAccumulator_BlockMode blockMode) {
if (this->m_numDrained < this->m_numToDrain) {
this->log_WARNING_HI_BA_StillDraining(static_cast<U32>(this->m_numDrained),
static_cast<U32>(this->m_numToDrain));
this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::BUSY);
return;
}
if (this->m_mode == BufferAccumulator_OpState::DRAIN) {
this->log_WARNING_HI_BA_AlreadyDraining();
this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::VALIDATION_ERROR);
return;
}
if (numToDrain == 0) {
this->log_ACTIVITY_HI_BA_PartialDrainDone(0);
this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
return;
}
this->m_opCode = opCode;
this->m_cmdSeq = cmdSeq;
this->m_numDrained = 0;
this->m_numToDrain = static_cast<FwSizeType>(numToDrain);
if (blockMode == BufferAccumulator_BlockMode::NOBLOCK) {
FwSizeType numBuffers = this->m_bufferQueue.getSize();
if (numBuffers < static_cast<FwSizeType>(numToDrain)) {
this->m_numToDrain = numBuffers;
this->log_WARNING_LO_BA_NonBlockDrain(static_cast<U32>(this->m_numToDrain), numToDrain);
}
/* OK if there were 0 buffers queued, and we
* end up setting numToDrain to 0
*/
if (0 == this->m_numToDrain) {
this->log_ACTIVITY_HI_BA_PartialDrainDone(0);
this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
return;
}
}
// We are still waiting for a buffer from last time
if (!this->m_waitForBuffer) {
this->m_send = true;
this->sendStoredBuffer(); // kick off the draining;
}
}
// ----------------------------------------------------------------------
// Private helper methods
// ----------------------------------------------------------------------
void BufferAccumulator ::sendStoredBuffer() {
FW_ASSERT(this->m_send);
Fw::Buffer buffer;
if ((this->m_numToDrain == 0) || // we are draining ALL buffers
(this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers in a
// partial drain
const bool status = this->m_bufferQueue.dequeue(buffer);
if (status) { // a buffer was dequeued
this->m_numDrained++;
this->bufferSendOutDrain_out(0, buffer);
this->m_waitForBuffer = true;
this->m_send = false;
} else if (this->m_numToDrain > 0) {
this->log_WARNING_HI_BA_DrainStalled(static_cast<U32>(this->m_numDrained),
static_cast<U32>(this->m_numToDrain));
}
}
/* This used to be "else if", but then you wait for all
* drained buffers in a partial drain to be RETURNED before returning OK.
* Correct thing is to return OK once they are SENT
*/
if ((this->m_numToDrain > 0) && // we are doing a partial drain
(this->m_numDrained == this->m_numToDrain)) { // AND we just finished draining
//
this->log_ACTIVITY_HI_BA_PartialDrainDone(static_cast<U32>(this->m_numDrained));
// reset counters for partial buffer drain
this->m_numToDrain = 0;
this->m_numDrained = 0;
this->m_send = false;
this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
}
this->tlmWrite_BA_NumQueuedBuffers(static_cast<U32>(this->m_bufferQueue.getSize()));
}
} // namespace Svc