Socket automatic reconnections in dedicated thread (#4136)

* Initial addition of dedicated reconnect task in SocketComponentHelper

* Updates to fix UT errors, address FIXMEs, update comments

* Additional FIXMEs and doc updates

* Spelling fix

* Format

* Format and some delay updates

* Address second request FIXMEs (remove)

* Unconditional stop & join in UDP tester cleanup

* Address FIXMEs and update Task to init singleton in a thread and address sanitizer safe manner

* Spelling

* Address PR review comments
This commit is contained in:
Mishaal 2025-09-22 10:09:35 -07:00 committed by GitHub
parent 40142d7d39
commit bae7d2573c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 256 additions and 39 deletions

View File

@ -24,7 +24,22 @@ SocketComponentHelper::~SocketComponentHelper() {}
void SocketComponentHelper::start(const Fw::StringBase& name,
const FwTaskPriorityType priority,
const Os::Task::ParamType stack,
const Os::Task::ParamType cpuAffinity) {
const Os::Task::ParamType cpuAffinity,
const FwTaskPriorityType priorityReconnect,
const Os::Task::ParamType stackReconnect,
const Os::Task::ParamType cpuAffinityReconnect) {
// Reconnect Thread
FW_ASSERT(m_reconnectTask.getState() ==
Os::Task::State::NOT_STARTED); // It is a coding error to start this task multiple times
this->m_reconnectStop = false;
Fw::String reconnectName;
reconnectName.format("%s_reconnect", name.toChar());
Os::Task::Arguments reconnectArguments(reconnectName, SocketComponentHelper::reconnectTask, this, priorityReconnect,
stackReconnect, cpuAffinityReconnect);
Os::Task::Status reconnectStat = m_reconnectTask.start(reconnectArguments);
FW_ASSERT(Os::Task::OP_OK == reconnectStat, static_cast<FwAssertArgType>(reconnectStat));
// Read Thread
FW_ASSERT(m_task.getState() ==
Os::Task::State::NOT_STARTED); // It is a coding error to start this task multiple times
this->m_stop = false;
@ -80,18 +95,19 @@ void SocketComponentHelper::setAutomaticOpen(bool auto_open) {
this->m_reopen = auto_open;
}
//! Thread-safe accessor for the automatic-open flag.
//!
//! \return current value of m_reopen, sampled while holding m_lock
bool SocketComponentHelper::getAutomaticOpen() {
    bool autoOpen = false;
    {
        // Hold the lock only long enough to copy the flag
        Os::ScopeLock guard(this->m_lock);
        autoOpen = this->m_reopen;
    }
    return autoOpen;
}
SocketIpStatus SocketComponentHelper::reopen() {
SocketIpStatus status = SOCK_SUCCESS;
if (not this->isOpened()) {
// Check for auto-open before attempting to reopen
bool reopen = false;
{
Os::ScopeLock scopedLock(this->m_lock);
reopen = this->m_reopen;
}
// Open a network connection if it has not already been open
bool reopen = this->getAutomaticOpen();
if (not reopen) {
status = SOCK_AUTO_CONNECT_DISABLED;
// Open a network connection if it has not already been open
} else {
status = this->open();
if (status == SocketIpStatus::SOCK_ANOTHER_THREAD_OPENING) {
@ -109,15 +125,16 @@ SocketIpStatus SocketComponentHelper::send(const U8* const data, const FwSizeTyp
this->m_lock.unlock();
// Prevent transmission before connection, or after a disconnect
if (descriptor.fd == -1) {
status = this->reopen();
// if reopen wasn't successful, pass the that up to the caller
if (status != SOCK_SUCCESS) {
return status;
this->requestReconnect();
SocketIpStatus reconnectStat = this->waitForReconnect();
if (reconnectStat == SOCK_SUCCESS) {
// Refresh local copy after reopen
this->m_lock.lock();
descriptor = this->m_descriptor;
this->m_lock.unlock();
} else {
return reconnectStat;
}
// Refresh local copy after reopen
this->m_lock.lock();
descriptor = this->m_descriptor;
this->m_lock.unlock();
}
status = this->getSocketHandler().send(descriptor, data, size);
if (status == SOCK_DISCONNECTED) {
@ -138,8 +155,15 @@ void SocketComponentHelper::close() {
this->m_open = OpenState::NOT_OPEN;
}
/* Read Thread */
Os::Task::Status SocketComponentHelper::join() {
return m_task.join();
Os::Task::Status stat = m_task.join();
Os::Task::Status reconnectStat = this->joinReconnect();
if (stat == Os::Task::Status::OP_OK) {
return reconnectStat;
}
return stat;
}
void SocketComponentHelper::stop() {
@ -148,6 +172,7 @@ void SocketComponentHelper::stop() {
Os::ScopeLock scopeLock(m_lock);
this->m_stop = true;
}
this->stopReconnect();
this->shutdown(); // Break out of any receives and fully shutdown
}
@ -178,17 +203,12 @@ void SocketComponentHelper::readLoop() {
do {
// Prevent transmission before connection, or after a disconnect
if ((not this->isOpened()) and this->running()) {
status = this->reopen();
this->requestReconnect();
status = this->waitForReconnect();
// When reopen is disabled, just break as this is a exit condition for the loop
if (status == SOCK_AUTO_CONNECT_DISABLED) {
break;
}
// If the reconnection failed in any other way, warn, wait, and retry
else if (status != SOCK_SUCCESS) {
Fw::Logger::log("[WARNING] Failed to open port with status %d and errno %d\n", status, errno);
(void)Os::Task::delay(SOCKET_RETRY_INTERVAL);
continue;
}
}
// If the network connection is open, read from it
if (this->isOpened() and this->running()) {
@ -221,4 +241,123 @@ void SocketComponentHelper::readTask(void* pointer) {
SocketComponentHelper* self = reinterpret_cast<SocketComponentHelper*>(pointer);
self->readLoop();
}
/* Reconnect Thread */
//! Join the reconnect task.
//!
//! \return status reported by the reconnect task's join() call
Os::Task::Status SocketComponentHelper::joinReconnect() {
    const Os::Task::Status joinStatus = this->m_reconnectTask.join();
    return joinStatus;
}
//! Ask the reconnect loop to exit.
//!
//! Under m_reconnectLock, raises the stop flag and resets the reconnect state
//! to NOT_RECONNECTING. This call does not join the task itself.
void SocketComponentHelper::stopReconnect() {
    Os::ScopeLock guard(this->m_reconnectLock);
    this->m_reconnectStop = true;
    this->m_reconnectState = ReconnectState::NOT_RECONNECTING;
}
//! Report whether the reconnect loop should keep running.
//!
//! \return true while the reconnect stop flag is not set (read under m_reconnectLock)
bool SocketComponentHelper::runningReconnect() {
    Os::ScopeLock guard(this->m_reconnectLock);
    return not this->m_reconnectStop;
}
void SocketComponentHelper::reconnectLoop() {
SocketIpStatus status = SOCK_SUCCESS;
while (this->runningReconnect()) {
// Check if we need to reconnect
bool reconnect = false;
{
Os::ScopeLock scopedLock(this->m_reconnectLock);
if (this->m_reconnectState == ReconnectState::REQUEST_RECONNECT) {
this->m_reconnectState = ReconnectState::RECONNECT_IN_PROGRESS;
reconnect = true;
}
// If we were already in or are now in RECONNECT_IN_PROGRESS we
// need to try to reconnect, again
else if (this->m_reconnectState == ReconnectState::RECONNECT_IN_PROGRESS) {
reconnect = true;
}
}
if (reconnect) {
status = this->reopen();
// Reopen Case 1: Auto Connect is disabled, so no longer
// try to reconnect
if (status == SOCK_AUTO_CONNECT_DISABLED) {
Os::ScopeLock scopedLock(this->m_reconnectLock);
this->m_reconnectState = ReconnectState::NOT_RECONNECTING;
}
// Reopen Case 2: Success, so no longer
// try to reconnect
else if (status == SOCK_SUCCESS) {
Os::ScopeLock scopedLock(this->m_reconnectLock);
this->m_reconnectState = ReconnectState::NOT_RECONNECTING;
}
// Reopen Case 3: Keep trying to reconnect - NO reconnect
// state change
else {
Fw::Logger::log("[WARNING] Failed to open port with status %d and errno %d\n", status, errno);
(void)Os::Task::delay(SOCKET_RETRY_INTERVAL);
}
} else {
// After a brief delay, we will loop again
(void)Os::Task::delay(this->m_reconnectCheckInterval);
}
}
}
void SocketComponentHelper::reconnectTask(void* pointer) {
FW_ASSERT(pointer);
SocketComponentHelper* self = reinterpret_cast<SocketComponentHelper*>(pointer);
self->reconnectLoop();
}
void SocketComponentHelper::requestReconnect() {
Os::ScopeLock scopedLock(this->m_reconnectLock);
if (m_reconnectState == ReconnectState::NOT_RECONNECTING) {
m_reconnectState = ReconnectState::REQUEST_RECONNECT;
}
return;
}
//! Wait for an outstanding reconnect request to finish.
//!
//! Polls the reconnect state every m_reconnectWaitInterval until the state is
//! NOT_RECONNECTING, the reconnect stop flag is set, or the accumulated wait
//! intervals reach the timeout.
//!
//! \param timeout: upper bound on the accumulated wait intervals
//! \return SOCK_AUTO_CONNECT_DISABLED when automatic open is off (checked before
//!         and after waiting), SOCK_SUCCESS when the socket is opened,
//!         SOCK_DISCONNECTED otherwise (another reopen is needed)
SocketIpStatus SocketComponentHelper::waitForReconnect(Fw::TimeInterval timeout) {
    // With auto reconnect disabled there is nothing to wait for
    if (not this->getAutomaticOpen()) {
        return SOCK_AUTO_CONNECT_DISABLED;
    }

    // "waited" accumulates the nominal poll intervals, not measured wall-clock time
    for (Fw::TimeInterval waited(0, 0); waited < timeout;
         waited.add(this->m_reconnectWaitInterval.getSeconds(), this->m_reconnectWaitInterval.getUSeconds())) {
        bool doneWaiting = false;
        {
            Os::ScopeLock guard(this->m_reconnectLock);
            // Done when the reconnect thread is idle again or has been told to stop
            doneWaiting = (this->m_reconnectState == ReconnectState::NOT_RECONNECTING) or this->m_reconnectStop;
        }
        if (doneWaiting) {
            break;
        }
        // Sleep outside the lock before polling again
        (void)Os::Task::delay(this->m_reconnectWaitInterval);
    }

    // Report the outcome: connected, disabled while waiting, or still disconnected
    if (this->isOpened()) {
        return SOCK_SUCCESS;
    }
    return this->getAutomaticOpen() ? SOCK_DISCONNECTED : SOCK_AUTO_CONNECT_DISABLED;
}
} // namespace Drv

View File

@ -14,6 +14,7 @@
#include <Drv/Ip/IpSocket.hpp>
#include <Fw/Buffer/Buffer.hpp>
#include <Os/Condition.hpp>
#include <Os/Mutex.hpp>
#include <Os/Task.hpp>
@ -28,6 +29,7 @@ namespace Drv {
class SocketComponentHelper {
public:
enum OpenState { NOT_OPEN, OPENING, OPEN, SKIP };
enum ReconnectState { NOT_RECONNECTING, REQUEST_RECONNECT, RECONNECT_IN_PROGRESS };
/**
* \brief constructs the socket read task
*/
@ -47,15 +49,25 @@ class SocketComponentHelper {
* default behavior is to automatically open connections.
*
* \param name: name of the task
* \param priority: priority of the started task. See: Os::Task::start. Default: TASK_PRIORITY_DEFAULT, not
* \param priority: priority of the started read task. See: Os::Task::start. Default: TASK_PRIORITY_DEFAULT, not
* prioritized
* \param stack: stack size provided to the task. See: Os::Task::start. Default: TASK_DEFAULT, posix threads default
* \param cpuAffinity: cpu affinity provided to task. See: Os::Task::start. Default: TASK_DEFAULT, don't care
* \param stack: stack size provided to the read task. See: Os::Task::start. Default: TASK_DEFAULT, posix threads
* default
* \param cpuAffinity: cpu affinity provided to read task. See: Os::Task::start. Default: TASK_DEFAULT, don't care
* \param priorityReconnect: priority of the started reconnect task. See: Os::Task::start. Default:
* TASK_PRIORITY_DEFAULT, not prioritized
* \param stackReconnect: stack size provided to the reconnect task. See: Os::Task::start. Default: TASK_DEFAULT,
* posix threads default
* \param cpuAffinityReconnect: cpu affinity provided to reconnect task. See: Os::Task::start. Default:
* TASK_DEFAULT, don't care
*/
void start(const Fw::StringBase& name,
const FwTaskPriorityType priority = Os::Task::TASK_PRIORITY_DEFAULT,
const Os::Task::ParamType stack = Os::Task::TASK_DEFAULT,
const Os::Task::ParamType cpuAffinity = Os::Task::TASK_DEFAULT);
const Os::Task::ParamType cpuAffinity = Os::Task::TASK_DEFAULT,
const FwTaskPriorityType priorityReconnect = Os::Task::TASK_PRIORITY_DEFAULT,
const Os::Task::ParamType stackReconnect = Os::Task::TASK_DEFAULT,
const Os::Task::ParamType cpuAffinityReconnect = Os::Task::TASK_DEFAULT);
/**
* \brief open the socket for communications
@ -91,6 +103,13 @@ class SocketComponentHelper {
*/
void setAutomaticOpen(bool auto_open);
/**
* \brief get socket automatically open connections status
*
* \return status of auto_open
*/
bool getAutomaticOpen();
/**
* \brief send data to the IP socket from the given buffer
*
@ -134,6 +153,7 @@ class SocketComponentHelper {
* \brief is the read loop running
*/
bool running();
/**
 * \brief is the reconnect loop running
 *
 * \return true while the reconnect stop flag is not set
 */
bool runningReconnect();
/**
* \brief stop the socket read task and close the associated socket.
@ -143,6 +163,8 @@ class SocketComponentHelper {
*/
void stop();
/**
 * \brief signal the reconnect task to stop
 *
 * Sets the reconnect stop flag and resets the reconnect state to NOT_RECONNECTING. This call does not
 * wait for the reconnect task to exit.
 */
void stopReconnect();
/**
* \brief joins to the stopping read task to wait for it to close
*
@ -153,11 +175,19 @@ class SocketComponentHelper {
*/
Os::Task::Status join();
/**
 * \brief joins to the reconnect task to wait for it to close
 *
 * \return status of the reconnect task join
 */
Os::Task::Status joinReconnect();
protected:
/**
* \brief receive off the TCP socket
*/
virtual void readLoop();
/**
* \brief reconnect TCP socket
*/
virtual void reconnectLoop();
/**
* \brief returns a reference to the socket handler
*
@ -206,6 +236,33 @@ class SocketComponentHelper {
*/
static void readTask(void* pointer);
/**
* \brief a task designed for socket reconnection
*
* \param pointer: pointer to "this" component
*/
static void reconnectTask(void* pointer);
/**
* \brief signal to reconnect task that a reconnect is needed
*
*/
void requestReconnect();
/**
* \brief wait method for a task to wait for a reconnect request to complete
*
* After requesting a reconnect, tasks should call this method
* to wait for the reconnect thread to complete
*
*
* \param timeout: timeout so that the wait doesn't hang indefinitely
*
* \return status of the reconnect request, SOCK_DISCONNECTED for
* reopen again, or SOCK_SUCCESS on success, something else on error
*/
SocketIpStatus waitForReconnect(Fw::TimeInterval timeout = Fw::TimeInterval(1, 0));
private:
/**
* \brief Re-open port if it has been disconnected
@ -219,12 +276,24 @@ class SocketComponentHelper {
SocketIpStatus reopen();
protected:
bool m_reopen = true; //!< Force reopen on disconnect
SocketDescriptor m_descriptor;
// Read/recv
Os::Task m_task;
Os::Mutex m_lock;
SocketDescriptor m_descriptor;
bool m_reopen = true; //!< Force reopen on disconnect
bool m_stop = true; //!< Stops the task when set to true
OpenState m_open = OpenState::NOT_OPEN; //!< Have we successfully opened
// Reconnect
Os::Task m_reconnectTask;
Os::Mutex m_reconnectLock;
bool m_reconnectStop = true;
ReconnectState m_reconnectState = ReconnectState::NOT_RECONNECTING;
Fw::TimeInterval m_reconnectCheckInterval =
Fw::TimeInterval(0, 50000); // 50 ms, Interval at which reconnect task loop checks for requests
Fw::TimeInterval m_reconnectWaitInterval =
Fw::TimeInterval(0, 10000); // 10 ms, Interval at which reconnect requesters wait for response
};
} // namespace Drv
#endif // DRV_SocketComponentHelper_HPP

View File

@ -154,11 +154,11 @@ socketBoth.configureRecv(127.0.0.1, 60212);
```
### Support for Ephemeral Ports
Drv::UdpSocket supports ephemeral ports through passing a 0 as the port argument for either `Drv::UdpSocket::configureSend`
Drv::UdpSocket supports ephemeral ports through passing a 0 as the port argument for either `Drv::UdpSocket::configureSend`
or `Drv::UdpSocket::configureRecv`.
For `Drv::UdpSocket::configureSend` this means that you would like to set up the UdpSocket to be able to respond to the source
port that is indicated in the UDP datagrams you receive. Note that this configuration will set up the send port to the port
For `Drv::UdpSocket::configureSend` this means that you would like to set up the UdpSocket to be able to respond to the source
port that is indicated in the UDP datagrams you receive. Note that this configuration will set up the send port to the port
specified only in the first message received.
For `Drv::UdpSocket::configureRecv` this means that you would like to be assigned an ephemeral port. This would generally be used
@ -168,16 +168,17 @@ for setting up a sender that would like to receive responses to messages on an e
The Drv::SocketComponentHelper is intended as a base class used to add in the functionality of an automatically reconnecting
receive thread to another class (typically an F´ component) as well as an interface between the component using an IP socket
and the IP socket library functions implemented in this folder. In order for this thread to function, the inheritor must
and the IP socket library functions implemented in this folder. In order for the receive thread to function, the inheritor must
implement several methods to provide the necessary interaction of this thread. These functions are described in the next
section.
section. The automatic reconnection is handled by a separate dedicated thread upon request from either the receive thread
or a send request.
In order to start the receiving thread a call to the `Drv::SocketComponentHelper::start` method is performed passing
in a name, and all arguments to `Os::Task::start` to start the task. An optional parameter, reconnect, will determine if
this read task will reconnect to sockets should a disconnect or error occur. Once started the read task will continue
in a name, and all arguments to `Os::Task::start` to start the receive and reconnect tasks. An optional parameter, reconnect,
will determine if this read task will request reconnects to sockets should a disconnect or error occur. Once started, the read task will continue
until a `Drv::SocketComponentHelper::stop` has been called or an error occurred when started without reconnect set to
`true`. Once the socket stop call has been made, the user should call `Drv::SocketComponentHelper::join` in order to
wait until the full task has finished. `Drv::SocketComponentHelper::stop` will call `Drv::SocketComponentHelper::close` on the
wait until both tasks have fully finished. `Drv::SocketComponentHelper::stop` will call `Drv::SocketComponentHelper::close` on the
provided Drv::IpSocket to ensure that any blocking reads exit freeing the thread to completely stop. Normal usage of
a Drv::SocketComponentHelper derived class is shown below.
@ -187,7 +188,7 @@ uplinkComm.start(name); // Default reconnect=true
...
uplinkComm.stop();
(void) uplinkComm.join();
(void) uplinkComm.join(); // this will join the receive and reconnect tasks
```
`Drv::SocketComponentHelper::open` and `Drv::SocketComponentHelper::close` convenience methods are also provided to open and close the

View File

@ -151,7 +151,10 @@ TcpServerTester ::TcpServerTester()
::memset(m_data_storage, 0, sizeof(m_data_storage));
}
TcpServerTester ::~TcpServerTester() {}
TcpServerTester ::~TcpServerTester() {
this->component.stop();
this->component.join();
}
// ----------------------------------------------------------------------
// Tests

View File

@ -94,6 +94,11 @@ Task::Status Task::start(const Fw::StringBase& name,
}
Task::Status Task::start(const Task::Arguments& arguments) {
Task::init();
// init call above is to ensure singleton is initialized in a thread-safe
// manner and such that the address sanitizer does not inadvertently
// result in a stack overflow when multiple calls to getSingleton are made
// simultaneously from different threads. (As was observed in UT runs.)
FW_ASSERT(&this->m_delegate == reinterpret_cast<TaskInterface*>(&this->m_handle_storage[0]));
FW_ASSERT(arguments.m_routine != nullptr);
this->m_name = arguments.m_name;