Strip ANSI escape color sequences from /latest log responses (#6319)

* Strip ANSI escape color sequences from /latest log responses

Strip ANSI sequences of CSI commands [1] used for log coloring from
/latest log endpoints. These endpoint were primarily designed for log
downloads and colors are mostly not wanted in those. Add optional
argument for stripping the colors from the logs and enable it for the
/latest endpoints.

[1] https://en.wikipedia.org/wiki/ANSI_escape_code#CSIsection

* Refactor advanced logs' tests to use fixture factory

Introduce `advanced_logs_tester` fixture to simplify testing of advanced logs
in the API tests, declaring all the needed fixture in a single place. # Please
enter the commit message for your changes. Lines starting
This commit is contained in:
Jan Čermák 2025-11-19 09:39:24 +01:00 committed by GitHub
parent d3d652eba5
commit 0837e05cb2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 301 additions and 191 deletions

View File

@ -152,6 +152,7 @@ class RestAPI(CoreSysAttributes):
self._api_host.advanced_logs,
identifier=syslog_identifier,
latest=True,
no_colors=True,
),
),
web.get(
@ -449,6 +450,7 @@ class RestAPI(CoreSysAttributes):
await async_capture_exception(err)
kwargs.pop("follow", None) # Follow is not supported for Docker logs
kwargs.pop("latest", None) # Latest is not supported for Docker logs
kwargs.pop("no_colors", None) # no_colors not supported for Docker logs
return await api_supervisor.logs(*args, **kwargs)
self.webapp.add_routes(
@ -460,7 +462,7 @@ class RestAPI(CoreSysAttributes):
),
web.get(
"/supervisor/logs/latest",
partial(get_supervisor_logs, latest=True),
partial(get_supervisor_logs, latest=True, no_colors=True),
),
web.get("/supervisor/logs/boots/{bootid}", get_supervisor_logs),
web.get(
@ -576,7 +578,7 @@ class RestAPI(CoreSysAttributes):
),
web.get(
"/addons/{addon}/logs/latest",
partial(get_addon_logs, latest=True),
partial(get_addon_logs, latest=True, no_colors=True),
),
web.get("/addons/{addon}/logs/boots/{bootid}", get_addon_logs),
web.get(

View File

@ -206,6 +206,7 @@ class APIHost(CoreSysAttributes):
identifier: str | None = None,
follow: bool = False,
latest: bool = False,
no_colors: bool = False,
) -> web.StreamResponse:
"""Return systemd-journald logs."""
log_formatter = LogFormatter.PLAIN
@ -280,7 +281,9 @@ class APIHost(CoreSysAttributes):
response = web.StreamResponse()
response.content_type = CONTENT_TYPE_TEXT
headers_returned = False
async for cursor, line in journal_logs_reader(resp, log_formatter):
async for cursor, line in journal_logs_reader(
resp, log_formatter, no_colors
):
try:
if not headers_returned:
if cursor:
@ -318,9 +321,12 @@ class APIHost(CoreSysAttributes):
identifier: str | None = None,
follow: bool = False,
latest: bool = False,
no_colors: bool = False,
) -> web.StreamResponse:
"""Return systemd-journald logs. Wrapped as standard API handler."""
return await self.advanced_logs_handler(request, identifier, follow, latest)
return await self.advanced_logs_handler(
request, identifier, follow, latest, no_colors
)
@api_process
async def disk_usage(self, request: web.Request) -> dict:

View File

