Correcting shutdown/close usage; refactoring TcpServer (#2950)

* Correcting shutdown/close usage; refactoring TcpServer

* Fixing Tcp

* Review and CI fixes

* ...and the other half of the fixes
This commit is contained in:
M Starch 2024-10-16 09:36:34 -07:00 committed by GitHub
parent c7e68c52ed
commit 9ed68fdd06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 449 additions and 363 deletions

View File

@ -23,17 +23,17 @@
#ifdef TGT_OS_TYPE_VXWORKS
#include <socket.h>
#include <inetLib.h>
#include <fioLib.h>
#include <hostLib.h>
#include <ioLib.h>
#include <vxWorks.h>
#include <sockLib.h>
#include <fioLib.h>
#include <taskLib.h>
#include <sysLib.h>
#include <errnoLib.h>
#include <cstring>
#include <inetLib.h>
#include <fioLib.h>
#include <hostLib.h>
#include <ioLib.h>
#include <vxWorks.h>
#include <sockLib.h>
#include <fioLib.h>
#include <taskLib.h>
#include <sysLib.h>
#include <errnoLib.h>
#include <cstring>
#elif defined TGT_OS_TYPE_LINUX || TGT_OS_TYPE_DARWIN
#include <sys/socket.h>
#include <unistd.h>
@ -52,7 +52,7 @@ IpSocket::IpSocket() : m_timeoutSeconds(0), m_timeoutMicroseconds(0), m_port(0)
SocketIpStatus IpSocket::configure(const char* const hostname, const U16 port, const U32 timeout_seconds, const U32 timeout_microseconds) {
FW_ASSERT(timeout_microseconds < 1000000, static_cast<FwAssertArgType>(timeout_microseconds));
FW_ASSERT(this->isValidPort(port));
FW_ASSERT(this->isValidPort(port), static_cast<FwAssertArgType>(port));
FW_ASSERT(hostname != nullptr);
this->m_timeoutSeconds = timeout_seconds;
this->m_timeoutMicroseconds = timeout_microseconds;
@ -65,7 +65,7 @@ bool IpSocket::isValidPort(U16 port) {
return true;
}
SocketIpStatus IpSocket::setupTimeouts(NATIVE_INT_TYPE socketFd) {
SocketIpStatus IpSocket::setupTimeouts(PlatformIntType socketFd) {
// Get the IP address from host
#ifdef TGT_OS_TYPE_VXWORKS
// No timeouts set on Vxworks
@ -103,38 +103,39 @@ SocketIpStatus IpSocket::addressToIp4(const char* address, void* ip4) {
return SOCK_SUCCESS;
}
void IpSocket::close(NATIVE_INT_TYPE fd) {
(void)::shutdown(fd, SHUT_RDWR);
(void)::close(fd);
void IpSocket::close(const SocketDescriptor& socketDescriptor) {
(void)::close(socketDescriptor.fd);
}
void IpSocket::shutdown(NATIVE_INT_TYPE fd) {
this->close(fd);
void IpSocket::shutdown(const SocketDescriptor& socketDescriptor) {
errno = 0;
PlatformIntType status = ::shutdown(socketDescriptor.fd, SHUT_RDWR);
// If shutdown fails, go straight to the hard-shutdown
if (status != 0) {
this->close(socketDescriptor);
}
}
SocketIpStatus IpSocket::startup() {
// no op for non-server components
return SOCK_SUCCESS;
}
SocketIpStatus IpSocket::open(NATIVE_INT_TYPE& fd) {
SocketIpStatus IpSocket::open(SocketDescriptor& socketDescriptor) {
SocketIpStatus status = SOCK_SUCCESS;
errno = 0;
// Open a TCP socket for incoming commands, and outgoing data if not using UDP
status = this->openProtocol(fd);
status = this->openProtocol(socketDescriptor);
if (status != SOCK_SUCCESS) {
FW_ASSERT(fd == -1); // Ensure we properly kept closed on error
socketDescriptor.fd = -1;
return status;
}
return status;
}
SocketIpStatus IpSocket::send(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) {
SocketIpStatus IpSocket::send(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) {
U32 total = 0;
I32 sent = 0;
// Attempt to send out data and retry as necessary
for (U32 i = 0; (i < SOCKET_MAX_ITERATIONS) && (total < size); i++) {
errno = 0;
// Send using my specific protocol
sent = this->sendProtocol(fd, data + total, size - total);
sent = this->sendProtocol(socketDescriptor, data + total, size - total);
// Error is EINTR or timeout just try again
if (((sent == -1) && (errno == EINTR)) || (sent == 0)) {
continue;
@ -159,13 +160,13 @@ SocketIpStatus IpSocket::send(NATIVE_INT_TYPE fd, const U8* const data, const U3
return SOCK_SUCCESS;
}
SocketIpStatus IpSocket::recv(NATIVE_INT_TYPE fd, U8* data, U32& req_read) {
SocketIpStatus IpSocket::recv(const SocketDescriptor& socketDescriptor, U8* data, U32& req_read) {
I32 size = 0;
// Try to read until we fail to receive data
for (U32 i = 0; (i < SOCKET_MAX_ITERATIONS) && (size <= 0); i++) {
errno = 0;
// Attempt to recv out data
size = this->recvProtocol(fd, data, req_read);
size = this->recvProtocol(socketDescriptor, data, req_read);
// Nothing to be received
if ((size == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK))) {

View File

@ -17,6 +17,12 @@
#include <Os/Mutex.hpp>
namespace Drv {
struct SocketDescriptor final {
PlatformIntType fd = -1; //!< Used for all sockets to track the communication file descriptor
PlatformIntType serverFd = -1; //!< Used for server sockets to track the listening file descriptor
};
/**
* \brief Status enumeration for socket return values
*/
@ -36,7 +42,8 @@ enum SocketIpStatus {
SOCK_SEND_ERROR = -13, //!< Failed to send after configured retries
SOCK_NOT_STARTED = -14, //!< Socket has not been started
SOCK_FAILED_TO_READ_BACK_PORT = -15, //!< Failed to read back port from connection
SOCK_NO_DATA_AVAILABLE = -16 //!< No data available or read operation would block
SOCK_NO_DATA_AVAILABLE = -16, //!< No data available or read operation would block
SOCK_ANOTHER_THREAD_OPENING = -17 //!< Another thread is opening
};
/**
@ -70,16 +77,6 @@ class IpSocket {
SocketIpStatus configure(const char* hostname, const U16 port, const U32 send_timeout_seconds,
const U32 send_timeout_microseconds);
/**
* \brief startup the socket, a no-op on unless this is server
*
* This will start-up the socket. In the case of most sockets, this is a no-op. On server sockets this binds to the
* server address and progresses through the `listen` step such that on `open` new clients may be accepted.
*
* \return status of startup
*/
virtual SocketIpStatus startup();
/**
* \brief open the IP socket for communications
*
@ -95,10 +92,10 @@ class IpSocket {
*
* Note: delegates to openProtocol for protocol specific implementation
*
* \param fd: file descriptor to open
* \param socketDescriptor: socket descriptor to update with opened port
* \return status of open
*/
SocketIpStatus open(NATIVE_INT_TYPE& fd);
SocketIpStatus open(SocketDescriptor& socketDescriptor);
/**
* \brief send data out the IP socket from the given buffer
*
@ -115,7 +112,7 @@ class IpSocket {
* \param size: size of data to send
* \return status of the send, SOCK_DISCONNECTED to reopen, SOCK_SUCCESS on success, something else on error
*/
SocketIpStatus send(NATIVE_INT_TYPE fd, const U8* const data, const U32 size);
SocketIpStatus send(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size);
/**
* \brief receive data from the IP socket from the given buffer
*
@ -127,31 +124,35 @@ class IpSocket {
*
* Note: delegates to `recvProtocol` to send the data
*
* \param fd: file descriptor to recv from
* \param socketDescriptor: socket descriptor to recv from
* \param data: pointer to data to fill with received data
* \param size: maximum size of data buffer to fill
* \return status of the send, SOCK_DISCONNECTED to reopen, SOCK_SUCCESS on success, something else on error
*/
SocketIpStatus recv(NATIVE_INT_TYPE fd, U8* const data, U32& size);
SocketIpStatus recv(const SocketDescriptor& fd, U8* const data, U32& size);
/**
* \brief closes the socket
*
* Closes the socket opened by the open call. In this case of the TcpServer, this does NOT close server's listening
* port (call `shutdown`) but will close the active client connection.
* port but will close the active client connection.
*
* \param fd: file descriptor to close
* \param socketDescriptor: socket descriptor to close
*/
void close(NATIVE_INT_TYPE fd);
void close(const SocketDescriptor& socketDescriptor);
/**
* \brief shutdown the socket
*
* Closes the socket opened by the open call. In this case of the TcpServer, this does close server's listening
* port. This will shutdown all clients.
* Shuts down the socket opened by the open call. In this case of the TcpServer, this does shut down server's
* listening port, but rather shuts down the active client.
*
* \param fd: file descriptor to shutdown
* A shut down begins the termination of communication. The underlying socket will coordinate a clean shutdown, and
* it is safe to close the socket once a recv with 0 size has returned or an appropriate timeout has been reached.
*
* \param socketDescriptor: socket descriptor to shutdown
*/
virtual void shutdown(NATIVE_INT_TYPE fd);
void shutdown(const SocketDescriptor& socketDescriptor);
PROTECTED:
/**
@ -168,10 +169,10 @@ class IpSocket {
/**
* \brief setup the socket timeout properties of the opened outgoing socket
* \param socketFd: file descriptor to setup
* \param socketDescriptor: socket descriptor to setup
* \return status of timeout setup
*/
SocketIpStatus setupTimeouts(NATIVE_INT_TYPE socketFd);
SocketIpStatus setupTimeouts(PlatformIntType socketFd);
/**
* \brief converts a given address in dot form x.x.x.x to an ip address. ONLY works for IPv4.
@ -182,27 +183,27 @@ class IpSocket {
static SocketIpStatus addressToIp4(const char* address, void* ip4);
/**
* \brief Protocol specific open implementation, called from open.
* \param fd: (output) file descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid
* \param socketDescriptor: (output) socket descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid
* \return status of open
*/
virtual SocketIpStatus openProtocol(NATIVE_INT_TYPE& fd) = 0;
virtual SocketIpStatus openProtocol(SocketDescriptor& fd) = 0;
/**
* \brief Protocol specific implementation of send. Called directly with retry from send.
* \param fd: file descriptor to send to
* \param socketDescriptor: socket descriptor to send to
* \param data: data to send
* \param size: size of data to send
* \return: size of data sent, or -1 on error.
*/
virtual I32 sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) = 0;
virtual I32 sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) = 0;
/**
* \brief Protocol specific implementation of recv. Called directly with error handling from recv.
* \param fd: file descriptor to recv from
* \param socket: socket descriptor to recv from
* \param data: data pointer to fill
* \param size: size of data buffer
* \return: size of data received, or -1 on error.
*/
virtual I32 recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) = 0;
virtual I32 recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) = 0;
U32 m_timeoutSeconds;
U32 m_timeoutMicroseconds;

View File

@ -15,11 +15,9 @@
#include <Fw/Types/Assert.hpp>
#include <cerrno>
#define MAXIMUM_SIZE 0x7FFFFFFF
namespace Drv {
SocketComponentHelper::SocketComponentHelper() : m_fd(-1), m_reconnect(false), m_stop(false), m_started(false), m_open(false) {}
SocketComponentHelper::SocketComponentHelper() {}
SocketComponentHelper::~SocketComponentHelper() {}
@ -29,61 +27,64 @@ void SocketComponentHelper::start(const Fw::StringBase &name,
const Os::Task::ParamType stack,
const Os::Task::ParamType cpuAffinity) {
FW_ASSERT(m_task.getState() == Os::Task::State::NOT_STARTED); // It is a coding error to start this task multiple times
FW_ASSERT(not this->m_stop); // It is a coding error to stop the thread before it is started
this->m_stop = false;
m_reconnect = reconnect;
// Note: the first step is for the IP socket to open the port
Os::Task::Arguments arguments(name, SocketComponentHelper::readTask, this, priority, stack, cpuAffinity);
Os::Task::Status stat = m_task.start(arguments);
FW_ASSERT(Os::Task::OP_OK == stat, static_cast<NATIVE_INT_TYPE>(stat));
}
SocketIpStatus SocketComponentHelper::startup() {
this->m_started = true;
return this->getSocketHandler().startup();
}
bool SocketComponentHelper::isStarted() {
bool is_started = false;
is_started = this->m_started;
return is_started;
FW_ASSERT(Os::Task::OP_OK == stat, static_cast<FwAssertArgType>(stat));
}
SocketIpStatus SocketComponentHelper::open() {
SocketIpStatus status = SOCK_FAILED_TO_GET_SOCKET;
NATIVE_INT_TYPE fd = -1;
this->m_lock.lock();
if (not this->m_open) {
FW_ASSERT(this->m_fd == -1 and not this->m_open); // Ensure we are not opening an opened socket
status = this->getSocketHandler().open(fd);
this->m_fd = fd;
// Call connected any time the open is successful
SocketIpStatus status = SOCK_ANOTHER_THREAD_OPENING;
OpenState local_open = OpenState::OPEN;
// Scope to guard lock
{
Os::ScopeLock scopeLock(m_lock);
if (this->m_open == OpenState::NOT_OPEN) {
this->m_open = OpenState::OPENING;
local_open = this->m_open;
} else {
local_open = OpenState::SKIP;
}
}
if (local_open == OpenState::OPENING) {
FW_ASSERT(this->m_descriptor.fd == -1); // Ensure we are not opening an opened socket
status = this->getSocketHandler().open(this->m_descriptor);
// Lock scope
{
Os::ScopeLock scopeLock(m_lock);
if (Drv::SOCK_SUCCESS == status) {
this->m_open = OpenState::OPEN;
} else {
this->m_open = OpenState::NOT_OPEN;
this->m_descriptor.fd = -1;
}
}
// Notify connection on success outside locked scope
if (Drv::SOCK_SUCCESS == status) {
this->m_open = true;
this->m_lock.unlock();
this->connected();
return status;
}
}
this->m_lock.unlock();
return status;
}
bool SocketComponentHelper::isOpened() {
bool is_open = false;
this->m_lock.lock();
is_open = this->m_open;
this->m_lock.unlock();
Os::ScopeLock scopedLock(this->m_lock);
bool is_open = this->m_open == OpenState::OPEN;
return is_open;
}
SocketIpStatus SocketComponentHelper::reconnect() {
SocketIpStatus status = SOCK_SUCCESS;
// Handle opening
// Open a network connection if it has not already been open
if (this->isStarted() and (not this->isOpened())) {
if (not this->isOpened()) {
status = this->open();
if (status == SocketIpStatus::SOCK_ANOTHER_THREAD_OPENING) {
status = SocketIpStatus::SOCK_SUCCESS;
}
}
return status;
}
@ -91,20 +92,21 @@ SocketIpStatus SocketComponentHelper::reconnect() {
SocketIpStatus SocketComponentHelper::send(const U8* const data, const U32 size) {
SocketIpStatus status = SOCK_SUCCESS;
this->m_lock.lock();
NATIVE_INT_TYPE fd = this->m_fd;
SocketDescriptor descriptor = this->m_descriptor;
this->m_lock.unlock();
// Prevent transmission before connection, or after a disconnect
if (fd == -1) {
if (descriptor.fd == -1) {
status = this->reconnect();
// if reconnect wasn't successful, pass the that up to the caller
if(status != SOCK_SUCCESS) {
return status;
}
// Refresh local copy after reconnect
this->m_lock.lock();
fd = this->m_fd;
descriptor = this->m_descriptor;
this->m_lock.unlock();
}
status = this->getSocketHandler().send(fd, data, size);
status = this->getSocketHandler().send(descriptor, data, size);
if (status == SOCK_DISCONNECTED) {
this->close();
}
@ -112,21 +114,15 @@ SocketIpStatus SocketComponentHelper::send(const U8* const data, const U32 size)
}
void SocketComponentHelper::shutdown() {
this->m_lock.lock();
this->getSocketHandler().shutdown(this->m_fd);
this->m_started = false;
this->m_fd = -1;
this->m_lock.unLock();
Os::ScopeLock scopedLock(this->m_lock);
this->getSocketHandler().shutdown(this->m_descriptor);
}
void SocketComponentHelper::close() {
this->m_lock.lock();
if (this->m_fd != -1) {
this->getSocketHandler().close(this->m_fd);
this->m_fd = -1;
}
this->m_open = false;
this->m_lock.unlock();
Os::ScopeLock scopedLock(this->m_lock);
this->getSocketHandler().close(this->m_descriptor);
this->m_descriptor.fd = -1;
this->m_open = OpenState::NOT_OPEN;
}
Os::Task::Status SocketComponentHelper::join() {
@ -134,69 +130,79 @@ Os::Task::Status SocketComponentHelper::join() {
}
void SocketComponentHelper::stop() {
this->m_lock.lock();
this->m_stop = true;
this->m_lock.unlock();
// Scope to protect lock
{
Os::ScopeLock scopeLock(m_lock);
this->m_stop = true;
}
this->shutdown(); // Break out of any receives and fully shutdown
}
bool SocketComponentHelper::running() {
Os::ScopeLock scopedLock(this->m_lock);
bool running = not this->m_stop;
return running;
}
SocketIpStatus SocketComponentHelper::recv(U8* data, U32 &size) {
SocketIpStatus status = SOCK_SUCCESS;
// Check for previously disconnected socket
this->m_lock.lock();
NATIVE_INT_TYPE fd = this->m_fd;
SocketDescriptor descriptor = this->m_descriptor;
this->m_lock.unlock();
if (fd == -1) {
if (descriptor.fd == -1) {
return SOCK_DISCONNECTED;
}
status = this->getSocketHandler().recv(fd, data, size);
status = this->getSocketHandler().recv(descriptor, data, size);
if (status == SOCK_DISCONNECTED) {
this->close();
}
return status;
}
void SocketComponentHelper::readTask(void* pointer) {
FW_ASSERT(pointer);
void SocketComponentHelper::readLoop() {
SocketIpStatus status = SOCK_SUCCESS;
SocketComponentHelper* self = reinterpret_cast<SocketComponentHelper*>(pointer);
do {
// Prevent transmission before connection, or after a disconnect
if ((not self->isOpened()) and (not self->m_stop)) {
status = self->reconnect();
if(status != SOCK_SUCCESS) {
Fw::Logger::log(
"[WARNING] Failed to open port with status %d and errno %d\n",
status,
errno);
(void) Os::Task::delay(SOCKET_RETRY_INTERVAL);
if ((not this->isOpened()) and this->running()) {
status = this->reconnect();
if (status != SOCK_SUCCESS) {
Fw::Logger::log("[WARNING] Failed to open port with status %d and errno %d\n", status, errno);
(void)Os::Task::delay(SOCKET_RETRY_INTERVAL);
continue;
}
}
// If the network connection is open, read from it
if (self->isStarted() and self->isOpened() and (not self->m_stop)) {
Fw::Buffer buffer = self->getBuffer();
if (this->isOpened() and this->running()) {
Fw::Buffer buffer = this->getBuffer();
U8* data = buffer.getData();
FW_ASSERT(data);
U32 size = buffer.getSize();
// recv blocks, so it may have been a while since its done an isOpened check
status = self->recv(data, size);
if ((status != SOCK_SUCCESS) && (status != SOCK_INTERRUPTED_TRY_AGAIN && (status != SOCK_NO_DATA_AVAILABLE))) {
status = this->recv(data, size);
if ((status != SOCK_SUCCESS) && (status != SOCK_INTERRUPTED_TRY_AGAIN) && (status != SOCK_NO_DATA_AVAILABLE)) {
Fw::Logger::log("[WARNING] Failed to recv from port with status %d and errno %d\n",
status,
errno);
self->close();
status,
errno);
this->close();
buffer.setSize(0);
} else {
// Send out received data
buffer.setSize(size);
}
self->sendBuffer(buffer, status);
this->sendBuffer(buffer, status);
}
}
// As long as not told to stop, and we are successful interrupted or ordered to retry, keep receiving
while (not self->m_stop &&
(status == SOCK_SUCCESS || status == SOCK_INTERRUPTED_TRY_AGAIN || self->m_reconnect));
self->shutdown(); // Shutdown the port entirely
while (this->running() &&
(status == SOCK_SUCCESS || (status == SOCK_NO_DATA_AVAILABLE) || status == SOCK_INTERRUPTED_TRY_AGAIN || this->m_reconnect));
// Close the socket
this->close(); // Close the port entirely
}
void SocketComponentHelper::readTask(void* pointer) {
FW_ASSERT(pointer);
SocketComponentHelper* self = reinterpret_cast<SocketComponentHelper*>(pointer);
self->readLoop();
}
} // namespace Drv

View File

@ -27,6 +27,12 @@ namespace Drv {
*/
class SocketComponentHelper {
public:
enum OpenState{
NOT_OPEN,
OPENING,
OPEN,
SKIP
};
/**
* \brief constructs the socket read task
*/
@ -56,25 +62,6 @@ class SocketComponentHelper {
const Os::Task::ParamType stack = Os::Task::TASK_DEFAULT,
const Os::Task::ParamType cpuAffinity = Os::Task::TASK_DEFAULT);
/**
* \brief startup the socket for communications
*
* Status of the socket handler.
*
* Note: this just delegates to the handler
*
* \return status of open, SOCK_SUCCESS for success, something else on error
*/
SocketIpStatus startup();
/**
* \brief Returns true when the socket is started
*
* Returns true when the socket is started up sufficiently to be actively listening to clients. Returns false
* otherwise. This means `startup()` was called and returned success.
*/
bool isStarted();
/**
* \brief open the socket for communications
*
@ -129,9 +116,7 @@ class SocketComponentHelper {
/**
* \brief close the socket communications
*
* Typically stopping the socket read task will shutdown the connection. However, in cases where the read task
* will not be started, this function may be used to close the socket. This calls a full `close` on the client
* socket.
* Close the client connection. This will ensure that the resources used are cleaned-up.
*
* Note: this just delegates to the handler
*/
@ -140,14 +125,18 @@ class SocketComponentHelper {
/**
* \brief shutdown the socket communications
*
* Typically stopping the socket read task will shutdown the connection. However, in cases where the read task
* will not be started, this function may be used to close the socket. This calls a full `shutdown` on the client
* socket.
* Shutdown communication. This will begin the process of cleanly closing communications. This process will be
* finished with a receive of 0 size and should be followed by a close.
*
* Note: this just delegates to the handler
*/
void shutdown();
/**
* \brief is the read loop running
*/
bool running();
/**
* \brief stop the socket read task and close the associated socket.
*
@ -166,8 +155,11 @@ class SocketComponentHelper {
*/
Os::Task::Status join();
PROTECTED:
/**
* \brief receive off the TCP socket
*/
virtual void readLoop();
/**
* \brief returns a reference to the socket handler
*
@ -209,6 +201,7 @@ class SocketComponentHelper {
*/
virtual void connected() = 0;
/**
* \brief a task designed to read from the socket and output incoming data
*
@ -218,13 +211,10 @@ class SocketComponentHelper {
Os::Task m_task;
Os::Mutex m_lock;
NATIVE_INT_TYPE m_fd;
bool m_reconnect; //!< Force reconnection
bool m_stop; //!< Stops the task when set to true
bool m_started; //!< Have we successfully started the socket
bool m_open; //!< Have we successfully opened
SocketDescriptor m_descriptor;
bool m_reconnect = false; //!< Force reconnection
bool m_stop = true; //!< Stops the task when set to true
OpenState m_open = OpenState::NOT_OPEN; //!< Have we successfully opened
};
}
#endif // DRV_SocketComponentHelper_HPP

View File

@ -47,7 +47,7 @@ bool TcpClientSocket::isValidPort(U16 port) {
}
SocketIpStatus TcpClientSocket::openProtocol(NATIVE_INT_TYPE& fd) {
SocketIpStatus TcpClientSocket::openProtocol(SocketDescriptor& socketDescriptor) {
NATIVE_INT_TYPE socketFd = -1;
struct sockaddr_in address;
@ -81,17 +81,17 @@ SocketIpStatus TcpClientSocket::openProtocol(NATIVE_INT_TYPE& fd) {
::close(socketFd);
return SOCK_FAILED_TO_CONNECT;
}
fd = socketFd;
socketDescriptor.fd = socketFd;
Fw::Logger::log("Connected to %s:%hu as a tcp client\n", m_hostname, m_port);
return SOCK_SUCCESS;
}
I32 TcpClientSocket::sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) {
return static_cast<I32>(::send(fd, data, size, SOCKET_IP_SEND_FLAGS));
I32 TcpClientSocket::sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) {
return static_cast<I32>(::send(socketDescriptor.fd, data, size, SOCKET_IP_SEND_FLAGS));
}
I32 TcpClientSocket::recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) {
return static_cast<I32>(::recv(fd, data, size, SOCKET_IP_RECV_FLAGS));
I32 TcpClientSocket::recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) {
return static_cast<I32>(::recv(socketDescriptor.fd, data, size, SOCKET_IP_RECV_FLAGS));
}
} // namespace Drv

View File

@ -44,26 +44,26 @@ class TcpClientSocket : public IpSocket {
/**
* \brief Tcp specific implementation for opening a client socket.
* \param fd: (output) file descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid
* \param socketDescriptor: (output) descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid
* \return status of open
*/
SocketIpStatus openProtocol(NATIVE_INT_TYPE& fd) override;
SocketIpStatus openProtocol(SocketDescriptor& socketDescriptor) override;
/**
* \brief Protocol specific implementation of send. Called directly with retry from send.
* \param fd: file descriptor to send to
* \param socketDescriptor: descriptor to send to
* \param data: data to send
* \param size: size of data to send
* \return: size of data sent, or -1 on error.
*/
I32 sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) override;
I32 sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) override;
/**
* \brief Protocol specific implementation of recv. Called directly with error handling from recv.
* \param fd: file descriptor to recv from
* \param socketDescriptor: descriptor to recv from
* \param data: data pointer to fill
* \param size: size of data buffer
* \return: size of data received, or -1 on error.
*/
I32 recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) override;
I32 recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) override;
};
} // namespace Drv

View File

@ -38,15 +38,15 @@
namespace Drv {
TcpServerSocket::TcpServerSocket() : IpSocket(), m_base_fd(-1) {}
TcpServerSocket::TcpServerSocket() : IpSocket() {}
U16 TcpServerSocket::getListenPort() {
U16 port = this->m_port;
return port;
}
SocketIpStatus TcpServerSocket::startup() {
NATIVE_INT_TYPE serverFd = -1;
SocketIpStatus TcpServerSocket::startup(SocketDescriptor& socketDescriptor) {
PlatformIntType serverFd = -1;
struct sockaddr_in address;
// Acquire a socket, or return error
if ((serverFd = ::socket(AF_INET, SOCK_STREAM, 0)) == -1) {
@ -77,34 +77,30 @@ SocketIpStatus TcpServerSocket::startup() {
::close(serverFd);
return SOCK_FAILED_TO_READ_BACK_PORT;
}
U16 port = ntohs(address.sin_port);
Fw::Logger::log("Listening for single client at %s:%hu\n", m_hostname, port);
// TCP requires listening on the socket. Since we only expect a single client, set the TCP backlog (second argument) to 1 to prevent queuing of multiple clients.
if (::listen(serverFd, 1) < 0) {
::close(serverFd);
return SOCK_FAILED_TO_LISTEN; // What we have here is a failure to communicate
}
m_base_fd = serverFd;
m_port = port;
return this->IpSocket::startup();
Fw::Logger::log("Listening for single client at %s:%hu\n", m_hostname, m_port);
FW_ASSERT(serverFd != -1);
socketDescriptor.serverFd = serverFd;
this->m_port = ntohs(address.sin_port);
return SOCK_SUCCESS;
}
void TcpServerSocket::shutdown(NATIVE_INT_TYPE fd) {
if (this->m_base_fd != -1) {
(void)::shutdown(this->m_base_fd, SHUT_RDWR);
(void)::close(this->m_base_fd);
this->m_base_fd = -1;
void TcpServerSocket::terminate(const SocketDescriptor& socketDescriptor) {
(void)::close(socketDescriptor.serverFd);
}
SocketIpStatus TcpServerSocket::openProtocol(SocketDescriptor& socketDescriptor) {
PlatformIntType clientFd = -1;
PlatformIntType serverFd = socketDescriptor.serverFd;
// Check for not started yet, may be true in the case of start-up reconnect attempts
if (serverFd == -1) {
return SOCK_NOT_STARTED;
}
this->IpSocket::shutdown(fd);
}
SocketIpStatus TcpServerSocket::openProtocol(NATIVE_INT_TYPE& fd) {
NATIVE_INT_TYPE clientFd = -1;
NATIVE_INT_TYPE serverFd = -1;
serverFd = this->m_base_fd;
// TCP requires accepting on the socket to get the client socket file descriptor.
clientFd = ::accept(serverFd, nullptr, nullptr);
@ -118,18 +114,18 @@ SocketIpStatus TcpServerSocket::openProtocol(NATIVE_INT_TYPE& fd) {
}
Fw::Logger::log("Accepted client at %s:%hu\n", m_hostname, m_port);
fd = clientFd;
socketDescriptor.fd = clientFd;
return SOCK_SUCCESS;
}
I32 TcpServerSocket::sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) {
return static_cast<I32>(::send(fd, data, size, SOCKET_IP_SEND_FLAGS));
I32 TcpServerSocket::sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) {
return static_cast<I32>(::send(socketDescriptor.fd, data, size, SOCKET_IP_SEND_FLAGS));
}
I32 TcpServerSocket::recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) {
I32 TcpServerSocket::recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) {
I32 size_buf;
// recv will return 0 if the client has done an orderly shutdown
size_buf = static_cast<I32>(::recv(fd, data, size, SOCKET_IP_RECV_FLAGS));
size_buf = static_cast<I32>(::recv(socketDescriptor.fd, data, size, SOCKET_IP_RECV_FLAGS));
return size_buf;
}

View File

@ -17,6 +17,7 @@
#include <IpCfg.hpp>
namespace Drv {
/**
* \brief Helper for setting up Tcp using Berkeley sockets as a server
*
@ -36,18 +37,20 @@ class TcpServerSocket : public IpSocket {
* Opens the server's listening socket such that this server can listen for incoming client requests. Given the
* nature of this component, only one (1) client can be handled at a time. After this call succeeds, clients may
* connect. This call does not block, block occurs on `open` while waiting to accept incoming clients.
* \param socketDescriptor: server descriptor will be written here
* \return status of the server socket setup.
*/
SocketIpStatus startup() override;
SocketIpStatus startup(SocketDescriptor& socketDescriptor);
/**
* \brief Shutdown and close the server socket followed by the open client
* \brief close the server socket created by the `startup` call
*
* \param fd: file descriptor to shutdown
* First, this calls `shutdown` and `close` on the server socket and then calls the close method to `shutdown` and
* `close` the client.
* Calls the close function on the server socket. No shutdown is performed on the server socket, as that is left to
* the individual client sockets.
*
* \param socketDescriptor: descriptor to close
*/
void shutdown(NATIVE_INT_TYPE fd) override;
void terminate(const SocketDescriptor& socketDescriptor);
/**
* \brief get the port being listened on
@ -62,28 +65,29 @@ class TcpServerSocket : public IpSocket {
PROTECTED:
/**
* \brief Tcp specific implementation for opening a client socket connected to this server.
* \param fd: (output) file descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid
* \param socketDescriptor: (output) descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid
* \return status of open
*/
SocketIpStatus openProtocol(NATIVE_INT_TYPE& fd) override;
SocketIpStatus openProtocol(SocketDescriptor& socketDescriptor) override;
/**
* \brief Protocol specific implementation of send. Called directly with retry from send.
* \param fd: file descriptor to send to
* \param socketDescriptor: descriptor to send to
* \param data: data to send
* \param size: size of data to send
* \return: size of data sent, or -1 on error.
*/
I32 sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) override;
I32 sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) override;
/**
* \brief Protocol specific implementation of recv. Called directly with error handling from recv.
* \param fd: file descriptor to recv from
* \param socketDescriptor: descriptor to recv from
* \param data: data pointer to fill
* \param size: size of data buffer
* \return: size of data received, or -1 on error.
*/
I32 recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) override;
PRIVATE:
NATIVE_INT_TYPE m_base_fd; //!< File descriptor of the listening socket
I32 recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) override;
};
} // namespace Drv

View File

@ -80,9 +80,9 @@ U16 UdpSocket::getRecvPort() {
}
SocketIpStatus UdpSocket::bind(NATIVE_INT_TYPE fd) {
SocketIpStatus UdpSocket::bind(const PlatformIntType fd) {
struct sockaddr_in address;
FW_ASSERT(-1 != fd);
FW_ASSERT(fd != -1);
// Set up the address port and name
address.sin_family = AF_INET;
@ -112,7 +112,7 @@ SocketIpStatus UdpSocket::bind(NATIVE_INT_TYPE fd) {
return SOCK_SUCCESS;
}
SocketIpStatus UdpSocket::openProtocol(NATIVE_INT_TYPE& fd) {
SocketIpStatus UdpSocket::openProtocol(SocketDescriptor& socketDescriptor) {
SocketIpStatus status = SOCK_SUCCESS;
NATIVE_INT_TYPE socketFd = -1;
struct sockaddr_in address;
@ -169,19 +169,19 @@ SocketIpStatus UdpSocket::openProtocol(NATIVE_INT_TYPE& fd) {
port);
}
FW_ASSERT(status == SOCK_SUCCESS, status);
fd = socketFd;
socketDescriptor.fd = socketFd;
return status;
}
I32 UdpSocket::sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) {
I32 UdpSocket::sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) {
FW_ASSERT(this->m_state->m_addr_send.sin_family != 0); // Make sure the address was previously setup
return static_cast<I32>(::sendto(fd, data, size, SOCKET_IP_SEND_FLAGS,
return static_cast<I32>(::sendto(socketDescriptor.fd, data, size, SOCKET_IP_SEND_FLAGS,
reinterpret_cast<struct sockaddr *>(&this->m_state->m_addr_send), sizeof(this->m_state->m_addr_send)));
}
I32 UdpSocket::recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) {
I32 UdpSocket::recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) {
FW_ASSERT(this->m_state->m_addr_recv.sin_family != 0); // Make sure the address was previously setup
return static_cast<I32>(::recvfrom(fd, data, size, SOCKET_IP_RECV_FLAGS, nullptr, nullptr));
return static_cast<I32>(::recvfrom(socketDescriptor.fd, data, size, SOCKET_IP_RECV_FLAGS, nullptr, nullptr));
}
} // namespace Drv

View File

@ -18,9 +18,6 @@
namespace Drv {
/**
* \brief a structure used to hold the encapsulated socket state to prevent namespace collision
*/
struct SocketState;
/**
@ -89,32 +86,32 @@ class UdpSocket : public IpSocket {
/**
* \brief bind the UDP to a port such that it can receive packets at the previously configured port
* \param fd: socket file descriptor used in bind
* \param socketDescriptor: socket descriptor used in bind
* \return status of the bind
*/
SocketIpStatus bind(NATIVE_INT_TYPE fd);
SocketIpStatus bind(const PlatformIntType fd);
/**
* \brief udp specific implementation for opening a socket.
* \param fd: (output) file descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid
* \param socketDescriptor: (output) file descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid
* \return status of open
*/
SocketIpStatus openProtocol(NATIVE_INT_TYPE& fd) override;
SocketIpStatus openProtocol(SocketDescriptor& socketDescriptor) override;
/**
* \brief Protocol specific implementation of send. Called directly with retry from send.
* \param fd: file descriptor to send to
* \param socketDescriptor: descriptor to send to
* \param data: data to send
* \param size: size of data to send
* \return: size of data sent, or -1 on error.
*/
I32 sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) override;
I32 sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) override;
/**
* \brief Protocol specific implementation of recv. Called directly with error handling from recv.
* \param fd: file descriptor to recv from
* \param socketDescriptor: descriptor to recv from
* \param data: data pointer to fill
* \param size: size of data buffer
* \return: size of data received, or -1 on error.
*/
I32 recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) override;
I32 recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) override;
private:
SocketState* m_state; //!< State storage
U16 m_recv_port; //!< IP address port used

View File

@ -43,21 +43,46 @@ void validate_random_buffer(Fw::Buffer &buffer, U8 *data) {
buffer.setSize(0);
}
void fill_random_buffer(Fw::Buffer &buffer) {
U32 fill_random_buffer(Fw::Buffer &buffer) {
buffer.setSize(STest::Pick::lowerUpper(1, buffer.getSize()));
fill_random_data(buffer.getData(), buffer.getSize());
return static_cast<U32>(buffer.getSize());
}
void send_recv(Drv::IpSocket& sender, Drv::IpSocket& receiver, NATIVE_INT_TYPE sender_fd, NATIVE_INT_TYPE receiver_fd) {
void drain(Drv::IpSocket& receiver, Drv::SocketDescriptor& receiver_fd) {
Drv::SocketIpStatus status = SOCK_SUCCESS;
// Drain the server in preparation for close
while (status == Drv::SOCK_SUCCESS || status == Drv::SOCK_NO_DATA_AVAILABLE) {
U8 buffer[1];
U32 size = sizeof buffer;
status = receiver.recv(receiver_fd, buffer, size);
}
ASSERT_EQ(status, Drv::SocketIpStatus::SOCK_DISCONNECTED) << "Socket did not disconnect as expected";
}
void receive_all(Drv::IpSocket& receiver, Drv::SocketDescriptor& receiver_fd, U8* buffer, U32 size) {
ASSERT_NE(buffer, nullptr);
U32 received_size = 0;
Drv::SocketIpStatus status;
do {
U32 size_in_out = size - received_size;
status = receiver.recv(receiver_fd, buffer + received_size, size_in_out);
ASSERT_TRUE((status == Drv::SOCK_NO_DATA_AVAILABLE || status == Drv::SOCK_SUCCESS));
received_size += size_in_out;
} while (size > received_size);
EXPECT_EQ(received_size, size);
}
void send_recv(Drv::IpSocket& sender, Drv::IpSocket& receiver, Drv::SocketDescriptor& sender_fd, Drv::SocketDescriptor& receiver_fd) {
U32 size = MAX_DRV_TEST_MESSAGE_SIZE;
U8 buffer_out[MAX_DRV_TEST_MESSAGE_SIZE] = {0};
U8 buffer_in[MAX_DRV_TEST_MESSAGE_SIZE] = {0};
// Send receive validate block
Drv::Test::fill_random_data(buffer_out, MAX_DRV_TEST_MESSAGE_SIZE);
EXPECT_EQ(sender.send(sender_fd, buffer_out, MAX_DRV_TEST_MESSAGE_SIZE), Drv::SOCK_SUCCESS);
EXPECT_EQ(receiver.recv(receiver_fd, buffer_in, size), Drv::SOCK_SUCCESS);
EXPECT_EQ(size, static_cast<U32>(MAX_DRV_TEST_MESSAGE_SIZE));
receive_all(receiver, receiver_fd, buffer_in, size);
Drv::Test::validate_random_data(buffer_out, buffer_in, MAX_DRV_TEST_MESSAGE_SIZE);
}

View File

@ -47,7 +47,7 @@ void validate_random_buffer(Fw::Buffer &buffer, U8 *data);
* Fill random data into the buffer (using a random length).
* @param buffer: buffer to fill.
*/
void fill_random_buffer(Fw::Buffer &buffer);
U32 fill_random_buffer(Fw::Buffer &buffer);
/**
* Send/receive pair.
@ -56,7 +56,23 @@ void fill_random_buffer(Fw::Buffer &buffer);
* @param sender_fd: file descriptor for sender
* @param receiver_fd: file descriptor for receiver
*/
void send_recv(Drv::IpSocket& sender, Drv::IpSocket& receiver, NATIVE_INT_TYPE sender_fd, NATIVE_INT_TYPE receiver_fd);
void send_recv(Drv::IpSocket& sender, Drv::IpSocket& receiver, Drv::SocketDescriptor& sender_fd, Drv::SocketDescriptor& receiver_fd);
/**
* Drain bytes from the socket until disconnect received.
* @warning: must have called shutdown on the remote before calling this
* @param drain_fd: file descriptor for draining
*/
void drain(Drv::IpSocket& receiver, Drv::SocketDescriptor& drain_fd);
/**
* Receive all data, reassembling the frame
* @param receiver: receiver
* @param receiver_fd: receiver descriptor
* @param buffer: buffer
* @param size: size to receive
*/
void receive_all(Drv::IpSocket& receiver, Drv::SocketDescriptor& receiver_fd, U8* buffer, U32 size);
/**
* Wait on socket change.

View File

@ -19,11 +19,11 @@ void test_with_loop(U32 iterations) {
U16 port = 0; // Choose a port
Drv::TcpServerSocket server;
NATIVE_INT_TYPE server_fd = -1;
NATIVE_INT_TYPE client_fd = -1;
Drv::SocketDescriptor server_fd;
Drv::SocketDescriptor client_fd;
server.configure("127.0.0.1", port, 0, 100);
EXPECT_EQ(server.startup(), Drv::SOCK_SUCCESS);
Drv::Test::force_recv_timeout(server_fd, server);
EXPECT_EQ(server.startup(server_fd), Drv::SOCK_SUCCESS);
Drv::Test::force_recv_timeout(server_fd.fd, server);
// Loop through a bunch of client disconnects
for (U32 i = 0; i < iterations; i++) {
@ -31,7 +31,7 @@ void test_with_loop(U32 iterations) {
client.configure("127.0.0.1", server.getListenPort(),0,100);
// client_fd gets assigned a real value here
status1 = client.open(client_fd);
EXPECT_EQ(status1, Drv::SOCK_SUCCESS);
EXPECT_EQ(status1, Drv::SOCK_SUCCESS) << "With errno: " << errno;
// client_fd gets assigned a real value here
status2 = server.open(server_fd);
@ -41,15 +41,18 @@ void test_with_loop(U32 iterations) {
// If all the opens worked, then run this
if (Drv::SOCK_SUCCESS == status1 && Drv::SOCK_SUCCESS == status2) {
// Force the sockets not to hang, if at all possible
Drv::Test::force_recv_timeout(client_fd, client);
Drv::Test::force_recv_timeout(server_fd, server);
Drv::Test::force_recv_timeout(client_fd.fd, client);
Drv::Test::force_recv_timeout(server_fd.fd, server);
Drv::Test::send_recv(server, client, server_fd, client_fd);
Drv::Test::send_recv(client, server, client_fd, server_fd);
}
client.close(client_fd);
server.shutdown(client_fd);
// Drain the server before close
Drv::Test::drain(server, server_fd);
server.close(server_fd);
client.close(client_fd);
}
server.shutdown(server_fd);
server.terminate(server_fd);
}

View File

@ -15,8 +15,8 @@ void test_with_loop(U32 iterations, bool duplex) {
Drv::SocketIpStatus status1 = Drv::SOCK_SUCCESS;
Drv::SocketIpStatus status2 = Drv::SOCK_SUCCESS;
NATIVE_INT_TYPE udp1_fd = -1;
NATIVE_INT_TYPE udp2_fd = -1;
Drv::SocketDescriptor udp1_fd;
Drv::SocketDescriptor udp2_fd;
U16 port1 = Drv::Test::get_free_port(true);
ASSERT_NE(0, port1);
@ -49,8 +49,8 @@ void test_with_loop(U32 iterations, bool duplex) {
// If all the opens worked, then run this
if (Drv::SOCK_SUCCESS == status1 && Drv::SOCK_SUCCESS == status2) {
// Force the sockets not to hang, if at all possible
Drv::Test::force_recv_timeout(udp1_fd, udp1);
Drv::Test::force_recv_timeout(udp2_fd, udp2);
Drv::Test::force_recv_timeout(udp1_fd.fd, udp1);
Drv::Test::force_recv_timeout(udp2_fd.fd, udp2);
Drv::Test::send_recv(udp1, udp2, udp1_fd, udp2_fd);
// Allow duplex connections
if (duplex) {

View File

@ -23,8 +23,7 @@ namespace Drv {
// ----------------------------------------------------------------------
TcpClientComponentImpl::TcpClientComponentImpl(const char* const compName)
: TcpClientComponentBase(compName),
SocketComponentHelper() {}
: TcpClientComponentBase(compName) {}
SocketIpStatus TcpClientComponentImpl::configure(const char* hostname,
const U16 port,
@ -35,7 +34,6 @@ SocketIpStatus TcpClientComponentImpl::configure(const char* hostname,
// Check that ensures the configured buffer size fits within the limits fixed-width type, U32
FW_ASSERT(buffer_size <= std::numeric_limits<U32>::max(), static_cast<FwAssertArgType>(buffer_size));
m_allocation_size = buffer_size; // Store the buffer size
(void)startup();
return m_socket.configure(hostname, port, send_timeout_seconds, send_timeout_microseconds);
}

View File

@ -5,22 +5,22 @@
#include "TcpClientTester.hpp"
TEST(Nominal, BasicMessaging) {
TEST(Nominal, TcpClientBasicMessaging) {
Drv::TcpClientTester tester;
tester.test_basic_messaging();
}
TEST(Nominal, BasicReceiveThread) {
TEST(Nominal, TcpClientBasicReceiveThread) {
Drv::TcpClientTester tester;
tester.test_receive_thread();
}
TEST(Reconnect, MultiMessaging) {
TEST(Reconnect, TcpClientMultiMessaging) {
Drv::TcpClientTester tester;
tester.test_multiple_messaging();
}
TEST(Reconnect, ReceiveThreadReconnect) {
TEST(Reconnect, TcpClientReceiveThreadReconnect) {
Drv::TcpClientTester tester;
tester.test_advanced_reconnect();
}

View File

@ -33,9 +33,9 @@ void TcpClientTester ::test_with_loop(U32 iterations, bool recv_thread) {
Drv::TcpServerSocket server;
server.configure("127.0.0.1", port, 0, 100);
NATIVE_INT_TYPE client_fd = -1;
Drv::SocketDescriptor server_fd;
serverStat = server.startup();
serverStat = server.startup(server_fd);
this->component.configure("127.0.0.1", server.getListenPort(), 0, 100);
ASSERT_EQ(serverStat, SOCK_SUCCESS)
@ -60,36 +60,28 @@ void TcpClientTester ::test_with_loop(U32 iterations, bool recv_thread) {
}
EXPECT_TRUE(this->component.isOpened());
// fd has now been updated to be a value we need to keep track of
status2 = server.open(client_fd);
status2 = server.open(server_fd);
EXPECT_EQ(status1, Drv::SOCK_SUCCESS);
EXPECT_EQ(status2, Drv::SOCK_SUCCESS);
status2 = Drv::SOCK_NO_DATA_AVAILABLE;
// If all the opens worked, then run this
if ((Drv::SOCK_SUCCESS == status1) && (Drv::SOCK_SUCCESS == status2) &&
(this->component.isOpened())) {
// Force the sockets not to hang, if at all possible
Drv::Test::force_recv_timeout(this->component.m_fd, this->component.getSocketHandler());
Drv::Test::force_recv_timeout(server.m_base_fd, server);
Drv::Test::force_recv_timeout(this->component.m_descriptor.fd, this->component.getSocketHandler());
Drv::Test::force_recv_timeout(server_fd.serverFd, server);
m_data_buffer.setSize(sizeof(m_data_storage));
Drv::Test::fill_random_buffer(m_data_buffer);
size = Drv::Test::fill_random_buffer(m_data_buffer);
Drv::SendStatus status = invoke_to_send(0, m_data_buffer);
EXPECT_EQ(status, SendStatus::SEND_OK);
U16 counter = 0;
while ((status2 == Drv::SOCK_NO_DATA_AVAILABLE) and counter < Drv::Test::MAX_ITER) {
status2 = server.recv(client_fd, buffer, size);
counter++;
}
EXPECT_EQ(status2, Drv::SOCK_SUCCESS);
EXPECT_EQ(size, m_data_buffer.getSize());
Drv::Test::receive_all(server, server_fd, buffer, size);
Drv::Test::validate_random_buffer(m_data_buffer, buffer);
// If receive thread is live, try the other way
if (recv_thread) {
m_spinner = false;
m_data_buffer.setSize(sizeof(m_data_storage));
status2 = server.send(client_fd, m_data_buffer.getData(), m_data_buffer.getSize());
status2 = server.send(server_fd, m_data_buffer.getData(), m_data_buffer.getSize());
EXPECT_EQ(status2, Drv::SOCK_SUCCESS);
from_deallocate_handler(0, m_data_buffer);
while (not m_spinner) {}
@ -100,11 +92,16 @@ void TcpClientTester ::test_with_loop(U32 iterations, bool recv_thread) {
this->component.stop();
this->component.join();
} else {
// Client should close to initiate a clean shutdown
// This is because the server "can't know" if the client is done until
// this close is hit, or the server initiates the shutdown.
this->component.close();
}
server.close(client_fd);
// Safe server shutdown after client
Drv::Test::drain(server, server_fd);
server.close(server_fd);
}
server.shutdown(client_fd);
server.terminate(server_fd);
ASSERT_from_ready_SIZE(iterations);
}

View File

@ -13,6 +13,7 @@
#include <Drv/TcpServer/TcpServerComponentImpl.hpp>
#include <FpConfig.hpp>
#include "Fw/Types/Assert.hpp"
#include "Fw/Logger/Logger.hpp"
namespace Drv {
@ -21,8 +22,7 @@ namespace Drv {
// ----------------------------------------------------------------------
TcpServerComponentImpl::TcpServerComponentImpl(const char* const compName)
: TcpServerComponentBase(compName),
SocketComponentHelper() {}
: TcpServerComponentBase(compName) {}
SocketIpStatus TcpServerComponentImpl::configure(const char* hostname,
const U16 port,
@ -70,6 +70,48 @@ void TcpServerComponentImpl::connected() {
}
}
bool TcpServerComponentImpl::isStarted() {
Os::ScopeLock scopedLock(this->m_lock);
return this->m_descriptor.serverFd != -1;
}
SocketIpStatus TcpServerComponentImpl::startup() {
Os::ScopeLock scopedLock(this->m_lock);
Drv::SocketIpStatus status = SOCK_SUCCESS;
// Prevent multiple startup attempts
if (this->m_descriptor.serverFd == -1) {
status = this->m_socket.startup(this->m_descriptor);
}
return status;
}
void TcpServerComponentImpl::terminate() {
Os::ScopeLock scopedLock(this->m_lock);
this->m_socket.terminate(this->m_descriptor);
this->m_descriptor.serverFd = -1;
}
void TcpServerComponentImpl::readLoop() {
Drv::SocketIpStatus status = Drv::SocketIpStatus::SOCK_NOT_STARTED;
// Keep trying to reconnect until the status is good, told to stop, or reconnection is turned off
do {
status = this->startup();
if (status != SOCK_SUCCESS) {
Fw::Logger::log("[WARNING] Failed to listen on port %hu with status %d\n", this->getListenPort(), status);
(void)Os::Task::delay(SOCKET_RETRY_INTERVAL);
continue;
}
}
while (this->running() && status != SOCK_SUCCESS && this->m_reconnect);
// If start up was successful then perform normal operations
if (this->running() && status == SOCK_SUCCESS) {
// Perform the nominal read loop
SocketComponentHelper::readLoop();
// Terminate the server
this->terminate();
}
}
// ----------------------------------------------------------------------
// Handler implementations for user-defined typed input ports
// ----------------------------------------------------------------------

View File

@ -62,6 +62,26 @@ class TcpServerComponentImpl : public TcpServerComponentBase, public SocketCompo
const U32 send_timeout_seconds = SOCKET_SEND_TIMEOUT_SECONDS,
const U32 send_timeout_microseconds = SOCKET_SEND_TIMEOUT_MICROSECONDS);
/**
* \brief is started
*/
bool isStarted();
/**
* \brief startup the server socket for communications
*
* Start up the server socket by listening on a port. Note: does not accept clients, this is done in open to
* facilitate re-connection of clients.
*/
SocketIpStatus startup();
/**
* \brief terminate the server socket
*
* Close the server socket. Should be done after all clients are shutdown and closed.
*/
void terminate();
/**
* \brief get the port being listened on
*
@ -85,7 +105,7 @@ class TcpServerComponentImpl : public TcpServerComponentBase, public SocketCompo
*
* \return IpSocket reference
*/
IpSocket& getSocketHandler();
IpSocket& getSocketHandler() override;
/**
* \brief returns a buffer to fill with data
@ -95,7 +115,7 @@ class TcpServerComponentImpl : public TcpServerComponentBase, public SocketCompo
*
* \return Fw::Buffer to fill with data
*/
Fw::Buffer getBuffer();
Fw::Buffer getBuffer() override;
/**
* \brief sends a buffer to be filled with data
@ -105,13 +125,17 @@ class TcpServerComponentImpl : public TcpServerComponentBase, public SocketCompo
*
* \return Fw::Buffer filled with data to send out
*/
void sendBuffer(Fw::Buffer buffer, SocketIpStatus status);
void sendBuffer(Fw::Buffer buffer, SocketIpStatus status) override;
/**
* \brief called when the IPv4 system has been connected
*/
void connected();
void connected() override;
/**
* \brief read from the socket, overridden to start and terminate the server socket
*/
void readLoop() override;
PRIVATE:
@ -134,7 +158,7 @@ class TcpServerComponentImpl : public TcpServerComponentBase, public SocketCompo
* \param fwBuffer: buffer containing data to be sent
* \return SEND_OK on success, SEND_RETRY when critical data should be retried and SEND_ERROR upon error
*/
Drv::SendStatus send_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& fwBuffer);
Drv::SendStatus send_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& fwBuffer) override;
Drv::TcpServerSocket m_socket; //!< Socket implementation
};

View File

@ -4,22 +4,22 @@
#include "TcpServerTester.hpp"
TEST(Nominal, BasicMessaging) {
TEST(Nominal, TcpServerBasicMessaging) {
Drv::TcpServerTester tester;
tester.test_basic_messaging();
}
TEST(Nominal, BasicReceiveThread) {
TEST(Nominal, TcpServerBasicReceiveThread) {
Drv::TcpServerTester tester;
tester.test_receive_thread();
}
TEST(Reconnect, MultiMessaging) {
TEST(Reconnect, TcpServerMultiMessaging) {
Drv::TcpServerTester tester;
tester.test_multiple_messaging();
}
TEST(Reconnect, ReceiveThreadReconnect) {
TEST(Reconnect, TcpServerReceiveThreadReconnect) {
Drv::TcpServerTester tester;
tester.test_advanced_reconnect();
}

View File

@ -27,10 +27,9 @@ void TcpServerTester ::test_with_loop(U32 iterations, bool recv_thread) {
U8 buffer[sizeof(m_data_storage)] = {};
Drv::SocketIpStatus status1 = Drv::SOCK_SUCCESS;
Drv::SocketIpStatus status2 = Drv::SOCK_SUCCESS;
Drv::SocketIpStatus serverStat = Drv::SOCK_SUCCESS;
U16 port = 0;
NATIVE_INT_TYPE client_fd = -1;
Drv::SocketDescriptor client_fd;
status1 = this->component.configure("127.0.0.1", port, 0, 100);
EXPECT_EQ(status1, Drv::SOCK_SUCCESS);
@ -41,46 +40,45 @@ void TcpServerTester ::test_with_loop(U32 iterations, bool recv_thread) {
EXPECT_TRUE(this->wait_on_started(true, Drv::Test::get_configured_delay_ms()/10 + 1));
}
EXPECT_TRUE(component.isStarted());
// Loop through a bunch of client disconnects
for (U32 i = 0; i < iterations && serverStat == SOCK_SUCCESS; i++) {
for (U32 i = 0; i < iterations && status1 == SOCK_SUCCESS; i++) {
Drv::TcpClientSocket client;
client.configure("127.0.0.1", this->component.getListenPort(), 0, 100);
status2 = client.open(client_fd);
EXPECT_EQ(status2, Drv::SocketIpStatus::SOCK_SUCCESS) << "Failed to connect client";
U32 size = sizeof(m_data_storage);
// Not testing with reconnect thread, we will need to open ourselves
if (not recv_thread) {
status1 = this->component.open();
} else {
EXPECT_TRUE(this->wait_on_change(true, Drv::Test::get_configured_delay_ms()/10 + 1));
EXPECT_TRUE(this->wait_on_change(true, Drv::Test::get_configured_delay_ms()/10 + 1)) <<
"On iteration: " << i << " and receive thread: " << recv_thread;
}
EXPECT_TRUE(this->component.isOpened());
EXPECT_TRUE(this->component.isOpened()) <<
"On iteration: " << i << " and receive thread: " << recv_thread;
EXPECT_EQ(status1, Drv::SOCK_SUCCESS);
EXPECT_EQ(status2, Drv::SOCK_SUCCESS);
status2 = Drv::SOCK_NO_DATA_AVAILABLE;
EXPECT_EQ(status1, Drv::SOCK_SUCCESS) <<
"On iteration: " << i << " and receive thread: " << recv_thread;
EXPECT_EQ(status2, Drv::SOCK_SUCCESS) <<
"On iteration: " << i << " and receive thread: " << recv_thread;
// If all the opens worked, then run this
if ((Drv::SOCK_SUCCESS == status1) && (Drv::SOCK_SUCCESS == status2) &&
(this->component.isOpened())) {
// Force the sockets not to hang, if at all possible
Drv::Test::force_recv_timeout(this->component.m_fd, this->component.getSocketHandler());
Drv::Test::force_recv_timeout(client_fd, client);
Drv::Test::force_recv_timeout(this->component.m_descriptor.fd, this->component.getSocketHandler());
Drv::Test::force_recv_timeout(client_fd.fd, client);
m_data_buffer.setSize(sizeof(m_data_storage));
Drv::Test::fill_random_buffer(m_data_buffer);
size = Drv::Test::fill_random_buffer(m_data_buffer);
Drv::SendStatus status = invoke_to_send(0, m_data_buffer);
EXPECT_EQ(status, SendStatus::SEND_OK);
U16 counter = 0;
while ((status2 == Drv::SOCK_NO_DATA_AVAILABLE) and counter < Drv::Test::MAX_ITER) {
status2 = client.recv(client_fd, buffer, size);
counter++;
}
EXPECT_EQ(status2, Drv::SOCK_SUCCESS);
EXPECT_EQ(size, m_data_buffer.getSize());
EXPECT_EQ(status, SendStatus::SEND_OK) <<
"On iteration: " << i << " and receive thread: " << recv_thread;
Drv::Test::receive_all(client, client_fd, buffer, size);
EXPECT_EQ(status2, Drv::SOCK_SUCCESS) <<
"On iteration: " << i << " and receive thread: " << recv_thread << " and errno " << errno;
EXPECT_EQ(size, m_data_buffer.getSize()) <<
"On iteration: " << i << " and receive thread: " << recv_thread;
Drv::Test::validate_random_buffer(m_data_buffer, buffer);
// If receive thread is live, try the other way
@ -88,24 +86,30 @@ void TcpServerTester ::test_with_loop(U32 iterations, bool recv_thread) {
m_spinner = false;
m_data_buffer.setSize(sizeof(m_data_storage));
status2 = client.send(client_fd, m_data_buffer.getData(), m_data_buffer.getSize());
EXPECT_EQ(status2, Drv::SOCK_SUCCESS);
//from_deallocate_handler(0, m_data_buffer);
while (not m_spinner) {}
EXPECT_EQ(status2, Drv::SOCK_SUCCESS) <<
"On iteration: " << i << " and receive thread: " << recv_thread;
if (status2 == Drv::SOCK_SUCCESS) {
while (not m_spinner) {}
}
}
}
// Properly stop the client on the last iteration
if (((1 + i) == iterations) && recv_thread) {
this->component.stop();
this->component.close();
client.close(client_fd); // Client must be closed first or the server risks binding to an existing address
this->component.shutdown();
this->component.join();
} else {
// Server initiates shutdown. It thus must drain its data until it receives
// a socket disconnection. Then it can safely close.
this->component.shutdown();
Drv::Test::drain(this->component.m_socket, this->component.m_descriptor);
this->component.close();
client.close(client_fd); // Client must be closed first or the server risks binding to an existing address
}
// Server should have shutdown cleanly and waited for this to be shut down. It is safe
// to release the file descriptor.
client.close(client_fd);
}
this->component.terminate();
ASSERT_from_ready_SIZE(iterations);
}
@ -168,7 +172,7 @@ void TcpServerTester ::test_advanced_reconnect() {
void TcpServerTester ::from_recv_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& recvBuffer, const RecvStatus& recvStatus) {
// this function will still receive a status of error because the recv port is always called
this->pushFromPortEntry_recv(recvBuffer, recvStatus);
if (recvStatus == RecvStatus::RECV_OK){
if (recvStatus == RecvStatus::RECV_OK) {
// Make sure we can get to unblocking the spinner
EXPECT_EQ(m_data_buffer.getSize(), recvBuffer.getSize()) << "Invalid transmission size";
Drv::Test::validate_random_buffer(m_data_buffer, recvBuffer.getData());
@ -189,7 +193,6 @@ Fw::Buffer TcpServerTester ::
{
this->pushFromPortEntry_allocate(size);
Fw::Buffer buffer(new U8[size], size);
m_data_buffer2 = buffer;
return buffer;
}

View File

@ -132,7 +132,6 @@ namespace Drv {
//!
TcpServerComponentImpl component;
Fw::Buffer m_data_buffer;
Fw::Buffer m_data_buffer2;
U8 m_data_storage[SEND_DATA_BUFFER_SIZE];
std::atomic<bool> m_spinner;

View File

@ -23,23 +23,16 @@ namespace Drv {
// ----------------------------------------------------------------------
UdpComponentImpl::UdpComponentImpl(const char* const compName)
: UdpComponentBase(compName),
SocketComponentHelper() {}
: UdpComponentBase(compName) {}
SocketIpStatus UdpComponentImpl::configureSend(const char* hostname,
const U16 port,
const U32 send_timeout_seconds,
const U32 send_timeout_microseconds) {
if (not this->isStarted()) {
(void)this->startup();
}
return m_socket.configureSend(hostname, port, send_timeout_seconds, send_timeout_microseconds);
}
SocketIpStatus UdpComponentImpl::configureRecv(const char* hostname, const U16 port) {
if (not this->isStarted()) {
(void)this->startup();
}
return m_socket.configureRecv(hostname, port);
}

View File

@ -147,7 +147,6 @@ PROTECTED:
* \return SEND_OK on success, SEND_RETRY when critical data should be retried and SEND_ERROR upon error
*/
Drv::SendStatus send_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& fwBuffer);
Drv::UdpSocket m_socket; //!< Socket implementation
};

View File

@ -4,22 +4,22 @@
#include "UdpTester.hpp"
TEST(Nominal, BasicMessaging) {
TEST(Nominal, UdpBasicMessaging) {
Drv::UdpTester tester;
tester.test_basic_messaging();
}
TEST(Nominal, BasicReceiveThread) {
TEST(Nominal, UdpBasicReceiveThread) {
Drv::UdpTester tester;
tester.test_receive_thread();
}
TEST(Reconnect, MultiMessaging) {
TEST(Reconnect, UdpMultiMessaging) {
Drv::UdpTester tester;
tester.test_multiple_messaging();
}
TEST(Reconnect, ReceiveThreadReconnect) {
TEST(Reconnect, UdpReceiveThreadReconnect) {
Drv::UdpTester tester;
tester.test_advanced_reconnect();
}

View File

@ -28,7 +28,7 @@ void UdpTester::test_with_loop(U32 iterations, bool recv_thread) {
U8 buffer[sizeof(m_data_storage)] = {};
Drv::SocketIpStatus status1 = Drv::SOCK_SUCCESS;
Drv::SocketIpStatus status2 = Drv::SOCK_SUCCESS;
NATIVE_INT_TYPE udp2_fd = -1;
Drv::SocketDescriptor udp2_fd;
U16 port1 = Drv::Test::get_free_port(true);
ASSERT_NE(0, port1);
@ -83,25 +83,17 @@ void UdpTester::test_with_loop(U32 iterations, bool recv_thread) {
<< "Port1: " << port1 << std::endl
<< "Port2: " << port2 << std::endl;
status2 = Drv::SOCK_NO_DATA_AVAILABLE;
// If all the opens worked, then run this
if ((Drv::SOCK_SUCCESS == status1) && (Drv::SOCK_SUCCESS == status2) &&
(this->component.isOpened())) {
// Force the sockets not to hang, if at all possible
Drv::Test::force_recv_timeout(this->component.m_fd, this->component.getSocketHandler());
Drv::Test::force_recv_timeout(udp2_fd, udp2);
Drv::Test::force_recv_timeout(this->component.m_descriptor.fd, this->component.getSocketHandler());
Drv::Test::force_recv_timeout(udp2_fd.fd, udp2);
m_data_buffer.setSize(sizeof(m_data_storage));
Drv::Test::fill_random_buffer(m_data_buffer);
size = Drv::Test::fill_random_buffer(m_data_buffer);
Drv::SendStatus status = invoke_to_send(0, m_data_buffer);
EXPECT_EQ(status, SendStatus::SEND_OK);
U16 counter = 0;
while ((status2 == Drv::SOCK_NO_DATA_AVAILABLE) and counter < Drv::Test::MAX_ITER) {
status2 = udp2.recv(udp2_fd, buffer, size);
counter++;
}
EXPECT_EQ(status2, Drv::SOCK_SUCCESS);
EXPECT_EQ(size, m_data_buffer.getSize());
Drv::Test::receive_all(udp2, udp2_fd, buffer, size);
Drv::Test::validate_random_buffer(m_data_buffer, buffer);
// If receive thread is live, try the other way
if (recv_thread) {