Implement authentication in CTI Console (#666)

* Transfer files

* Add draft

* Add getResourceToken() methods

Adds jackson lib

* Implement service and request to obtain plans from CTI Console

* Add wazuh-tag header to request to obtains plans

* Generalize CTI Console services via abstract class

* Add periodic task to obtain a token

* Fix forbidden APIs errors

* Comment out test code

* Add unit tests

* Add wait/notify to onTokenChanged

Add unit test

* Replace notify/await with lock/unlock

* Remove test annotation

* Apply suggestions from code review for thread-safe operations

* Fix merging issues

---------

Signed-off-by: Álex Ruiz Becerra <alejandro.ruiz.becerra@wazuh.com>
This commit is contained in:
Álex Ruiz Becerra 2025-11-24 17:09:15 +01:00 committed by GitHub
parent fc224c5724
commit ffab688f14
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 1204 additions and 40 deletions

View File

@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add Security Compliance fields to the WCS [(#643)](https://github.com/wazuh/wazuh-indexer-plugins/pull/643)
- Initialize indexer content manager [(#651)](https://github.com/wazuh/wazuh-indexer-plugins/pull/651)
- Implement Imposter mock server for CTI API [#661](https://github.com/wazuh/wazuh-indexer-plugins/pull/661)
- Implement authentication in CTI Console [#666](https://github.com/wazuh/wazuh-indexer-plugins/pull/666)
- Initialize consumers metadata index on start [(#668)](https://github.com/wazuh/wazuh-indexer-plugins/pull/668)
### Dependencies

View File

@ -79,11 +79,19 @@ configurations {
}
dependencies {
// log4j
implementation "org.apache.logging.log4j:log4j-slf4j-impl:2.25.2"
implementation "org.slf4j:slf4j-api:2.0.17"
// Http Client
implementation "org.apache.httpcomponents.client5:httpclient5:5.4.4"
implementation "org.apache.httpcomponents.core5:httpcore5-h2:5.3.4"
implementation "org.apache.httpcomponents.core5:httpcore5:5.3.4"
implementation "org.apache.logging.log4j:log4j-slf4j-impl:2.25.2"
implementation "org.slf4j:slf4j-api:2.0.17"
// Use Jackson provided by OpenSearch (2.18.2) - only for compile time
compileOnly 'com.fasterxml.jackson.core:jackson-core:2.18.2'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.18.2'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.18.2'
// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name: 'opensearch-job-scheduler', version: "${opensearch_build}"

View File

@ -28,7 +28,7 @@ else if (!resource) {
.withContent('{"error": "invalid_request", "error_description": "Missing required parameter resource"}')
}
// Check for invalid resource endpoint
else if (resource && !resource.contains("localhost:8443")) {
else if (resource && resource.contains("invalid_target")) {
respond()
.withStatusCode(400)
.withHeader("Cache-Control", "no-store")

View File

@ -16,13 +16,18 @@
*/
package com.wazuh.contentmanager;
import com.wazuh.contentmanager.cti.console.CtiConsole;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.Privileged;
import com.wazuh.contentmanager.utils.SnapshotManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.transport.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.*;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
@ -32,18 +37,16 @@ import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import org.opensearch.watcher.ResourceWatcherService;
import java.util.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.Privileged;
import com.wazuh.contentmanager.utils.SnapshotManager;
/**
* Main class of the Content Manager Plugin
*/
@ -59,6 +62,7 @@ public class ContentManagerPlugin extends Plugin implements ClusterPlugin {
private SnapshotManager snapshotManager;
private ThreadPool threadPool;
private ClusterService clusterService;
private CtiConsole ctiConsole;
@Override
public Collection<Object> createComponents(
@ -79,7 +83,8 @@ public class ContentManagerPlugin extends Plugin implements ClusterPlugin {
this.consumersIndex = new ConsumersIndex(client);
this.contentIndex = new ContentIndex(client);
this.snapshotManager =
new SnapshotManager(environment, this.consumersIndex, this.contentIndex, new Privileged());
new SnapshotManager(environment, this.consumersIndex, this.contentIndex, new Privileged());
// this.ctiConsole = new CtiConsole(new AuthServiceImpl());
return Collections.emptyList();
}
@ -97,6 +102,40 @@ public class ContentManagerPlugin extends Plugin implements ClusterPlugin {
log.info("Starting Content Manager plugin initialization");
this.start();
}
/*
// Use case 1. Polling
AuthServiceImpl authService = new AuthServiceImpl();
this.ctiConsole = new CtiConsole();
this.ctiConsole.setAuthService(authService);
this.ctiConsole.onPostSubscriptionRequest();
while (!this.ctiConsole.isTokenTaskCompleted()) {}
if (this.ctiConsole.isTokenTaskCompleted()) {
Token token = this.ctiConsole.getToken();
// Use case 2. Obtain available plans
PlansServiceImpl productsService = new PlansServiceImpl();
List<Plan> plans = productsService.getPlans(token.getAccessToken());
log.info("Plans: {}", plans);
// Use case 3. Obtain resource token.
Product vulnsPro = plans.stream()
.filter(plan -> plan.getName().equals("Pro Plan Deluxe"))
.toList()
.getFirst()
.getProducts().stream()
.filter(product -> product.getIdentifier().equals("vulnerabilities-pro"))
.toList()
.getFirst();
Token resourceToken = authService.getResourceToken(
token.getAccessToken(),
vulnsPro.getResource()
);
log.info("Resource token {}", resourceToken);
}
*/
}
/**

View File

@ -27,6 +27,10 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.env.Environment;
import org.opensearch.action.ActionType;
import org.opensearch.action.ActionRequest;
import org.opensearch.core.action.ActionResponse;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;

View File

@ -58,10 +58,14 @@ public class HttpClient {
private static final Object LOCK = new Object();
/** Singleton instance of the HTTP client. */
/**
* Singleton instance of the HTTP client.
*/
protected static CloseableHttpAsyncClient httpClient;
/** Base URI for API requests */
/**
* Base URI for API requests
*/
protected final URI apiUri;
/**
@ -86,18 +90,18 @@ public class HttpClient {
if (httpClient == null) {
try {
SSLContext sslContext =
SSLContextBuilder.create()
.loadTrustMaterial(null, (chains, authType) -> true)
.build();
SSLContextBuilder.create()
.loadTrustMaterial(null, (chains, authType) -> true)
.build();
httpClient =
HttpAsyncClients.custom()
.setConnectionManager(
PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(
ClientTlsStrategyBuilder.create().setSslContext(sslContext).build())
.build())
.build();
HttpAsyncClients.custom()
.setConnectionManager(
PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(
ClientTlsStrategyBuilder.create().setSslContext(sslContext).build())
.build())
.build();
httpClient.start();
} catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
log.error("Error initializing HTTP client: {}", e.getMessage());
@ -110,19 +114,19 @@ public class HttpClient {
/**
* Sends an HTTP request with the specified parameters.
*
* @param method The HTTP method (e.g., GET, POST, PUT, DELETE).
* @param endpoint The endpoint to append to the base API URI.
* @param requestBody The request body (optional, applicable for POST/PUT).
* @param method The HTTP method (e.g., GET, POST, PUT, DELETE).
* @param endpoint The endpoint to append to the base API URI.
* @param requestBody The request body (optional, applicable for POST/PUT).
* @param queryParameters The query parameters (optional).
* @param headers The headers to include in the request (optional).
* @param headers The headers to include in the request (optional).
* @return A SimpleHttpResponse containing the response details.
*/
protected SimpleHttpResponse sendRequest(
@NonNull Method method,
String endpoint,
String requestBody,
Map<String, String> queryParameters,
Header... headers) {
@NonNull Method method,
String endpoint,
String requestBody,
Map<String, String> queryParameters,
Header... headers) {
URI _apiUri;
if (httpClient == null) {
startHttpAsyncClient();
@ -151,12 +155,12 @@ public class HttpClient {
SimpleHttpRequest request = builder.setHttpHost(httpHost).setPath(_apiUri.getPath()).build();
log.debug("Request sent: [{}]", request);
return httpClient
.execute(
SimpleRequestProducer.create(request),
SimpleResponseConsumer.create(),
new HttpResponseCallback(
request, "Failed to execute outgoing " + method + " request"))
.get(PluginSettings.getInstance().getClientTimeout(), TimeUnit.SECONDS);
.execute(
SimpleRequestProducer.create(request),
SimpleResponseConsumer.create(),
new HttpResponseCallback(
request, "Failed to execute outgoing " + method + " request"))
.get(PluginSettings.getInstance().getClientTimeout(), TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("HTTP {} request failed: {}", method, e.getMessage());
Thread.currentThread().interrupt();

View File

@ -0,0 +1,187 @@
package com.wazuh.contentmanager.cti.console;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.cti.console.service.AuthService;
import com.wazuh.contentmanager.cti.console.service.PlansService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.util.concurrent.FutureUtils;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* CTI Console main class. Contains and manages CTI Console internal state and services.
*/
public class CtiConsole implements TokenListener {
private static final Logger log = LogManager.getLogger(CtiConsole.class);
private static final String TASK_NAME = "CTI Console Periodic Task";
/**
* CTI Console authentication service.
*/
private AuthService authService;
/**
* CTI Console plans service.
*/
private PlansService plansService;
/**
* Permanent token of this instance to authenticate to the CTI Console.
*/
private volatile Token token;
/**
* Used to cancel the periodic task to obtain a token when completed or expired.
*/
private ScheduledFuture<?> getTokenTaskFuture;
/**
* Lock for synchronizing token retrieval.
*/
private final Lock tokenLock = new ReentrantLock();
/**
* Condition to signal when token is obtained.
*/
private final Condition tokenAvailable = tokenLock.newCondition();
/**
* Thread executor.
*/
private final ScheduledExecutorService executor;
/**
* Default constructor.
*/
public CtiConsole() {
this.executor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, TASK_NAME));
}
/**
* Sets the plan service for this CTI Console instance.
* @param plansService plans service implementation.
*/
public void setPlansService(PlansService plansService) {
// Gracefully close existing http client
if (this.plansService != null) {
this.plansService.close();
}
this.plansService = plansService;
}
/**
* Sets the authentication service for this CTI Console instance.
* @param authService authentication service implementation.
*/
public void setAuthService(AuthService authService) {
// Gracefully close existing http client
if (this.authService != null) {
this.authService.close();
}
this.authService = authService;
// Pass the instance as a listener for token changes.
this.authService.addListener(this);
}
@Override
public void onTokenChanged(Token t) {
tokenLock.lock();
try {
this.token = t;
log.info("Permanent token changed: {}", this.token); // TODO do not log the token
// Cancel polling
FutureUtils.cancel(this.getTokenTaskFuture);
this.executor.shutdown();
// Signal all waiting threads that the token has been obtained
tokenAvailable.signalAll();
} finally {
tokenLock.unlock();
}
}
/**
* Starts a periodic task to obtain a permanent token from the CTI Console.
* @param interval the period between successive executions.
*/
private void getToken(int interval/* TODO sub details */) {
Runnable getTokenTask = () -> this.authService.getToken("client_id", "polling");
this.getTokenTaskFuture = this.executor.scheduleAtFixedRate(getTokenTask, interval, interval, TimeUnit.SECONDS);
}
/**
* Triggers the mechanism to obtain a permanent token from the CTI Console.
* This method is meant to be called by the Rest handler.
*/
public void onPostSubscriptionRequest (/* TODO sub details */) {
this.getToken(5);
}
/**
* CTI Console token getter.
* @return permanent token.
*/
public Token getToken() {
return this.token;
}
/**
* Returns whether the periodic task to obtain a token has finished. See {@link ScheduledFuture#isDone()}.
* @return true if the task is done, false otherwise.
*/
public boolean isTokenTaskCompleted() {
return this.getTokenTaskFuture.isDone();
}
/**
* Waits for the token to be obtained from the CTI Console with a timeout.
* This method blocks the calling thread until either:
* - A token is successfully obtained (returns the token)
* - The timeout expires (returns null)
* - The thread is interrupted (throws InterruptedException)
*
* @param timeoutMillis maximum time to wait in milliseconds.
* @return the obtained token, or null if timeout expires or token is not obtained.
* @throws InterruptedException if the waiting thread is interrupted.
*/
public Token waitForToken(long timeoutMillis) throws InterruptedException {
tokenLock.lock();
try {
long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
// Wait until token is obtained or timeout expires
while (this.token == null && remainingNanos > 0) {
remainingNanos = tokenAvailable.awaitNanos(remainingNanos);
}
return this.token;
} finally {
tokenLock.unlock();
}
}
/**
* Waits indefinitely for the token to be obtained from the CTI Console.
* This method blocks the calling thread until a token is successfully obtained or interrupted.
*
* @return the obtained token.
* @throws InterruptedException if the waiting thread is interrupted.
*/
public Token waitForToken() throws InterruptedException {
tokenLock.lock();
try {
while (this.token == null) {
tokenAvailable.await();
}
return this.token;
} finally {
tokenLock.unlock();
}
}
}

View File

@ -0,0 +1,10 @@
package com.wazuh.contentmanager.cti.console;
import com.wazuh.contentmanager.cti.console.model.Token;
import java.util.EventListener;
public interface TokenListener extends EventListener {
void onTokenChanged(Token t);
}

View File

@ -0,0 +1,174 @@
package com.wazuh.contentmanager.cti.console.client;
import com.wazuh.contentmanager.utils.http.HttpResponseCallback;
import org.apache.hc.client5.http.async.methods.*;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.apache.hc.core5.util.Timeout;
import javax.net.ssl.SSLContext;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* CTI Console API client.
*/
public class ApiClient {
private static final String BASE_URI = "https://localhost:8443";
private static final String API_PREFIX = "/api/v1";
private static final String TOKEN_URI = BASE_URI + API_PREFIX + "/instances/token";
private static final String PRODUCTS_URI = BASE_URI + API_PREFIX + "/instances/me";
private static final String RESOURCE_URI = BASE_URI + API_PREFIX + "/instances/token/exchange";
private CloseableHttpAsyncClient client;
private final int TIMEOUT = 5;
/**
* Constructs an CtiApiClient instance.
*/
public ApiClient() {
this.buildClient();
}
/**
* Builds and starts the Http client.
*/
private void buildClient() {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setSoTimeout(Timeout.ofSeconds(TIMEOUT))
.build();
SSLContext sslContext;
try {
sslContext =
SSLContextBuilder.create()
.loadTrustMaterial(null, (chains, authType) -> true)
.build();
} catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
throw new RuntimeException("Failed to initialize HttpClient", e);
}
this.client = HttpAsyncClients.custom()
.setIOReactorConfig(ioReactorConfig)
.setConnectionManager(
PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(
ClientTlsStrategyBuilder.create().setSslContext(sslContext).build())
.build())
.build();
this.client.start();
}
/**
* Closes the underlying HTTP asynchronous client. Used in tests
*/
public void close() {
this.client.close(CloseMode.GRACEFUL);
}
/**
* Perform an HTTP POST request to the CTI Console to obtain a permanent token for this XDR/SIEM Wazuh instance
* @param clientId unique client identifier for the instance.
* @param deviceCode unique device code provided by the CTI Console during the registration of the instance.
* @return HTTP response.
* @throws ExecutionException request failed.
* @throws InterruptedException request failed / interrupted.
* @throws TimeoutException request timed out.
*/
public SimpleHttpResponse getToken(String clientId, String deviceCode) throws ExecutionException, InterruptedException, TimeoutException {
String grantType = "grant_type=urn:ietf:params:oauth:grant-type:device_code";
String formBody = String.format(Locale.ROOT, "%s&client_id=%s&device_code=%s", grantType, clientId, deviceCode);
SimpleHttpRequest request = SimpleRequestBuilder
.post(TOKEN_URI)
.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.toString())
.setBody(formBody, ContentType.APPLICATION_FORM_URLENCODED)
.build();
final Future<SimpleHttpResponse> future = client.execute(
SimpleRequestProducer.create(request),
SimpleResponseConsumer.create(),
new HttpResponseCallback(
request, "Outgoing request failed"
));
return future.get(TIMEOUT, TimeUnit.SECONDS);
}
/***
* Perform an HTTP POST request to the CTI Console to obtain a temporary HMAC-signed URL token for the given resource.
* @param permanentToken permanent token for the instance.
* @param resource resource to request the access token to.
* @return HTTP response.
* @throws ExecutionException request failed.
* @throws InterruptedException request failed / interrupted.
* @throws TimeoutException request timed out.
*/
public SimpleHttpResponse getResourceToken(String permanentToken, String resource) throws ExecutionException, InterruptedException, TimeoutException {
String formBody = String.join("&", List.of(
"grant_type=urn:ietf:params:oauth:grant-type:token-exchange",
"subject_token_type=urn:ietf:params:oauth:token-type:access_token",
"requested_token_type=urn:wazuh:params:oauth:token-type:signed_url",
"resource=" + resource
));
String token = String.format(Locale.ROOT, "Bearer %s", permanentToken);
SimpleHttpRequest request = SimpleRequestBuilder
.post(RESOURCE_URI)
.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.toString())
.addHeader(HttpHeaders.AUTHORIZATION, token)
.setBody(formBody, ContentType.APPLICATION_FORM_URLENCODED)
.build();
final Future<SimpleHttpResponse> future = client.execute(
SimpleRequestProducer.create(request),
SimpleResponseConsumer.create(),
new HttpResponseCallback(
request, "Outgoing request failed"
));
return future.get(TIMEOUT, TimeUnit.SECONDS);
}
/**
* Perform an HTTP GET request to the CTI Console to obtain the list of plans the instance is subscribed to.
* @param permanentToken permanent token for the instance.
* @return HTTP response.
* @throws ExecutionException request failed.
* @throws InterruptedException request failed / interrupted.
* @throws TimeoutException request timed out.
*/
public SimpleHttpResponse getPlans(String permanentToken) throws ExecutionException, InterruptedException, TimeoutException {
String token = String.format(Locale.ROOT, "Bearer %s", permanentToken);
SimpleHttpRequest request = SimpleRequestBuilder
.get(PRODUCTS_URI)
.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString())
.addHeader(HttpHeaders.AUTHORIZATION, token)
.addHeader("wazuh-tag", "v5.0.0") // TODO make dynamic
.build();
final Future<SimpleHttpResponse> future = client.execute(
SimpleRequestProducer.create(request),
SimpleResponseConsumer.create(),
new HttpResponseCallback(
request, "Outgoing request failed"
));
return future.get(TIMEOUT, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,7 @@
package com.wazuh.contentmanager.cti.console.client;
public interface ClosableHttpClient {
void setClient(ApiClient c);
void close();
}

View File

@ -0,0 +1,42 @@
package com.wazuh.contentmanager.cti.console.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.util.List;
/**
* CTI plan DTO.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class Plan {
private String name;
private String description;
private List<Product> products;
/**
* Default constructor.
*/
public Plan() {}
public String getName() {
return this.name;
}
public String getDescription() {
return this.description;
}
public List<Product> getProducts() {
return this.products;
}
@Override
public String toString() {
return "Plan{" +
"name='" + this.name + '\'' +
", description='" + this.description + '\'' +
", products=" + this.products +
'}';
}
}

View File

@ -0,0 +1,51 @@
package com.wazuh.contentmanager.cti.console.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
* CTI product DTO.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class Product {
private String identifier;
private String type;
private String name;
private String description;
private String resource;
/**
* Default constructor.
*/
public Product() {}
public String getIdentifier() {
return this.identifier;
}
public String getType() {
return this.type;
}
public String getName() {
return this.name;
}
public String getDescription() {
return this.description;
}
public String getResource() {
return this.resource;
}
@Override
public String toString() {
return "Product{" +
"identifier='" + this.identifier + '\'' +
", type='" + this.type + '\'' +
", name='" + this.name + '\'' +
", description='" + this.description + '\'' +
", resource='" + this.resource + '\'' +
'}';
}
}

View File

@ -0,0 +1,34 @@
package com.wazuh.contentmanager.cti.console.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* CTI token DTO.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class Token {
@JsonProperty("access_token")
private String accessToken;
/**
* Default constructor.
*/
public Token() { }
/**
* Getter for accessToken.
* @return Access Token.
*/
public String getAccessToken() {
return this.accessToken;
}
@Override
public String toString() {
return "Token{" +
"accessToken='" + accessToken + '\'' +
'}';
}
}

View File

@ -0,0 +1,39 @@
package com.wazuh.contentmanager.cti.console.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wazuh.contentmanager.cti.console.client.ApiClient;
/**
* Abstract service class, for generalization.
*/
public abstract class AbstractService {
ApiClient client;
final ObjectMapper mapper;
/**
* Default constructor
*/
public AbstractService() {
this.client = new ApiClient();
this.mapper = new ObjectMapper();
}
/**
* Use for testing only.
* @param c mocked client.
*/
public void setClient(ApiClient c) {
this.close();
this.client = c;
}
/**
* Closes the underlying HTTP client. Should be called when the service is no longer needed.
*/
public void close() {
if (this.client != null) {
this.client.close();
}
}
}

View File

@ -0,0 +1,13 @@
package com.wazuh.contentmanager.cti.console.service;
import com.wazuh.contentmanager.cti.console.TokenListener;
import com.wazuh.contentmanager.cti.console.client.ClosableHttpClient;
import com.wazuh.contentmanager.cti.console.model.Token;
public interface AuthService extends ClosableHttpClient {
Token getToken(String clientId, String deviceCode);
Token getResourceToken(String permanentToken, String resource);
void addListener(TokenListener listener);
}

View File

@ -0,0 +1,91 @@
package com.wazuh.contentmanager.cti.console.service;
import com.wazuh.contentmanager.cti.console.TokenListener;
import com.wazuh.contentmanager.cti.console.model.Token;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
* Implementation of the AuthService interface.
*/
public class AuthServiceImpl extends AbstractService implements AuthService {
private static final Logger log = LogManager.getLogger(AuthServiceImpl.class);
private final List<TokenListener> listeners;
/**
* Default constructor
*/
public AuthServiceImpl() {
super();
this.listeners = new CopyOnWriteArrayList<>();
}
/**
* Obtains a permanent token for the instance from CTI Console.
* @param clientId unique client identifier for the instance.
* @param deviceCode unique device code provided by the CTI Console during the registration of the instance.
* @return access token.
*/
// TODO replace parameters with SubscriptionModel from https://github.com/wazuh/wazuh-indexer-plugins/pull/662
@Override
public Token getToken(String clientId, String deviceCode) {
try {
// Perform request
SimpleHttpResponse response = this.client.getToken(clientId, deviceCode);
if (response.getCode() == 200) {
// Parse response
Token token = this.mapper.readValue(response.getBodyText(), Token.class);
// Notify listeners
listeners.forEach(listener -> listener.onTokenChanged(token));
// Return token
return token;
} else {
log.warn("Operation to fetch a permanent token failed: { \"status_code\": {}, \"message\": {} }", response.getCode(), response.getBodyText());
}
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.error("Couldn't obtain permanent token from CTI: {}", e.getMessage());
} catch (IOException e) {
log.error("Failed to parse permanent token: {}", e.getMessage());
}
return null;
}
/**
* Obtains a temporary HMAC-signed URL token for the given resource from CTI Console.
* @param permanentToken permanent token for the instance.
* @param resource resource to request the access token to.
* @return resource access token
*/
@Override
public Token getResourceToken(String permanentToken, String resource) {
try {
// Perform request
SimpleHttpResponse response = this.client.getResourceToken(permanentToken, resource);
if (response.getCode() == 200) {
// Parse response
return this.mapper.readValue(response.getBodyText(), Token.class);
} else {
log.warn("Operation to fetch a resource token failed: { \"status_code\": {}, \"message\": {}", response.getCode(), response.getBodyText());
}
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.error("Couldn't obtain resource token from CTI: {}", e.getMessage());
} catch (IOException e) {
log.error("Failed to parse resource token: {}", e.getMessage());
}
return null;
}
@Override
public void addListener(TokenListener listener) {
this.listeners.add(listener);
}
}

View File

@ -0,0 +1,11 @@
package com.wazuh.contentmanager.cti.console.service;
import com.wazuh.contentmanager.cti.console.client.ClosableHttpClient;
import com.wazuh.contentmanager.cti.console.model.Plan;
import java.util.List;
public interface PlansService extends ClosableHttpClient {
List<Plan> getPlans(String permanentToken);
}

View File

@ -0,0 +1,53 @@
package com.wazuh.contentmanager.cti.console.service;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.wazuh.contentmanager.cti.console.model.Plan;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
* Implementation of the PlansService interface.
*/
public class PlansServiceImpl extends AbstractService implements PlansService {
private static final Logger log = LogManager.getLogger(PlansServiceImpl.class);
/**
* Default constructor.
*/
public PlansServiceImpl() {
super();
}
/**
* Obtains the list of plans the instance is subscribed to, including all associated products.
* @param permanentToken access token
* @return list of plans the instance has access to.
*/
public List<Plan> getPlans(String permanentToken) {
try {
// Perform request
SimpleHttpResponse response = this.client.getPlans(permanentToken);
if (response.getCode() == 200) {
// Parse response
JsonNode root = mapper.readTree(response.getBodyText()).get("data").get("plans");
return this.mapper.readValue(root.toString(), new TypeReference<List<Plan>>() {});
} else {
log.warn("Operation to fetch a plans failed: { \"status_code\": {}, \"message\": {}", response.getCode(), response.getBodyText());
}
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.error("Couldn't obtain plans from CTI: {}", e.getMessage());
} catch (IOException e) {
log.error("Failed to parse plans: {}", e.getMessage());
}
return null;
}
}

View File

@ -0,0 +1,109 @@
package com.wazuh.contentmanager.cti.console;
import com.wazuh.contentmanager.cti.console.client.ApiClient;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.cti.console.service.AuthService;
import com.wazuh.contentmanager.cti.console.service.AuthServiceImpl;
import com.wazuh.contentmanager.cti.console.service.PlansService;
import com.wazuh.contentmanager.cti.console.service.PlansServiceImpl;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mock;
import org.opensearch.test.OpenSearchTestCase;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import static org.mockito.Mockito.*;
public class CtiConsoleTests extends OpenSearchTestCase {
private CtiConsole console;
private AuthService authService;
private PlansService plansService;
@Mock
private ApiClient mockClient;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
// Mock CTI Console API Client
this.mockClient = mock(ApiClient.class);
// Create service and replace its client with the mock
// Note: This creates a real ApiClient internally first, which needs to be closed
this.authService = new AuthServiceImpl();
this.plansService = new PlansServiceImpl();
this.authService.setClient(mockClient);
this.plansService.setClient(mockClient);
this.console = new CtiConsole();
this.console.setAuthService(this.authService);
this.console.setPlansService(this.plansService);
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
this.authService.close();
this.plansService.close();
}
/**
* When the auth service is successful obtaining a permanent token from the CTI Console, it must invoke
* the onTokenChange() method for all its listeners (CtiConsole). As a result, the token from the CtiConsole
* instances are updated / initialized.
* @throws ExecutionException ignored
* @throws InterruptedException ignored
* @throws TimeoutException ignored
*/
public void testOnTokenChanged() throws ExecutionException, InterruptedException, TimeoutException {
// Mock client response upon request
String response = "{\"access_token\": \"AYjcyMzY3ZDhiNmJkNTY\", \"refresh_token\": \"RjY2NjM5NzA2OWJjuE7c\", \"token_type\": \"Bearer\", \"expires_in\": 3600}";
when(this.mockClient.getToken(anyString(), anyString()))
.thenReturn(SimpleHttpResponse.create(200, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
Token tokenA = this.authService.getToken("anyClientID", "anyDeviceCode");
// Ensure onTokenChanged is invoked, and the token in the CtiConsole instance is updated.
Token tokenB = this.console.getToken();
assertEquals(tokenA, tokenB);
}
/**
* Tests the token retrieval mechanism with wait/notify synchronization.
* The test verifies that the calling thread properly waits for the token to be obtained
* through the polling mechanism and is notified when the token becomes available.
*
* @throws ExecutionException ignored
* @throws InterruptedException ignored
* @throws TimeoutException ignored
*/
public void testGetToken() throws ExecutionException, InterruptedException, TimeoutException {
String responsePending = "{\"error\": \"authorization_pending\"}";
String response = "{\"access_token\": \"AYjcyMzY3ZDhiNmJkNTY\", \"refresh_token\": \"RjY2NjM5NzA2OWJjuE7c\", \"token_type\": \"Bearer\", \"expires_in\": 3600}";
// Mock responses: 3 pending, success on 4th.
SimpleHttpResponse httpResponsePending = SimpleHttpResponse.create(400, responsePending.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON);
SimpleHttpResponse httpResponse = SimpleHttpResponse.create(200, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON);
when(this.mockClient.getToken(anyString(), anyString()))
.thenReturn(httpResponsePending, httpResponsePending, httpResponsePending, httpResponse);
// Start polling
this.console.onPostSubscriptionRequest();
// Wait for the token with a timeout
Token token = this.console.waitForToken();
// Verify the token was obtained
assertTrue(this.console.isTokenTaskCompleted());
assertNotNull(token);
assertEquals("AYjcyMzY3ZDhiNmJkNTY", token.getAccessToken());
}
}

View File

@ -0,0 +1,150 @@
package com.wazuh.contentmanager.cti.console.service;
import com.wazuh.contentmanager.cti.console.client.ApiClient;
import com.wazuh.contentmanager.cti.console.model.Token;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.opensearch.test.OpenSearchTestCase;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import static org.mockito.Mockito.*;
public class AuthServiceTests extends OpenSearchTestCase {
private AuthService authService;
@Mock private ApiClient mockClient;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
// Mock CTI Console API Client
this.mockClient = mock(ApiClient.class);
// Create service and replace its client with the mock
// Note: This creates a real ApiClient internally first, which needs to be closed
this.authService = new AuthServiceImpl();
this.authService.setClient(mockClient);
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
// Close the service to properly shut down the HTTP client
if (this.authService != null) {
this.authService.close();
}
}
/**
* On success:
* - token must not be null
* - token.access_token must be a valid string (not null, not empty)
*
* @throws ExecutionException ignored
* @throws InterruptedException ignored
* @throws TimeoutException ignored
*/
public void testGetTokenSuccess() throws ExecutionException, InterruptedException, TimeoutException {
// Mock client response upon request
String response = "{\"access_token\": \"AYjcyMzY3ZDhiNmJkNTY\", \"refresh_token\": \"RjY2NjM5NzA2OWJjuE7c\", \"token_type\": \"Bearer\", \"expires_in\": 3600}";
when(this.mockClient.getToken(anyString(), anyString()))
.thenReturn(SimpleHttpResponse.create(200, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
Token token = this.authService.getToken("anyClientID", "anyDeviceCode");
// Token must not be null
assertNotNull(token);
// access_token must be a valid string (not null, not empty)
assertNotNull(token.getAccessToken());
assertFalse(token.getAccessToken().isEmpty());
}
/**
* Possible failures
* - CTI replies with an error
* - CTI unreachable
* in these cases, the method is expected to return null.
*
* @throws ExecutionException ignored
* @throws InterruptedException ignored
* @throws TimeoutException ignored
*/
public void testGetTokenFailure() throws ExecutionException, InterruptedException, TimeoutException {
Token token;
String response = "{\"error\": \"invalid_request\", \"error_description\": \"Missing or invalid parameter: client_id\"}";
// When CTI replies with an error code, token must be null. No exception raised
when(this.mockClient.getToken(anyString(), anyString()))
.thenReturn(SimpleHttpResponse.create(400, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
token = this.authService.getToken("anyClientID", "anyDeviceCode");
assertNull(token);
// When CTI does not reply, token must be null and exceptions are raised.
when(this.mockClient.getToken(anyString(), anyString())).thenThrow(ExecutionException.class);
token = this.authService.getToken("anyClientID", "anyDeviceCode");
assertNull(token);
}
/**
* On success:
* - token must not be null
* - token.access_token must be a valid string (not null, not empty)
*
* @throws ExecutionException ignored
* @throws InterruptedException ignored
* @throws TimeoutException ignored
*/
public void testGetResourceTokenSuccess() throws ExecutionException, InterruptedException, TimeoutException {
// Mock client response upon request
String response = "{\"access_token\": \"https://localhost:8443/api/v1/catalog/contexts/misp/consumers/virustotal/changes?from_offset=0&to_offset=1000&with_empties=true&verify=1761383411-kJ9b8w%2BQ7kzRmF\", \"issued_token_type\": \"urn:wazuh:params:oauth:token-type:signed_url\", \"expires_in\": 300}";
when(this.mockClient.getResourceToken(anyString(), anyString()))
.thenReturn(SimpleHttpResponse.create(200, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
Token token = this.authService.getResourceToken("anyToken", "anyResource");
// Token must not be null
assertNotNull(token);
// access_token must be a valid string (not null, not empty)
assertNotNull(token.getAccessToken());
assertFalse(token.getAccessToken().isEmpty());
}
/**
* Possible failures
* - CTI replies with an error
* - CTI unreachable
* in these cases, the method is expected to return null.
*
* @throws ExecutionException ignored
* @throws InterruptedException ignored
* @throws TimeoutException ignored
*/
public void testGetResourceTokenFailure() throws ExecutionException, InterruptedException, TimeoutException {
Token token;
String response = "{\"error\": \"invalid_target\", \"error_description\": \"The resource parameter refers to an invalid endpoint\"}";
// When CTI replies with an error code, token must be null. No exception raised
when(this.mockClient.getResourceToken(anyString(), anyString()))
.thenReturn(SimpleHttpResponse.create(400, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
token = this.authService.getResourceToken("anyToken", "anyResource");
assertNull(token);
// When CTI does not reply, token must be null and exceptions are raised.
when(this.mockClient.getResourceToken(anyString(), anyString())).thenThrow(ExecutionException.class);
token = this.authService.getResourceToken("anyToken", "anyResource");
assertNull(token);
}
}

View File

@ -0,0 +1,137 @@
package com.wazuh.contentmanager.cti.console.service;
import com.wazuh.contentmanager.cti.console.client.ApiClient;
import com.wazuh.contentmanager.cti.console.model.Plan;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.opensearch.test.OpenSearchTestCase;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import static org.mockito.Mockito.*;
public class PlansServiceTests extends OpenSearchTestCase {
private PlansService plansService;
@Mock private ApiClient mockClient;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
// Mock CTI Console API Client
this.mockClient = mock(ApiClient.class);
// Create service and replace its client with the mock
// Note: This creates a real ApiClient internally first, which needs to be closed
this.plansService = new PlansServiceImpl();
this.plansService.setClient(mockClient);
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
// Close the service to properly shut down the HTTP client
if (this.plansService != null) {
this.plansService.close();
}
}
/**
* On success:
* - plans must not be null
* - plans must not be empty
* - a plan must contain products
*
* @throws ExecutionException ignored
* @throws InterruptedException ignored
* @throws TimeoutException ignored
*/
public void testGetPlansSuccess() throws ExecutionException, InterruptedException, TimeoutException {
// Mock client response upon request
String response = """
{
"data": {
"organization": {
"avatar": "https://acme.sl/avatar.png",
"name": "ACME S.L."
},
"plans": [
{
"name": "Wazuh Cloud",
"description": "Managed instances in AWS by Wazuh's professional staf that…",
"products": [
{
"identifier": "assistance-24h",
"type": "cloud:assistance:wazuh",
"name": "Technical assistance 24h",
"email": "cloud@wazuh.com",
"phone": "+34 123 456 789"
},
{
"identifier": "vulnerabilities-pro",
"type": "catalog:consumer:vulnerabilities",
"name": "Vulnerabilities Pro",
"description": "Vulnerabilities updated as soon as they are added to the catalog",
"resource": "https://localhost:8080/api/v1/catalog/plans/pro/contexts/vulnerabilities/consumer/realtime"
},
{
"identifier": "bad-guy-ips-pro",
"type": "catalog:consumer:iocs",
"name": "Bad Guy IPs",
"description": "Dolor sit amet…",
"resource": "https://localhost:8080/api/v1/catalog/plans/pro/contexts/bad-guy-ips/consumer/realtime"
}
]
}
]
}
}""";
when(this.mockClient.getPlans(anyString()))
.thenReturn(SimpleHttpResponse.create(200, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
List<Plan> plans = this.plansService.getPlans("anyToken");
// plans must not be null, or empty
assertNotNull(plans);
assertFalse(plans.isEmpty());
// plan must contain products
assertFalse(plans.getFirst().getProducts().isEmpty());
}
/**
* Possible failures
* - CTI replies with an error
* - CTI unreachable
* in these cases, the method is expected to return null.
*
* @throws ExecutionException ignored
* @throws InterruptedException ignored
* @throws TimeoutException ignored
*/
public void testGetPlansFailure() throws ExecutionException, InterruptedException, TimeoutException {
List<Plan> plans;
String response = "{\"error\": \"unauthorized_client\", \"error_description\": \"The provided token is invalid or expired\"}";
// When CTI replies with an error code, token must be null. No exception raised
when(this.mockClient.getPlans(anyString()))
.thenReturn(SimpleHttpResponse.create(400, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
plans = this.plansService.getPlans("anyToken");
assertNull(plans);
// When CTI does not reply, token must be null and exceptions are raised.
when(this.mockClient.getPlans(anyString())).thenThrow(ExecutionException.class);
plans = this.plansService.getPlans("anyToken");
assertNull(plans);
}
}