@ -5,12 +5,20 @@ from collections.abc import AsyncGenerator
from datetime import UTC, datetime
from functools import wraps
import json
import re
from aiohttp import ClientResponse
from supervisor.exceptions import MalformedBinaryEntryError
from supervisor.host.const import LogFormatter
_RE_ANSI_CSI_COLORS_PATTERN = re.compile(r"\x1B\[[0-9;]*m")
def _strip_ansi_colors(message: str) -> str:
"""Remove ANSI color codes from a message string."""
return _RE_ANSI_CSI_COLORS_PATTERN.sub("", message)
def formatter(required_fields: list[str]):
"""Decorate journal entry formatters with list of required fields.
@ -31,9 +39,9 @@ def formatter(required_fields: list[str]):
@formatter(["MESSAGE"])
def journal_plain_formatter(entries: dict[str, str]) -> str:
def journal_plain_formatter(entries: dict[str, str], no_colors: bool = False) -> str:
"""Format parsed journal entries as a plain message."""
return entries["MESSAGE"]
return _strip_ansi_colors(entries["MESSAGE"]) if no_colors else entries["MESSAGE"]
@formatter(
@ -45,7 +53,7 @@ def journal_plain_formatter(entries: dict[str, str]) -> str:
"MESSAGE",
]
)
def journal_verbose_formatter(entries: dict[str, str]) -> str:
def journal_verbose_formatter(entries: dict[str, str], no_colors: bool = False) -> str:
"""Format parsed journal entries to a journalctl-like format."""
ts = datetime.fromtimestamp(
int(entries["__REALTIME_TIMESTAMP"]) / 1e6, UTC
@ -58,14 +66,24 @@ def journal_verbose_formatter(entries: dict[str, str]) -> str:
else entries.get("SYSLOG_IDENTIFIER", "_UNKNOWN_")
)
return f"{ts} {entries.get('_HOSTNAME', '')} {identifier}: {entries.get('MESSAGE', '')}"
message = (
_strip_ansi_colors(entries.get("MESSAGE", ""))
if no_colors
else entries.get("MESSAGE", "")
)
return f"{ts} {entries.get('_HOSTNAME', '')} {identifier}: {message}"
async def journal_logs_reader(
journal_logs: ClientResponse, log_formatter: LogFormatter = LogFormatter.PLAIN
journal_logs: ClientResponse,
log_formatter: LogFormatter = LogFormatter.PLAIN,
no_colors: bool = False,
) -> AsyncGenerator[tuple[str | None, str]]:
"""Read logs from systemd journal line by line, formatted using the given formatter.
Optionally strip ANSI color codes from the entries' messages.
Returns a generator of (cursor, formatted_entry) tuples.
"""
match log_formatter:
@ -84,7 +102,10 @@ async def journal_logs_reader(
# at EOF (likely race between at_eof and EOF check in readuntil)
if line == b"\n" or not line:
if entries:
yield entries.get("__CURSOR"), formatter_(entries)
yield (
entries.get("__CURSOR"),
formatter_(entries, no_colors=no_colors),
)
entries = {}
continue

View File

@ -1,95 +1 @@
"""Test for API calls."""
from unittest.mock import AsyncMock, MagicMock
from aiohttp.test_utils import TestClient
from supervisor.coresys import CoreSys
from supervisor.host.const import LogFormat
DEFAULT_LOG_RANGE = "entries=:-99:100"
DEFAULT_LOG_RANGE_FOLLOW = "entries=:-99:18446744073709551615"
async def common_test_api_advanced_logs(
path_prefix: str,
syslog_identifier: str,
api_client: TestClient,
journald_logs: MagicMock,
coresys: CoreSys,
os_available: None,
):
"""Template for tests of endpoints using advanced logs."""
resp = await api_client.get(f"{path_prefix}/logs")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": syslog_identifier},
range_header=DEFAULT_LOG_RANGE,
accept=LogFormat.JOURNAL,
)
journald_logs.reset_mock()
resp = await api_client.get(f"{path_prefix}/logs/follow")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": syslog_identifier, "follow": ""},
range_header=DEFAULT_LOG_RANGE_FOLLOW,
accept=LogFormat.JOURNAL,
)
journald_logs.reset_mock()
mock_response = MagicMock()
mock_response.text = AsyncMock(
return_value='{"CONTAINER_LOG_EPOCH": "12345"}\n{"CONTAINER_LOG_EPOCH": "12345"}\n'
)
journald_logs.return_value.__aenter__.return_value = mock_response
resp = await api_client.get(f"{path_prefix}/logs/latest")
assert resp.status == 200
assert journald_logs.call_count == 2
# Check the first call for getting epoch
epoch_call = journald_logs.call_args_list[0]
assert epoch_call[1]["params"] == {"CONTAINER_NAME": syslog_identifier}
assert epoch_call[1]["range_header"] == "entries=:-1:2"
# Check the second call for getting logs with the epoch
logs_call = journald_logs.call_args_list[1]
assert logs_call[1]["params"]["SYSLOG_IDENTIFIER"] == syslog_identifier
assert logs_call[1]["params"]["CONTAINER_LOG_EPOCH"] == "12345"
assert logs_call[1]["range_header"] == "entries=:0:18446744073709551615"
journald_logs.reset_mock()
resp = await api_client.get(f"{path_prefix}/logs/boots/0")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": syslog_identifier, "_BOOT_ID": "ccc"},
range_header=DEFAULT_LOG_RANGE,
accept=LogFormat.JOURNAL,
)
journald_logs.reset_mock()
resp = await api_client.get(f"{path_prefix}/logs/boots/0/follow")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={
"SYSLOG_IDENTIFIER": syslog_identifier,
"_BOOT_ID": "ccc",
"follow": "",
},
range_header=DEFAULT_LOG_RANGE_FOLLOW,
accept=LogFormat.JOURNAL,
)

133
tests/api/conftest.py Normal file
View File

@ -0,0 +1,133 @@
"""Fixtures for API tests."""
from collections.abc import Awaitable, Callable
from unittest.mock import ANY, AsyncMock, MagicMock
from aiohttp.test_utils import TestClient
import pytest
from supervisor.coresys import CoreSys
from supervisor.host.const import LogFormat, LogFormatter
DEFAULT_LOG_RANGE = "entries=:-99:100"
DEFAULT_LOG_RANGE_FOLLOW = "entries=:-99:18446744073709551615"
async def _common_test_api_advanced_logs(
path_prefix: str,
syslog_identifier: str,
api_client: TestClient,
journald_logs: MagicMock,
coresys: CoreSys,
os_available: None,
journal_logs_reader: MagicMock,
):
"""Template for tests of endpoints using advanced logs."""
resp = await api_client.get(f"{path_prefix}/logs")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": syslog_identifier},
range_header=DEFAULT_LOG_RANGE,
accept=LogFormat.JOURNAL,
)
journald_logs.reset_mock()
resp = await api_client.get(f"{path_prefix}/logs/follow")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": syslog_identifier, "follow": ""},
range_header=DEFAULT_LOG_RANGE_FOLLOW,
accept=LogFormat.JOURNAL,
)
journal_logs_reader.assert_called_with(ANY, LogFormatter.PLAIN, False)
journald_logs.reset_mock()
journal_logs_reader.reset_mock()
mock_response = MagicMock()
mock_response.text = AsyncMock(
return_value='{"CONTAINER_LOG_EPOCH": "12345"}\n{"CONTAINER_LOG_EPOCH": "12345"}\n'
)
journald_logs.return_value.__aenter__.return_value = mock_response
resp = await api_client.get(f"{path_prefix}/logs/latest")
assert resp.status == 200
assert journald_logs.call_count == 2
# Check the first call for getting epoch
epoch_call = journald_logs.call_args_list[0]
assert epoch_call[1]["params"] == {"CONTAINER_NAME": syslog_identifier}
assert epoch_call[1]["range_header"] == "entries=:-1:2"
# Check the second call for getting logs with the epoch
logs_call = journald_logs.call_args_list[1]
assert logs_call[1]["params"]["SYSLOG_IDENTIFIER"] == syslog_identifier
assert logs_call[1]["params"]["CONTAINER_LOG_EPOCH"] == "12345"
assert logs_call[1]["range_header"] == "entries=:0:18446744073709551615"
journal_logs_reader.assert_called_with(ANY, LogFormatter.PLAIN, True)
journald_logs.reset_mock()
journal_logs_reader.reset_mock()
resp = await api_client.get(f"{path_prefix}/logs/boots/0")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": syslog_identifier, "_BOOT_ID": "ccc"},
range_header=DEFAULT_LOG_RANGE,
accept=LogFormat.JOURNAL,
)
journald_logs.reset_mock()
resp = await api_client.get(f"{path_prefix}/logs/boots/0/follow")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={
"SYSLOG_IDENTIFIER": syslog_identifier,
"_BOOT_ID": "ccc",
"follow": "",
},
range_header=DEFAULT_LOG_RANGE_FOLLOW,
accept=LogFormat.JOURNAL,
)
@pytest.fixture
async def advanced_logs_tester(
api_client: TestClient,
journald_logs: MagicMock,
coresys: CoreSys,
os_available,
journal_logs_reader: MagicMock,
) -> Callable[[str, str], Awaitable[None]]:
"""Fixture that returns a function to test advanced logs endpoints.
This allows tests to avoid explicitly passing all the required fixtures.
Usage:
async def test_my_logs(advanced_logs_tester):
await advanced_logs_tester("/path/prefix", "syslog_identifier")
"""
async def test_logs(path_prefix: str, syslog_identifier: str):
await _common_test_api_advanced_logs(
path_prefix,
syslog_identifier,
api_client,
journald_logs,
coresys,
os_available,
journal_logs_reader,
)
return test_logs

View File

@ -20,7 +20,6 @@ from supervisor.exceptions import HassioError
from supervisor.store.repository import Repository
from ..const import TEST_ADDON_SLUG
from . import common_test_api_advanced_logs
def _create_test_event(name: str, state: ContainerState) -> DockerContainerStateEvent:
@ -72,21 +71,11 @@ async def test_addons_info_not_installed(
async def test_api_addon_logs(
api_client: TestClient,
journald_logs: MagicMock,
coresys: CoreSys,
os_available,
advanced_logs_tester,
install_addon_ssh: Addon,
):
"""Test addon logs."""
await common_test_api_advanced_logs(
"/addons/local_ssh",
"addon_local_ssh",
api_client,
journald_logs,
coresys,
os_available,
)
await advanced_logs_tester("/addons/local_ssh", "addon_local_ssh")
async def test_api_addon_logs_not_installed(api_client: TestClient):

View File

@ -1,18 +1,6 @@
"""Test audio api."""
from unittest.mock import MagicMock
from aiohttp.test_utils import TestClient
from supervisor.coresys import CoreSys
from tests.api import common_test_api_advanced_logs
async def test_api_audio_logs(
api_client: TestClient, journald_logs: MagicMock, coresys: CoreSys, os_available
):
async def test_api_audio_logs(advanced_logs_tester) -> None:
"""Test audio logs."""
await common_test_api_advanced_logs(
"/audio", "hassio_audio", api_client, journald_logs, coresys, os_available
)
await advanced_logs_tester("/audio", "hassio_audio")

View File

@ -1,13 +1,12 @@
"""Test DNS API."""
from unittest.mock import MagicMock, patch
from unittest.mock import patch
from aiohttp.test_utils import TestClient
from supervisor.coresys import CoreSys
from supervisor.dbus.resolved import Resolved
from tests.api import common_test_api_advanced_logs
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.resolved import Resolved as ResolvedService
@ -66,15 +65,6 @@ async def test_options(api_client: TestClient, coresys: CoreSys):
restart.assert_called_once()
async def test_api_dns_logs(
api_client: TestClient, journald_logs: MagicMock, coresys: CoreSys, os_available
):
async def test_api_dns_logs(advanced_logs_tester):
"""Test dns logs."""
await common_test_api_advanced_logs(
"/dns",
"hassio_dns",
api_client,
journald_logs,
coresys,
os_available,
)
await advanced_logs_tester("/dns", "hassio_dns")

View File

@ -18,26 +18,18 @@ from supervisor.homeassistant.const import WSEvent
from supervisor.homeassistant.core import HomeAssistantCore
from supervisor.homeassistant.module import HomeAssistant
from tests.api import common_test_api_advanced_logs
from tests.common import AsyncIterator, load_json_fixture
@pytest.mark.parametrize("legacy_route", [True, False])
async def test_api_core_logs(
api_client: TestClient,
journald_logs: MagicMock,
coresys: CoreSys,
os_available,
advanced_logs_tester: AsyncMock,
legacy_route: bool,
):
"""Test core logs."""
await common_test_api_advanced_logs(
await advanced_logs_tester(
f"/{'homeassistant' if legacy_route else 'core'}",
"homeassistant",
api_client,
journald_logs,
coresys,
os_available,
)

View File

@ -272,7 +272,7 @@ async def test_advaced_logs_query_parameters(
range_header=DEFAULT_RANGE,
accept=LogFormat.JOURNAL,
)
journal_logs_reader.assert_called_with(ANY, LogFormatter.VERBOSE)
journal_logs_reader.assert_called_with(ANY, LogFormatter.VERBOSE, False)
journal_logs_reader.reset_mock()
journald_logs.reset_mock()
@ -290,7 +290,7 @@ async def test_advaced_logs_query_parameters(
range_header="entries=:-52:53",
accept=LogFormat.JOURNAL,
)
journal_logs_reader.assert_called_with(ANY, LogFormatter.VERBOSE)
journal_logs_reader.assert_called_with(ANY, LogFormatter.VERBOSE, False)
async def test_advanced_logs_boot_id_offset(
@ -343,24 +343,24 @@ async def test_advanced_logs_formatters(
"""Test advanced logs formatters varying on Accept header."""
await api_client.get("/host/logs")
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE, False)
journal_logs_reader.reset_mock()
headers = {"Accept": "text/x-log"}
await api_client.get("/host/logs", headers=headers)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE, False)
journal_logs_reader.reset_mock()
await api_client.get("/host/logs/identifiers/test")
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.PLAIN)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.PLAIN, False)
journal_logs_reader.reset_mock()
headers = {"Accept": "text/x-log"}
await api_client.get("/host/logs/identifiers/test", headers=headers)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE, False)
async def test_advanced_logs_errors(coresys: CoreSys, api_client: TestClient):

View File

@ -1,23 +1,6 @@
"""Test multicast api."""
from unittest.mock import MagicMock
from aiohttp.test_utils import TestClient
from supervisor.coresys import CoreSys
from tests.api import common_test_api_advanced_logs
async def test_api_multicast_logs(
api_client: TestClient, journald_logs: MagicMock, coresys: CoreSys, os_available
):
async def test_api_multicast_logs(advanced_logs_tester):
"""Test multicast logs."""
await common_test_api_advanced_logs(
"/multicast",
"hassio_multicast",
api_client,
journald_logs,
coresys,
os_available,
)
await advanced_logs_tester("/multicast", "hassio_multicast")

View File

@ -18,7 +18,6 @@ from supervisor.store.repository import Repository
from supervisor.supervisor import Supervisor
from supervisor.updater import Updater
from tests.api import common_test_api_advanced_logs
from tests.common import AsyncIterator, load_json_fixture
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.os_agent import OSAgent as OSAgentService
@ -155,18 +154,9 @@ async def test_api_supervisor_options_diagnostics(
assert coresys.dbus.agent.diagnostics is False
async def test_api_supervisor_logs(
api_client: TestClient, journald_logs: MagicMock, coresys: CoreSys, os_available
):
async def test_api_supervisor_logs(advanced_logs_tester):
"""Test supervisor logs."""
await common_test_api_advanced_logs(
"/supervisor",
"hassio_supervisor",
api_client,
journald_logs,
coresys,
os_available,
)
await advanced_logs_tester("/supervisor", "hassio_supervisor")
async def test_api_supervisor_fallback(

View File

@ -90,6 +90,49 @@ async def test_logs_coloured(journald_gateway: MagicMock, coresys: CoreSys):
)
async def test_logs_no_colors(journald_gateway: MagicMock, coresys: CoreSys):
"""Test ANSI color codes being stripped when no_colors=True."""
journald_gateway.content.feed_data(
load_fixture("logs_export_supervisor.txt").encode("utf-8")
)
journald_gateway.content.feed_eof()
async with coresys.host.logs.journald_logs() as resp:
cursor, line = await anext(journal_logs_reader(resp, no_colors=True))
assert (
cursor
== "s=83fee99ca0c3466db5fc120d52ca7dd8;i=2049389;b=f5a5c442fa6548cf97474d2d57c920b3;m=4263828e8c;t=612dda478b01b;x=9ae12394c9326930"
)
# Colors should be stripped
assert (
line == "24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor"
)
async def test_logs_verbose_no_colors(journald_gateway: MagicMock, coresys: CoreSys):
"""Test ANSI color codes being stripped from verbose formatted logs when no_colors=True."""
journald_gateway.content.feed_data(
load_fixture("logs_export_supervisor.txt").encode("utf-8")
)
journald_gateway.content.feed_eof()
async with coresys.host.logs.journald_logs() as resp:
cursor, line = await anext(
journal_logs_reader(
resp, log_formatter=LogFormatter.VERBOSE, no_colors=True
)
)
assert (
cursor
== "s=83fee99ca0c3466db5fc120d52ca7dd8;i=2049389;b=f5a5c442fa6548cf97474d2d57c920b3;m=4263828e8c;t=612dda478b01b;x=9ae12394c9326930"
)
# Colors should be stripped in verbose format too
assert (
line
== "2024-03-04 22:56:56.709 ha-hloub hassio_supervisor[466]: 24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor"
)
async def test_boot_ids(
journald_gateway: MagicMock,
coresys: CoreSys,

View File

@ -86,6 +86,22 @@ def test_format_verbose_newlines():
)
def test_format_verbose_colors():
"""Test verbose formatter with ANSI colors in message."""
fields = {
"__REALTIME_TIMESTAMP": "1379403171000000",
"_HOSTNAME": "homeassistant",
"SYSLOG_IDENTIFIER": "python",
"_PID": "666",
"MESSAGE": "\x1b[32mHello, world!\x1b[0m",
}
assert (
journal_verbose_formatter(fields)
== "2013-09-17 07:32:51.000 homeassistant python[666]: \x1b[32mHello, world!\x1b[0m"
)
async def test_parsing_simple():
"""Test plain formatter."""
journal_logs, stream = _journal_logs_mock()
@ -297,3 +313,54 @@ async def test_parsing_non_utf8_in_binary_message():
)
_, line = await anext(journal_logs_reader(journal_logs))
assert line == "Hello, \ufffd world!"
def test_format_plain_no_colors():
"""Test plain formatter strips ANSI color codes when no_colors=True."""
fields = {"MESSAGE": "\x1b[32mHello, world!\x1b[0m"}
assert journal_plain_formatter(fields, no_colors=True) == "Hello, world!"
def test_format_verbose_no_colors():
"""Test verbose formatter strips ANSI color codes when no_colors=True."""
fields = {
"__REALTIME_TIMESTAMP": "1379403171000000",
"_HOSTNAME": "homeassistant",
"SYSLOG_IDENTIFIER": "python",
"_PID": "666",
"MESSAGE": "\x1b[32mHello, world!\x1b[0m",
}
assert (
journal_verbose_formatter(fields, no_colors=True)
== "2013-09-17 07:32:51.000 homeassistant python[666]: Hello, world!"
)
async def test_parsing_colored_logs_verbose_no_colors():
"""Test verbose formatter strips colors from colored logs."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(
b"__REALTIME_TIMESTAMP=1379403171000000\n"
b"_HOSTNAME=homeassistant\n"
b"SYSLOG_IDENTIFIER=python\n"
b"_PID=666\n"
b"MESSAGE\n\x0e\x00\x00\x00\x00\x00\x00\x00\x1b[31mERROR\x1b[0m\n"
b"AFTER=after\n\n"
)
_, line = await anext(
journal_logs_reader(
journal_logs, log_formatter=LogFormatter.VERBOSE, no_colors=True
)
)
assert line == "2013-09-17 07:32:51.000 homeassistant python[666]: ERROR"
async def test_parsing_multiple_color_codes():
"""Test stripping multiple ANSI color codes in single message."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(
b"MESSAGE\n\x29\x00\x00\x00\x00\x00\x00\x00\x1b[31mRed\x1b[0m \x1b[32mGreen\x1b[0m \x1b[34mBlue\x1b[0m\n"
b"AFTER=after\n\n"
)
_, line = await anext(journal_logs_reader(journal_logs, no_colors=True))
assert line == "Red Green Blue"