Init content from snapshot (#670)

* Apply new folder structure

* Save state

* Add creation of content indices

* Add CTI rules mappings

* Create decoders, kvdbs and decoders integrations indices

* Add SnapshotService to allow content initialization from a snapshot (#673)

* Add SnapshotService class

* Fix forbidden-apis error

* Modify mappings to include all payload fields

---------

Co-authored-by: Alex Ruiz <alejandro.ruiz.becerra@wazuh.com>

* Add snapshot indexing

* Fix decoders mappings

* Add YAML string field to decoder index (#676)

* Add YAML string field to decoders

* Change YAML String order

* Add order to YAML representation of decoders

* Extract decoders' order list out of the loop

---------

Co-authored-by: Alex Ruiz <alejandro.ruiz.becerra@wazuh.com>

* Simplify initialization of consumers

* Disable indexing of fields for internal indices (documents are stored still)

* Improve decoders order in YAML representation

* Small clean-up

* Code cleanup and add tests to 3568 branch (#677)

* Add missing javadocs

Signed-off-by: Jorge Sanchez <jorge.sanchez@wazuh.com>

* Create CatalogSyncJob class

* Revert "Disable indexing of fields for internal indices (documents are stored still)"

This reverts commit 0ba65a5781cc41b8986fdcefe262f66e4b9eb992.

* Fix tests imports

* Add SnapshotServiceImpl unit tests

* Disable flawky tests

* Changes from code review

---------

Signed-off-by: Jorge Sanchez <jorge.sanchez@wazuh.com>

* Add changelog entry and upgrades

* Add aliases to CTI indices and improve logging

---------

Signed-off-by: Jorge Sanchez <jorge.sanchez@wazuh.com>
Co-authored-by: Jorge Sánchez <jorge.sanchez@wazuh.com>
This commit is contained in:
Álex Ruiz Becerra 2025-12-02 13:32:58 +01:00 committed by GitHub
parent 3e58e4188a
commit 3415046065
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
50 changed files with 2612 additions and 894 deletions

View File

@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement authentication in CTI Console [#666](https://github.com/wazuh/wazuh-indexer-plugins/pull/666)
- Initialize consumers metadata index on start [(#668)](https://github.com/wazuh/wazuh-indexer-plugins/pull/668)
- Add job scheduler basic logic [(#671)](https://github.com/wazuh/wazuh-indexer-plugins/pull/671)
- Init content from snapshot [(#670)](https://github.com/wazuh/wazuh-indexer-plugins/pull/670)
### Dependencies
- Upgrade to Gradle 8.14.3 [(#649)](https://github.com/wazuh/wazuh-indexer-plugins/pull/649)

View File

@ -94,6 +94,7 @@ dependencies {
compileOnly 'com.fasterxml.jackson.core:jackson-core:2.18.2'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.18.2'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.18.2'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.18.2'
// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name: 'opensearch-job-scheduler', version: "${opensearch_build}"

View File

@ -16,34 +16,26 @@
*/
package com.wazuh.contentmanager;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
import com.wazuh.contentmanager.cti.catalog.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.index.ContentIndex;
import com.wazuh.contentmanager.cti.console.CtiConsole;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.jobscheduler.ContentJobParameter;
import com.wazuh.contentmanager.jobscheduler.ContentJobRunner;
import com.wazuh.contentmanager.jobscheduler.jobs.HelloWorldJob;
import com.wazuh.contentmanager.jobscheduler.jobs.CatalogSyncJob;
import com.wazuh.contentmanager.rest.services.RestDeleteSubscriptionAction;
import com.wazuh.contentmanager.rest.services.RestGetSubscriptionAction;
import com.wazuh.contentmanager.rest.services.RestPostSubscriptionAction;
import com.wazuh.contentmanager.rest.services.RestPostUpdateAction;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.Privileged;
import com.wazuh.contentmanager.utils.SnapshotManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.*;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
@ -63,24 +55,24 @@ import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import org.opensearch.watcher.ResourceWatcherService;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.function.Supplier;
/**
* Main class of the Content Manager Plugin
*/
public class ContentManagerPlugin extends Plugin implements ClusterPlugin, JobSchedulerExtension, ActionPlugin {
private static final Logger log = LogManager.getLogger(ContentManagerPlugin.class);
private static final String JOB_INDEX_NAME = ".wazuh-content-manager-jobs";
private static final String JOB_ID = "wazuh-catalog-sync-job";
/**
* Semaphore to ensure the context index creation is only triggered once.
*/
private static final Semaphore indexCreationSemaphore = new Semaphore(1);
private ConsumersIndex consumersIndex;
private ContentIndex contentIndex;
private SnapshotManager snapshotManager;
private ThreadPool threadPool;
private ClusterService clusterService;
private CtiConsole ctiConsole;
private Client client;
private Environment environment;
// Rest API endpoints
public static final String PLUGINS_BASE_URI = "/_plugins/content-manager";
@ -103,15 +95,15 @@ public class ContentManagerPlugin extends Plugin implements ClusterPlugin, JobSc
PluginSettings.getInstance(environment.settings(), clusterService);
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.environment = environment;
this.consumersIndex = new ConsumersIndex(client);
this.contentIndex = new ContentIndex(client);
this.snapshotManager =
new SnapshotManager(environment, this.consumersIndex, this.contentIndex, new Privileged());
ContentJobRunner runner = ContentJobRunner.getInstance();
runner.registerExecutor(HelloWorldJob.JOB_TYPE, new HelloWorldJob());
// Content Manager 5.0
this.ctiConsole = new CtiConsole();
ContentJobRunner runner = ContentJobRunner.getInstance();
// Register Executors
runner.registerExecutor(CatalogSyncJob.JOB_TYPE, new CatalogSyncJob(client, consumersIndex, environment, threadPool));
return Collections.emptyList();
}
@ -128,41 +120,9 @@ public class ContentManagerPlugin extends Plugin implements ClusterPlugin, JobSc
if (localNode.isClusterManagerNode()) {
this.start();
}
this.scheduleHelloWorldJob();
/*
// Use case 1. Polling
AuthServiceImpl authService = new AuthServiceImpl();
this.ctiConsole = new CtiConsole();
this.ctiConsole.setAuthService(authService);
this.ctiConsole.onPostSubscriptionRequest();
while (!this.ctiConsole.isTokenTaskCompleted()) {}
if (this.ctiConsole.isTokenTaskCompleted()) {
Token token = this.ctiConsole.getToken();
// Use case 2. Obtain available plans
PlansServiceImpl productsService = new PlansServiceImpl();
List<Plan> plans = productsService.getPlans(token.getAccessToken());
log.info("Plans: {}", plans);
// Use case 3. Obtain resource token.
Product vulnsPro = plans.stream()
.filter(plan -> plan.getName().equals("Pro Plan Deluxe"))
.toList()
.getFirst()
.getProducts().stream()
.filter(product -> product.getIdentifier().equals("vulnerabilities-pro"))
.toList()
.getFirst();
Token resourceToken = authService.getResourceToken(
token.getAccessToken(),
vulnsPro.getResource()
);
log.info("Resource token {}", resourceToken);
}
*/
// Schedule the periodic sync job via OpenSearch Job Scheduler
this.scheduleCatalogSyncJob();
}
public List<RestHandler> getRestHandlers(
@ -185,8 +145,8 @@ public class ContentManagerPlugin extends Plugin implements ClusterPlugin, JobSc
* Initialize. The initialization consists of:
*
* <pre>
* 1. fetching the latest consumer's information from the CTI API.
* 2. initialize from a snapshot if the local consumer does not exist, or its offset is 0.
* 1. create required indices if they do not exist.
* 2. initialize from a snapshot if the local consumer does not exist, or its offset is 0.
* </pre>
*/
private void start() {
@ -195,18 +155,15 @@ public class ContentManagerPlugin extends Plugin implements ClusterPlugin, JobSc
.generic()
.execute(
() -> {
if (indexCreationSemaphore.tryAcquire()) {
try {
this.consumersIndex.createIndex();
} catch (Exception e) {
indexCreationSemaphore.release();
log.error("Failed to create {} index, due to: {}", ConsumersIndex.INDEX_NAME, e.getMessage(), e);
try {
CreateIndexResponse response = this.consumersIndex.createIndex();
if (response.isAcknowledged()) {
log.info("Index created: {} acknowledged={}", response.index(), response.isAcknowledged());
}
} else {
log.debug("{} index creation already triggered", ConsumersIndex.INDEX_NAME);
} catch (Exception e) {
log.error("Failed to create {} index, due to: {}", ConsumersIndex.INDEX_NAME, e.getMessage(), e);
}
// TODO: Once initialize method is adapted to the new design, uncomment the following line
//this.snapshotManager.initialize();
});
} catch (Exception e) {
// Log or handle exception
@ -214,31 +171,53 @@ public class ContentManagerPlugin extends Plugin implements ClusterPlugin, JobSc
}
}
// TODO: Change to actual job implementation, this is just an example
private void scheduleHelloWorldJob() {
String jobId = "wazuh-hello-world-job";
/**
* Schedules the Catalog Sync Job.
*/
private void scheduleCatalogSyncJob() {
this.threadPool.generic().execute(() -> {
try {
boolean exists = this.client.admin().indices().prepareExists(JOB_INDEX_NAME).get().isExists() &&
this.client.prepareGet(JOB_INDEX_NAME, jobId).get().isExists();
if (!exists) {
log.info("Scheduling Hello World Job to run every 1 minute...");
// 1. Check if the index exists; if not, create it with specific settings.
boolean indexExists = this.client.admin().indices().prepareExists(JOB_INDEX_NAME).get().isExists();
if (!indexExists) {
try {
Settings settings = Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.hidden", true)
.build();
this.client.admin().indices().prepareCreate(JOB_INDEX_NAME)
.setSettings(settings)
.get();
log.info("Created job index {}.", JOB_INDEX_NAME);
} catch (Exception e) {
log.warn("Could not create index {}: {}", JOB_INDEX_NAME, e.getMessage());
}
}
// 2. Check if the job document exists; if not, index it.
boolean jobExists = this.client.prepareGet(JOB_INDEX_NAME, JOB_ID).get().isExists();
if (!jobExists) {
ContentJobParameter job = new ContentJobParameter(
"Hello World Periodic Task",
HelloWorldJob.JOB_TYPE,
"Catalog Sync Periodic Task",
CatalogSyncJob.JOB_TYPE,
new IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
true,
Instant.now(),
Instant.now()
);
IndexRequest request = new IndexRequest(JOB_INDEX_NAME)
.id(jobId)
.id(JOB_ID)
.source(job.toXContent(XContentFactory.jsonBuilder(), null));
this.client.index(request).actionGet();
log.info("Hello World Job scheduled successfully.");
log.info("Catalog Sync Job scheduled successfully.");
}
} catch (Exception e) {
log.error("Error scheduling Hello World Job: {}", e.getMessage());
log.error("Error scheduling Catalog Sync Job: {}", e.getMessage());
}
});
}

View File

@ -46,7 +46,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.http.HttpResponseCallback;
import reactor.util.annotation.NonNull;
/**

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.utils.http;
package com.wazuh.contentmanager.client;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;

View File

@ -0,0 +1,20 @@
package com.wazuh.contentmanager.cti.catalog;
import com.wazuh.contentmanager.cti.catalog.service.ConsumerService;
/**
* Represents the CTI Catalog.
* Acts as a facade or entry point for catalog-related operations, primarily managing consumers.
*/
public class CtiCatalog {
private ConsumerService consumerService;
/**
* Constructs a new CtiCatalog instance.
*
* @param consumerService The service used to manage local and remote consumers.
*/
public CtiCatalog(ConsumerService consumerService) {
this.consumerService = consumerService;
}
}

View File

@ -0,0 +1,118 @@
package com.wazuh.contentmanager.cti.catalog.client;
import com.wazuh.contentmanager.client.HttpResponseCallback;
import com.wazuh.contentmanager.settings.PluginSettings;
import org.apache.hc.client5.http.async.methods.*;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.apache.hc.core5.util.Timeout;
import javax.net.ssl.SSLContext;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Client for interacting with the Wazuh CTI Catalog API.
* <p>
* This client manages an asynchronous HTTP client to perform requests against
* the catalog service, specifically handling consumer context retrieval.
*/
public class ApiClient {
private static final String BASE_URI = "https://cti-pre.wazuh.com";
private static final String API_PREFIX = "/api/v1";
private CloseableHttpAsyncClient client;
/**
* Constructs an ApiClient instance and initializes the underlying HTTP client.
*/
public ApiClient() {
this.buildClient();
}
/**
* Builds and starts the asynchronous HTTP client.
*
* @throws RuntimeException if the SSL context cannot be initialized.
*/
private void buildClient() {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setSoTimeout(Timeout.ofSeconds(PluginSettings.getInstance().getClientTimeout()))
.build();
SSLContext sslContext;
try {
sslContext =
SSLContextBuilder.create()
.loadTrustMaterial(null, (chains, authType) -> true)
.build();
} catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
throw new RuntimeException("Failed to initialize HttpClient", e);
}
this.client = HttpAsyncClients.custom()
.setIOReactorConfig(ioReactorConfig)
.setConnectionManager(
PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(
ClientTlsStrategyBuilder.create().setSslContext(sslContext).build())
.build())
.build();
this.client.start();
}
/**
* Closes the underlying HTTP asynchronous client gracefully.
*/
public void close() {
this.client.close(CloseMode.GRACEFUL);
}
/**
* Constructs the full URI for a specific consumer within a given context.
*
* @param context The context identifier (e.g., the specific catalog section).
* @param consumer The consumer identifier.
* @return A string representing the full absolute URL for the resource.
*/
private String buildConsumerURI(String context, String consumer) {
return BASE_URI + API_PREFIX + "/catalog/contexts/" + context + "/consumers/" + consumer;
}
/**
* Retrieves consumer details from the CTI Catalog.
*
* @param context The context associated with the consumer.
* @param consumer The name or ID of the consumer to retrieve.
* @return A {@link SimpleHttpResponse} containing the API response.
* @throws ExecutionException If the computation threw an exception.
* @throws InterruptedException If the current thread was interrupted while waiting.
* @throws TimeoutException If the wait timed out.
*/
public SimpleHttpResponse getConsumer(String context, String consumer) throws ExecutionException, InterruptedException, TimeoutException {
SimpleHttpRequest request = SimpleRequestBuilder
.get(this.buildConsumerURI(context, consumer))
.build();
final Future<SimpleHttpResponse> future = client.execute(
SimpleRequestProducer.create(request),
SimpleResponseConsumer.create(),
new HttpResponseCallback(
request, "Outgoing request failed"
)
);
return future.get(PluginSettings.getInstance().getClientTimeout(), TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,77 @@
package com.wazuh.contentmanager.cti.catalog.client;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.env.Environment;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
/**
* Client responsible for downloading CTI snapshots from a remote source.
*/
public class SnapshotClient {
private static final Logger log = LogManager.getLogger(SnapshotClient.class);
private final Environment env;
/**
* Default constructor.
* @param env node's environment
*/
public SnapshotClient(Environment env) {
this.env = env;
}
/***
* Downloads the CTI snapshot.
*
* @param snapshotURI URI to the file to download.
* @return The downloaded file's name
*/
public Path downloadFile(String snapshotURI) throws IOException, URISyntaxException {
try (CloseableHttpClient client = HttpClients.createDefault()) {
// Setup
final URI uri = new URI(snapshotURI);
final HttpGet request = new HttpGet(uri);
final String filename = uri.getPath().substring(uri.getPath().lastIndexOf('/') + 1);
final Path path = this.env.tmpDir().resolve(filename);
// Download
log.info("Starting snapshot download from [{}]", uri);
try (CloseableHttpResponse response = client.execute(request)) {
if (response.getEntity() != null) {
// Write to disk
InputStream input = response.getEntity().getContent();
try (OutputStream out =
new BufferedOutputStream(
Files.newOutputStream(
path,
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING))) {
int bytesRead;
byte[] buffer = new byte[1024];
while ((bytesRead = input.read(buffer)) != -1) {
out.write(buffer, 0, bytesRead);
}
}
}
}
log.info("Snapshot downloaded to [{}]", path);
return path;
}
}
}

View File

@ -14,8 +14,9 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.index;
package com.wazuh.contentmanager.cti.catalog.index;
import com.wazuh.contentmanager.cti.catalog.model.LocalConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.DocWriteResponse;
@ -25,9 +26,9 @@ import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.transport.client.Client;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.ToXContent;
@ -46,6 +47,7 @@ import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.ClusterInfo;
/** Class to manage the Context index. */
// TODO remove unused methods: all but setConsumer(), getConsumer() and createIndex()
public class ConsumersIndex {
private static final Logger log = LogManager.getLogger(ConsumersIndex.class);
@ -110,6 +112,57 @@ public class ConsumersIndex {
return false;
}
/**
* Indexes a local consumer object into the cluster.
*
* @param consumer The {@link LocalConsumer} object containing the data to be indexed.
* @return The {@link IndexResponse} indicating the result of the operation.
* @throws ExecutionException If the client failed to execute the request.
* @throws InterruptedException If the current thread was interrupted while waiting for the response.
* @throws TimeoutException If the operation exceeded the configured client timeout.
* @throws IOException If there is an error serializing the consumer to XContent.
* @throws RuntimeException If the target index is not currently ready or available.
*/
public IndexResponse setConsumer(LocalConsumer consumer) throws ExecutionException, InterruptedException, TimeoutException, IOException {
// Avoid faulty requests if the cluster is unstable.
if (!ClusterInfo.indexStatusCheck(this.client, INDEX_NAME)) {
throw new RuntimeException("Index not ready");
}
// Composed ID
String id = String.format(Locale.ROOT, "%s_%s", consumer.getContext(), consumer.getName());
IndexRequest request = new IndexRequest()
.index(INDEX_NAME)
.id(id)
.source(consumer.toXContent());
return this.client.index(request).get(this.pluginSettings.getClientTimeout(), TimeUnit.SECONDS);
}
/**
* Retrieves a consumer document from the index by its composite identifier.
*
* @param context The context identifier of the consumer.
* @param consumer The name identifier of the consumer.
* @return A {@link GetResponse} containing the document source and metadata.
* @throws ExecutionException If the client failed to execute the request.
* @throws InterruptedException If the current thread was interrupted while waiting for the response.
* @throws TimeoutException If the operation exceeded the configured client timeout.
* @throws RuntimeException If the target index is not currently ready or available.
*/
public GetResponse getConsumer(String context, String consumer) throws ExecutionException, InterruptedException, TimeoutException {
// Avoid faulty requests if the cluster is unstable.
if (!ClusterInfo.indexStatusCheck(this.client, INDEX_NAME)) {
throw new RuntimeException("Index not ready");
}
// Composed ID
String id = String.format(Locale.ROOT, "%s_%s", context, consumer);
GetRequest request = new GetRequest().index(INDEX_NAME).id(id).preference("_local");
ActionFuture<GetResponse> future = this.client.get(request);
return future.get(this.pluginSettings.getClientTimeout(), TimeUnit.SECONDS);
}
/** TODO: Review ConsumerInfo class and adapt mappings accordingly */
/**
* Searches for the given consumer within a context.
@ -172,28 +225,36 @@ public class ConsumersIndex {
return ClusterInfo.indexExists(this.client, ConsumersIndex.INDEX_NAME);
}
/** Creates the {@link ConsumersIndex#INDEX_NAME} index. */
public void createIndex() {
try {
String mappingJson = this.loadMappingFromResources();
/**
* Creates the {@link ConsumersIndex#INDEX_NAME} index.
*
* @return
*/
public CreateIndexResponse createIndex() throws ExecutionException, InterruptedException, TimeoutException {
Settings settings = Settings.builder()
.put("index.number_of_replicas", 0)
.put("hidden", true)
.build();
CreateIndexRequest request = new CreateIndexRequest(ConsumersIndex.INDEX_NAME);
String mappings;
try {
mappings = this.loadMappingFromResources();
} catch (IOException e) {
log.error("Could not read mappings for index [{}]", INDEX_NAME);
return null;
}
// Index settings
request.settings(Settings.builder().put("index.number_of_replicas", 0).build());
request.mapping(mappingJson, XContentType.JSON);
CreateIndexRequest request = new CreateIndexRequest()
.index(INDEX_NAME)
.mapping(mappings)
.settings(settings);
CreateIndexResponse response = this.client
return this.client
.admin()
.indices()
.create(request)
.actionGet(this.pluginSettings.getClientTimeout(), TimeUnit.SECONDS);
log.info("Index created: {} acknowledged={}", response.index(), response.isAcknowledged());
} catch (Exception e) {
log.warn("Index creation attempt failed: {}", e.getMessage());
}
}
.get(this.pluginSettings.getClientTimeout(), TimeUnit.SECONDS);
}
/**
* Loads the index mapping from the resources folder.
@ -202,7 +263,6 @@ public class ConsumersIndex {
* @throws IOException if reading the resource fails.
*/
private String loadMappingFromResources() throws IOException {
try (InputStream is = this.getClass().getResourceAsStream(MAPPING_PATH)) {
return new String(is.readAllBytes(), StandardCharsets.UTF_8);
}

View File

@ -0,0 +1,432 @@
/*
* Copyright (C) 2024, Wazuh Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.cti.catalog.index;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.transport.client.Client;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.mapper.StrictDynamicMappingException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.wazuh.contentmanager.cti.catalog.model.Changes;
import com.wazuh.contentmanager.cti.catalog.model.Offset;
import com.wazuh.contentmanager.cti.catalog.model.Operation;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.cti.catalog.utils.JsonPatch;
import com.wazuh.contentmanager.utils.XContentUtils;
/**
* Manages operations for the Wazuh CTI Content Index.
*/
public class ContentIndex {
private static final String JSON_NAME_KEY = "name";
private static final String JSON_OFFSET_KEY = "offset";
private static final Logger log = LogManager.getLogger(ContentIndex.class);
//TODO: Delete
public static final String INDEX_NAME = "wazuh-ruleset";
private final Client client;
private final PluginSettings pluginSettings;
private final Semaphore semaphore;
private String indexName;
private String mappingsPath;
private String alias;
/**
* Constructs a ContentIndex manager with specific settings.
*
* @param client The OpenSearch client.
* @param indexName The name of the index to manage.
* @param mappingsPath The classpath resource path to the index mappings file.
*/
public ContentIndex(Client client, String indexName, String mappingsPath) {
this.pluginSettings = PluginSettings.getInstance();
this.semaphore = new Semaphore(pluginSettings.getMaximumConcurrentBulks());
this.client = client;
this.indexName = indexName;
this.mappingsPath = mappingsPath;
}
/**
* Constructs a ContentIndex manager with specific settings and an alias.
*
* @param client The OpenSearch client.
* @param indexName The name of the index to manage.
* @param mappingsPath The classpath resource path to the index mappings file.
* @param alias The alias to assign to the index.
*/
public ContentIndex(Client client, String indexName, String mappingsPath, String alias) {
this(client, indexName, mappingsPath);
this.alias = alias;
}
/**
* Creates the content index with specific settings and mappings.
*
* @return A {@link CreateIndexResponse} indicating success, or {@code null} if mappings could not be read.
* @throws ExecutionException If the creation request fails.
* @throws InterruptedException If the thread is interrupted while waiting for the response.
* @throws TimeoutException If the operation exceeds the configured client timeout.
*/
public CreateIndexResponse createIndex() throws ExecutionException, InterruptedException, TimeoutException {
Settings settings = Settings.builder()
.put("index.number_of_replicas", 0)
.put("hidden", true)
.build();
String mappings;
try (InputStream is = this.getClass().getResourceAsStream(this.mappingsPath)) {
mappings = new String(is.readAllBytes(), StandardCharsets.UTF_8);
} catch (IOException e) {
log.error("Could not read mappings for index [{}]", this.indexName);
return null;
}
CreateIndexRequest request = new CreateIndexRequest()
.index(this.indexName)
.mapping(mappings)
.settings(settings);
if (this.alias != null && !this.alias.isEmpty()) {
request.alias(new Alias(this.alias));
}
return this.client
.admin()
.indices()
.create(request)
.get(this.pluginSettings.getClientTimeout(), TimeUnit.SECONDS);
}
/**
* Executes a bulk request using the semaphore.
*
* @param bulkRequest The request to execute.
*/
public void executeBulk(BulkRequest bulkRequest) {
try {
this.semaphore.acquire();
this.client.bulk(bulkRequest, new ActionListener<>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
semaphore.release();
if (bulkResponse.hasFailures()) {
log.warn("Bulk indexing finished with failures: {}", bulkResponse.buildFailureMessage());
} else {
log.debug("Bulk indexing successful. Indexed {} documents.", bulkResponse.getItems().length);
}
}
@Override
public void onFailure(Exception e) {
semaphore.release();
log.error("Bulk indexing failed completely: {}", e.getMessage());
}
});
} catch (InterruptedException e) {
log.error("Interrupted while waiting for semaphore: {}", e.getMessage());
Thread.currentThread().interrupt();
}
}
/**
* Constructs a ContentIndex manager using default plugin settings.
*
* @param client the OpenSearch Client to interact with the cluster
*/
public ContentIndex(Client client) {
this.pluginSettings = PluginSettings.getInstance();
this.semaphore = new Semaphore(pluginSettings.getMaximumConcurrentBulks());
this.client = client;
}
/**
* Constructs a ContentIndex manager with injected settings (testing).
*
* @param client Client.
* @param pluginSettings PluginSettings.
*/
public ContentIndex(Client client, PluginSettings pluginSettings) {
this.pluginSettings = pluginSettings;
this.semaphore = new Semaphore(pluginSettings.getMaximumConcurrentBulks());
this.client = client;
}
/**
* Searches for an element in the {@link ContentIndex#INDEX_NAME} by its ID.
*
* @param resourceId the ID of the element to retrieve.
* @return the element as a JsonObject instance.
* @throws InterruptedException if the operation is interrupted.
* @throws ExecutionException if an error occurs during execution.
* @throws TimeoutException if the operation times out.
* @throws IllegalArgumentException if the content is not found in the index.
*/
public JsonObject getById(String resourceId)
throws InterruptedException, ExecutionException, TimeoutException, IllegalArgumentException {
GetResponse response =
this.client
.get(new GetRequest(ContentIndex.INDEX_NAME, resourceId))
.get(this.pluginSettings.getClientTimeout(), TimeUnit.SECONDS);
if (response.isExists()) {
return JsonParser.parseString(response.getSourceAsString()).getAsJsonObject();
}
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Document with ID [%s] not found in the [%s] index",
resourceId,
ContentIndex.INDEX_NAME));
}
/**
* Indexes a single Offset document synchronously.
*
* @param document {@link Offset} document to index.
* @throws StrictDynamicMappingException if the document does not match the index mappings.
* @throws ExecutionException if the index operation failed to execute.
* @throws InterruptedException if the index operation was interrupted.
* @throws TimeoutException if the index operation timed out.
* @throws IOException if XContentBuilder creation fails.
*/
public void index(Offset document)
throws StrictDynamicMappingException,
ExecutionException,
InterruptedException,
TimeoutException,
IOException {
IndexRequest indexRequest =
new IndexRequest()
.index(ContentIndex.INDEX_NAME)
.source(document.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(document.getResource());
this.client.index(indexRequest).get(this.pluginSettings.getClientTimeout(), TimeUnit.SECONDS);
}
/**
* Indexes a list of JSON documents in bulk asynchronously.
*
* @param documents list of JSON documents to be indexed.
*/
public void index(List<JsonObject> documents) {
BulkRequest bulkRequest = new BulkRequest(ContentIndex.INDEX_NAME);
for (JsonObject document : documents) {
bulkRequest.add(
new IndexRequest()
.id(document.get(ContentIndex.JSON_NAME_KEY).getAsString())
.source(document.toString(), XContentType.JSON));
}
this.client.bulk(
bulkRequest,
new ActionListener<>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
semaphore.release();
if (bulkResponse.hasFailures()) {
log.error("Bulk index operation failed: {}", bulkResponse.buildFailureMessage());
} else {
log.debug("Bulk index operation succeeded in {} ms", bulkResponse.getTook().millis());
}
}
@Override
public void onFailure(Exception e) {
semaphore.release();
log.error("Bulk index operation failed: {}", e.getMessage(), e);
}
});
}
/**
* Deletes a document from the index asynchronously.
*
* @param id ID of the document to delete.
*/
public void delete(String id) {
this.client.delete(
new DeleteRequest(ContentIndex.INDEX_NAME, id),
new ActionListener<>() {
@Override
public void onResponse(DeleteResponse response) {
log.info("Deleted CTI Catalog Content {} from index", id);
}
@Override
public void onFailure(Exception e) {
log.error("Failed to delete CTI Catalog Content {}: {}", id, e.getMessage(), e);
}
});
}
/**
* Initializes the index from a local snapshot file.
*
* @param path path to the CTI snapshot JSON file to be indexed.
* @return The offset number of the last indexed resource of the snapshot, or 0 on error/empty.
*/
public long fromSnapshot(String path) {
long startTime = System.currentTimeMillis();
String line;
JsonObject json;
int lineCount = 0;
ArrayList<JsonObject> items = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(path, StandardCharsets.UTF_8))) {
while ((line = reader.readLine()) != null) {
json = JsonParser.parseString(line).getAsJsonObject();
items.add(json);
lineCount++;
// Index items (MAX_DOCUMENTS reached)
if (lineCount == this.pluginSettings.getMaxItemsPerBulk()) {
this.semaphore.acquire();
this.index(items);
lineCount = 0;
items.clear();
}
}
// Index remaining items (> MAX_DOCUMENTS)
if (lineCount > 0) {
this.semaphore.acquire();
this.index(items);
}
} catch (InterruptedException e) {
items.clear();
log.error("Processing snapshot file interrupted {}", e.getMessage());
} catch (Exception e) {
items.clear();
log.error("Generic exception indexing the snapshot: {}", e.getMessage());
}
long estimatedTime = System.currentTimeMillis() - startTime;
log.info("Snapshot indexing finished successfully in {} ms", estimatedTime);
return items.isEmpty()
? 0
: items.get(items.size() - 1).get(ContentIndex.JSON_OFFSET_KEY).getAsLong();
}
/**
* Applies a set of changes (create, update, delete) to the content index.
*
* @param changes content changes to apply.
* @throws RuntimeException if the patching process is interrupted or fails.
* @deprecated Use of this specific patch implementation may be replaced by newer synchronization methods.
*/
public void patch(Changes changes) {
ArrayList<Offset> offsets = changes.get();
if (offsets.isEmpty()) {
log.info("No changes to apply");
return;
}
log.info(
"Patching [{}] from offset [{}] to [{}]",
ContentIndex.INDEX_NAME,
changes.getFirst().getOffset(),
changes.getLast().getOffset());
for (Offset change : offsets) {
String id = change.getResource();
try {
log.debug("Processing offset [{}]", change.getOffset());
switch (change.getType()) {
case CREATE:
log.debug("Creating new resource with ID [{}]", id);
this.index(change);
break;
case UPDATE:
log.debug("Updating resource with ID [{}]", id);
JsonObject content = this.getById(id);
for (Operation op : change.getOperations()) {
JsonPatch.applyOperation(content, XContentUtils.xContentObjectToJson(op));
}
try (XContentParser parser = XContentUtils.createJSONParser(content)) {
this.index(Offset.parse(parser));
}
break;
case DELETE:
log.debug("Deleting resource with ID [{}]", id);
this.delete(id);
break;
default:
throw new IllegalArgumentException("Unknown change type: " + change.getType());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while patching", e);
} catch (Exception e) {
log.error("Failed to patch [{}] due to {}", id, e.getMessage());
throw new RuntimeException("Patch operation failed", e);
}
}
}
/**
* Clears all documents from the {@link ContentIndex#INDEX_NAME} index using a "delete by query" operation.
*/
public void clear() {
try {
DeleteByQueryRequestBuilder deleteByQuery =
new DeleteByQueryRequestBuilder(this.client, DeleteByQueryAction.INSTANCE);
deleteByQuery.source(this.indexName).filter(QueryBuilders.matchAllQuery());
BulkByScrollResponse response = deleteByQuery.get();
log.debug(
"[{}] wiped. {} documents were removed", this.indexName, response.getDeleted());
} catch (OpenSearchTimeoutException e) {
log.error("[{}] delete query timed out: {}", this.indexName, e.getMessage());
}
}
}

View File

@ -0,0 +1,33 @@
package com.wazuh.contentmanager.cti.catalog.model;
/**
* Base class for Consumer models.
* Contains shared properties between local and remote consumers.
*/
public abstract class AbstractConsumer {
String name;
String context;
/**
* Default constructor
*/
public AbstractConsumer() {}
/**
* Gets the context identifier.
*
* @return The context string.
*/
public String getContext() {
return context;
}
/**
* Gets the consumer name.
*
* @return The name string.
*/
public String getName() {
return name;
}
}

View File

@ -0,0 +1,134 @@
package com.wazuh.contentmanager.cti.catalog.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import java.io.IOException;
/**
* Data Transfer Object representing the local state of a CTI Catalog Consumer.
* This class tracks the synchronization status of a specific content context. It maintains
* the {@code localOffset} versus the {@code remoteOffset}, along with a link to the snapshot
* for bulk updates.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class LocalConsumer extends AbstractConsumer implements ToXContent {
@JsonProperty("local_offset")
private long localOffset;
@JsonProperty("remote_offset")
private long remoteOffset;
@JsonProperty("snapshot_link")
private String snapshotLink;
/**
* Default constructor.
*/
public LocalConsumer() {
super();
}
/**
* Constructs a new LocalConsumer with a basic identity.
*
* @param context The context identifier (e.g., "rules_development").
* @param name The consumer name.
*/
public LocalConsumer (String context, String name) {
this.context = context;
this.name = name;
this.localOffset = 0;
this.remoteOffset = 0;
this.snapshotLink = "";
}
/**
* Constructs a LocalConsumer with full state details.
*
* @param context The context identifier.
* @param name The consumer name.
* @param localOffset The current offset processed locally.
* @param remoteOffset The last known offset available remotely.
* @param snapshotUrl The URL of the snapshot associated with this state.
*/
public LocalConsumer(String context, String name, long localOffset, long remoteOffset, String snapshotUrl) {
this.context = context;
this.name = name;
this.localOffset = localOffset;
this.remoteOffset = remoteOffset;
this.snapshotLink = snapshotUrl;
}
/**
* Gets the local synchronization offset.
*
* @return The sequence number of the last processed item.
*/
public long getLocalOffset() {
return localOffset;
}
/**
* Gets the remote synchronization offset.
*
* @return The sequence number of the latest item available upstream.
*/
public long getRemoteOffset() {
return remoteOffset;
}
/**
* Gets the snapshot download URL.
*
* @return A string containing the URL, or empty if not set.
*/
public String getSnapshotLink() {
return snapshotLink;
}
@Override
public String toString() {
return "LocalConsumer{" +
"localOffset=" + localOffset +
", remoteOffset=" + remoteOffset +
", snapshotLink='" + snapshotLink + '\'' +
", context='" + context + '\'' +
", name='" + name + '\'' +
'}';
}
/**
* Serializes the consumer object to a new XContentBuilder (JSON).
*
* @return The XContentBuilder containing the JSON representation.
* @throws IOException If an I/O error occurs during building.
*/
public XContentBuilder toXContent() throws IOException {
return this.toXContent(XContentFactory.jsonBuilder(), null);
}
/**
* Serializes the consumer properties into an XContentBuilder.
*
* @param builder The builder to write to.
* @param params Parameters for the XContent generation (unused here).
* @return The builder with the consumer fields appended.
* @throws IOException If an I/O error occurs during building.
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field("name", this.name)
.field("context", this.context)
.field("local_offset", this.localOffset)
.field("remote_offset", this.remoteOffset)
.field("snapshot_link", this.snapshotLink)
.endObject();
return builder;
}
}

View File

@ -0,0 +1,57 @@
package com.wazuh.contentmanager.cti.catalog.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
/**
* CTI Consumer DTO.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class RemoteConsumer extends AbstractConsumer {
private final long offset;
private final String snapshotLink;
/**
* Default constructor
*/
public RemoteConsumer(@JsonProperty("data") JsonNode data) {
this.name = data.get("name").asText("");
this.context = data.get("context").asText("");
this.offset = data.get("last_offset").asLong(0);
this.snapshotLink = data.get("last_snapshot_link").asText("");
}
/**
* Gets the last known offset of the remote consumer.
*
* @return The offset value.
*/
public long getOffset() {
return offset;
}
/**
* Gets the link to the latest snapshot.
*
* @return The snapshot URL string.
*/
public String getSnapshotLink() {
return snapshotLink;
}
/**
* Returns a string representation of the RemoteConsumer object.
*
* @return A string describing the internal state of the consumer.
*/
@Override
public String toString() {
return "RemoteConsumer{" +
"offset=" + offset +
", snapshotLink='" + snapshotLink + '\'' +
", context='" + context + '\'' +
", name='" + name + '\'' +
'}';
}
}

View File

@ -0,0 +1,39 @@
package com.wazuh.contentmanager.cti.catalog.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wazuh.contentmanager.cti.catalog.client.ApiClient;
/**
* Abstract service class, for generalization.
*/
public abstract class AbstractService {
ApiClient client;
final ObjectMapper mapper;
/**
* Default constructor
*/
public AbstractService() {
this.client = new ApiClient();
this.mapper = new ObjectMapper();
}
/**
* Use for testing only.
* @param c mocked client.
*/
public void setClient(ApiClient c) {
this.close();
this.client = c;
}
/**
* Closes the underlying HTTP client. Should be called when the service is no longer needed.
*/
public void close() {
if (this.client != null) {
this.client.close();
}
}
}

View File

@ -0,0 +1,24 @@
package com.wazuh.contentmanager.cti.catalog.service;
import com.wazuh.contentmanager.cti.catalog.model.LocalConsumer;
import com.wazuh.contentmanager.cti.catalog.model.RemoteConsumer;
/**
* Service interface for managing and retrieving CTI Catalog consumer states.
*/
public interface ConsumerService {
/**
* Retrieves the current local consumer state.
*
* @return The {@link LocalConsumer} object representing the local state.
*/
LocalConsumer getLocalConsumer();
/**
* Retrieves the current remote consumer state.
*
* @return The {@link RemoteConsumer} object representing the remote state.
*/
RemoteConsumer getRemoteConsumer();
}

View File

@ -0,0 +1,112 @@
package com.wazuh.contentmanager.cti.catalog.service;
import com.wazuh.contentmanager.cti.catalog.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.model.LocalConsumer;
import com.wazuh.contentmanager.cti.catalog.model.RemoteConsumer;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.core.rest.RestStatus;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
* Implementation of the ConsumerService.
* Manages the retrieval and persistence of Local and Remote consumer states using
* internal indices and the CTI API client.
*/
public class ConsumerServiceImpl extends AbstractService implements ConsumerService {
private static final Logger log = LogManager.getLogger(ConsumerServiceImpl.class);
// private static final String CONTEXT = "rules_development_0.0.1";
// private static final String CONSUMER = "rules_consumer";
private final String context;
private final String consumer;
private final ConsumersIndex consumerIndex;
/**
* Constructs a ConsumerServiceImpl.
*
* @param context The context identifier.
* @param consumer The consumer identifier.
* @param consumerIndex The index service for storing consumer metadata.
*/
public ConsumerServiceImpl(String context, String consumer, ConsumersIndex consumerIndex) {
this.context = context;
this.consumer = consumer;
this.consumerIndex = consumerIndex;
}
/**
* Retrieves the local consumer state from the internal index.
* If the consumer does not exist locally, it attempts to initialize it.
*
* @return The {@link LocalConsumer} object, or null if retrieval/parsing fails.
*/
@Override
public LocalConsumer getLocalConsumer() {
try {
GetResponse response = this.consumerIndex.getConsumer(this.context, this.consumer);
return response.isExists() ?
this.mapper.readValue(response.getSourceAsString(), LocalConsumer.class):
this.setConsumer();
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.error("Couldn't obtain consumer from internal index: {}", e.getMessage());
} catch (IOException e) {
log.error("Failed to parse local consumer: {}", e.getMessage());
}
return null;
}
/**
* Retrieves the remote consumer state from the CTI API.
*
* @return The {@link RemoteConsumer} object, or null if the API call fails.
*/
@Override
public RemoteConsumer getRemoteConsumer() {
try {
// Perform request
SimpleHttpResponse response = this.client.getConsumer(this.context, this.consumer);
if (response.getCode() == 200) {
return this.mapper.readValue(response.getBodyText(), RemoteConsumer.class);
}
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.error("Couldn't obtain consumer from CTI: {}", e.getMessage());
} catch (IOException e) {
log.error("Failed to parse remote consumer: {}", e.getMessage());
}
return null;
}
/**
* Creates or updates the default local consumer state in the internal index.
*
* @return The initialized {@link LocalConsumer}, or null if persistence fails.
*/
public LocalConsumer setConsumer() {
// Default consumer. Initialize.
LocalConsumer consumer = new LocalConsumer(this.context, this.consumer);
try {
IndexResponse response = this.consumerIndex.setConsumer(consumer);
if (response.status() == RestStatus.CREATED || response.status() == RestStatus.OK) {
log.info("Local consumer with id [{}] created or updated", response.getId());
return consumer;
}
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.error("Couldn't save consumer to internal index: {}", e.getMessage());
} catch (IOException e) {
log.error("Attempt to save invalid local consumer: {}", e.getMessage());
}
return null;
}
}

View File

@ -14,14 +14,14 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.updater;
package com.wazuh.contentmanager.cti.catalog.service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.wazuh.contentmanager.client.CTIClient;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.index.ContentIndex;
import com.wazuh.contentmanager.cti.catalog.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.model.Changes;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
import com.wazuh.contentmanager.settings.PluginSettings;

View File

@ -0,0 +1,17 @@
package com.wazuh.contentmanager.cti.catalog.service;
import com.wazuh.contentmanager.cti.catalog.model.RemoteConsumer;
/**
* Service interface for managing CTI snapshots.
* Defines the contract for initializing consumers from remote snapshots.
*/
public interface SnapshotService {
/**
* Initializes a consumer by processing its associated remote snapshot.
*
* @param consumer The remote consumer containing the snapshot link and offset information.
*/
void initialize(RemoteConsumer consumer);
}

View File

@ -0,0 +1,352 @@
/*
* Copyright (C) 2024, Wazuh Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.cti.catalog.service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.wazuh.contentmanager.cti.catalog.client.SnapshotClient;
import com.wazuh.contentmanager.cti.catalog.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.index.ContentIndex;
import com.wazuh.contentmanager.cti.catalog.model.LocalConsumer;
import com.wazuh.contentmanager.cti.catalog.model.RemoteConsumer;
import com.wazuh.contentmanager.cti.catalog.utils.Unzip;
import com.wazuh.contentmanager.settings.PluginSettings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.env.Environment;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;
/**
* Service responsible for handling the download, extraction, and indexing of CTI snapshots.
* It extracts the contents of the payload and indexes them at the root.
*/
public class SnapshotServiceImpl implements SnapshotService {
private static final Logger log = LogManager.getLogger(SnapshotServiceImpl.class);
// Mappers and Keys for YAML enrichment
private final ObjectMapper jsonMapper;
private final ObjectMapper yamlMapper;
private static final List<String> DECODER_ORDER_KEYS = Arrays.asList(
"name", "metadata", "parents", "definitions", "check",
"parse|event.original", "parse|message", "normalize"
);
// Keys to navigate the JSON structure
private static final String JSON_PAYLOAD_KEY = "payload";
private static final String JSON_TYPE_KEY = "type";
private static final String JSON_DOCUMENT_KEY = "document";
private static final String JSON_ID_KEY = "id";
private final String context;
private final String consumer;
private final List<ContentIndex> contentIndex;
private final ConsumersIndex consumersIndex;
private SnapshotClient snapshotClient;
private final Environment environment;
private final PluginSettings pluginSettings;
public SnapshotServiceImpl(String context,
String consumer,
List<ContentIndex> contentIndex,
ConsumersIndex consumersIndex,
Environment environment) {
this.context = context;
this.consumer = consumer;
this.contentIndex = contentIndex;
this.consumersIndex = consumersIndex;
this.environment = environment;
this.pluginSettings = PluginSettings.getInstance();
this.jsonMapper = new ObjectMapper();
this.yamlMapper = new ObjectMapper(new YAMLFactory());
this.snapshotClient = new SnapshotClient(this.environment);
}
/**
* Used for testing. Inject mocks.
*
* @param client
*/
public void setSnapshotClient(SnapshotClient client) {
this.snapshotClient = client;
}
/**
* Initializes the content by downloading the snapshot from the given link,
* unzipping it, and indexing the content into specific indices.
*
* @param consumer information from the remote consumer. Contains the snapshot link from which the initialization takes place.
*/
@Override
public void initialize(RemoteConsumer consumer) {
String snapshotUrl = consumer.getSnapshotLink();
long offset = consumer.getOffset();
if (snapshotUrl == null || snapshotUrl.isEmpty()) {
log.warn("Snapshot URL is empty. Skipping initialization.");
return;
}
log.info("Starting snapshot initialization for context [{}] consumer [{}]", this.context, this.consumer);
Path snapshotZip = null;
Path outputDir = null;
try {
// 1. Download Snapshot
snapshotZip = this.snapshotClient.downloadFile(snapshotUrl);
if (snapshotZip == null) {
log.error("Failed to download snapshot from {}", snapshotUrl);
return;
}
// 2. Prepare output directory
outputDir = this.environment.tmpDir().resolve("snapshot_" + System.currentTimeMillis());
Files.createDirectories(outputDir);
// 3. Unzip
Unzip.unzip(snapshotZip, outputDir);
// 4. Clear indices
this.contentIndex.forEach(ContentIndex::clear);
// 5. Process and Index Files
try (DirectoryStream<Path> stream = Files.newDirectoryStream(outputDir, "*.json")) {
for (Path entry : stream) {
this.processSnapshotFile(entry);
}
}
} catch (Exception e) {
log.error("Error processing snapshot: {}", e.getMessage());
} finally {
// Cleanup temporary files
this.cleanup(snapshotZip, outputDir);
}
// 6. Update Consumer State in .cti-consumers
try {
LocalConsumer updatedConsumer = new LocalConsumer(this.context, this.consumer, offset, offset, snapshotUrl);
this.consumersIndex.setConsumer(updatedConsumer);
} catch (Exception e) {
log.error("Failed to update consumer state in {}: {}", ConsumersIndex.INDEX_NAME, e.getMessage());
}
}
/**
* Reads a JSON snapshot file line by line, extracts the contents of the payload object,
* and indexes them directly at the root.
*
* @param filePath Path to the JSON file.
*/
private void processSnapshotFile(Path filePath) {
String line;
int docCount = 0;
BulkRequest bulkRequest = new BulkRequest();
try (BufferedReader reader = Files.newBufferedReader(filePath, StandardCharsets.UTF_8)) {
while ((line = reader.readLine()) != null) {
try {
JsonObject rootJson = JsonParser.parseString(line).getAsJsonObject();
// 1. Validate and Extract Payload
if (!rootJson.has(JSON_PAYLOAD_KEY)) {
log.warn("Snapshot entry missing '{}'. Skipping.", JSON_PAYLOAD_KEY);
continue;
}
JsonObject payload = rootJson.getAsJsonObject(JSON_PAYLOAD_KEY);
// 2. Determine Index from 'type' inside payload
if (!payload.has(JSON_TYPE_KEY)) {
log.warn("Payload missing '{}'. Skipping.", JSON_TYPE_KEY);
continue;
}
String type = payload.get(JSON_TYPE_KEY).getAsString();
// Skip policy type documents
if ("policy".equalsIgnoreCase(type)) {
log.debug("Skipping document with type {}.", type);
continue;
}
if ("decoder".equalsIgnoreCase(type)) {
this.enrichDecoderWithYaml(payload);
}
String indexName = this.getIndexName(type);
// 3. Extract the inner 'document' object for ID retrieval and Preprocessing
if (!payload.has(JSON_DOCUMENT_KEY)) {
log.warn("Payload missing '{}'. Skipping.", JSON_DOCUMENT_KEY);
continue;
}
JsonObject innerDocument = payload.getAsJsonObject(JSON_DOCUMENT_KEY);
// Preprocess the inner document
this.preprocessDocument(innerDocument);
// 4. Create Index Request
IndexRequest indexRequest = new IndexRequest(indexName)
.source(payload.toString(), XContentType.JSON);
if (innerDocument.has(JSON_ID_KEY)) {
indexRequest.id(innerDocument.get(JSON_ID_KEY).getAsString());
}
bulkRequest.add(indexRequest);
docCount++;
// Execute Bulk if limit reached
if (docCount >= this.pluginSettings.getMaxItemsPerBulk()) {
this.contentIndex.getFirst().executeBulk(bulkRequest);
bulkRequest = new BulkRequest();
docCount = 0;
}
} catch (Exception e) {
log.error("Error parsing/indexing JSON line: {}", e.getMessage());
}
}
// Index remaining documents
if (bulkRequest.numberOfActions() > 0) {
this.contentIndex.getFirst().executeBulk(bulkRequest);
}
} catch (IOException e) {
log.error("Error reading snapshot file [{}]: {}", filePath, e.getMessage());
}
}
/**
* Parses the decoder payload, reorders keys based on a predefined list,
* and adds a YAML representation of the decoder to the payload.
*
* @param payload The JSON object containing the decoder data.
*/
private void enrichDecoderWithYaml(JsonObject payload) {
try {
JsonNode docNode = this.jsonMapper.readTree(payload.toString()).get(JSON_DOCUMENT_KEY);
if (docNode != null && docNode.isObject()) {
Map<String, Object> orderedDecoderMap = new LinkedHashMap<>();
// Add JSON nodes in the expected order, if they exist.
for (String key : DECODER_ORDER_KEYS) {
if (docNode.has(key)) {
orderedDecoderMap.put(key, docNode.get(key));
}
}
// Add remaining JSON nodes.
Iterator<Map.Entry<String, JsonNode>> fields = docNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
if (!DECODER_ORDER_KEYS.contains(field.getKey())) {
orderedDecoderMap.put(field.getKey(), field.getValue());
}
}
// Add YAML representation to the document
String yamlContent = this.yamlMapper.writeValueAsString(orderedDecoderMap);
payload.addProperty("decoder", yamlContent);
}
} catch (IOException e) {
log.error("Failed to convert decoder payload to YAML: {}", e.getMessage(), e);
}
}
private String getIndexName(String type) {
return String.format(Locale.ROOT, ".%s-%s-%s", this.context, this.consumer, type);
}
/**
* Preprocesses the document to handle field transformations.
* Specifically, renames 'related.sigma_id' to 'related.id' to avoid StrictDynamicMappingException.
*
* @param document The document object to process.
*/
private void preprocessDocument(JsonObject document) {
if (!document.has("related")) {
return;
}
JsonElement relatedElement = document.get("related");
if (relatedElement.isJsonObject()) {
this.sanitizeRelatedObject(relatedElement.getAsJsonObject());
} else if (relatedElement.isJsonArray()) {
JsonArray relatedArray = relatedElement.getAsJsonArray();
for (JsonElement element : relatedArray) {
if (element.isJsonObject()) {
this.sanitizeRelatedObject(element.getAsJsonObject());
}
}
}
}
/**
* Helper method to perform the actual rename/delete logic on a specific related object.
*
* @param relatedObj The specific related object (either standalone or from an array).
*/
private void sanitizeRelatedObject(JsonObject relatedObj) {
if (relatedObj.has("sigma_id")) {
JsonElement sigmaIdValue = relatedObj.get("sigma_id");
// Move value to 'id'
relatedObj.add("id", sigmaIdValue);
// Remove the original 'sigma_id' field
relatedObj.remove("sigma_id");
}
}
/**
* Deletes temporary files and directories used during the process.
*/
private void cleanup(Path zipFile, Path directory) {
try {
if (zipFile != null) {
Files.deleteIfExists(zipFile);
}
if (directory != null) {
Files.walk(directory)
.sorted(Comparator.reverseOrder())
.forEach(path -> {
try {
Files.delete(path);
} catch (IOException e) {
log.warn("Failed to delete temp file {}", path);
}
});
}
} catch (IOException e) {
log.warn("Error during cleanup: {}", e.getMessage());
}
}
}

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.utils;
package com.wazuh.contentmanager.cti.catalog.utils;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.utils;
package com.wazuh.contentmanager.cti.catalog.utils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

View File

@ -4,7 +4,15 @@ import com.wazuh.contentmanager.cti.console.model.Token;
import java.util.EventListener;
/**
* Listener interface for receiving notifications about Token changes.
*/
public interface TokenListener extends EventListener {
/**
* Invoked when the authentication token has changed (e.g., refreshed or initially acquired).
*
* @param token The new {@link Token}.
*/
void onTokenChanged(Token token);
}

View File

@ -1,7 +1,7 @@
package com.wazuh.contentmanager.cti.console.client;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.utils.http.HttpResponseCallback;
import com.wazuh.contentmanager.client.HttpResponseCallback;
import org.apache.hc.client5.http.async.methods.*;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;

View File

@ -1,7 +1,19 @@
package com.wazuh.contentmanager.cti.console.client;
/**
* Represents a closable HTTP client wrapper used within the CTI console context.
*/
public interface ClosableHttpClient {
/**
* Sets the underlying API client instance to be used by this implementation.
*
* @param c the {@link ApiClient} instance to assign to this closable client.
*/
void setClient(ApiClient c);
/**
* Closes this client and releases any system resources associated with it.
*/
void close();
}

View File

@ -5,7 +5,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.util.List;
/**
* CTI plan DTO.
* Represents a CTI plan.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class Plan {
@ -14,23 +14,42 @@ public class Plan {
private List<Product> products;
/**
* Default constructor.
* Default no-argument constructor.
*/
public Plan() {}
/**
* Retrieves the name of the plan.
*
* @return the name of the plan.
*/
public String getName() {
return this.name;
}
/**
* Retrieves the description of the plan.
*
* @return the plan description.
*/
public String getDescription() {
return this.description;
}
/**
* Retrieves the list of products associated with this plan.
*
* @return a {@link List} of {@link Product} objects, or {@code null} if none are set.
*/
public List<Product> getProducts() {
return this.products;
}
/**
* Returns a string representation of the Plan object.
*
* @return a string containing the name, description, and the string representation of the associated products.
*/
@Override
public String toString() {
return "Plan{" +

View File

@ -3,7 +3,7 @@ package com.wazuh.contentmanager.cti.console.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
* CTI product DTO.
* Represents a specific Product within a CTI Plan.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class Product {
@ -14,30 +14,60 @@ public class Product {
private String resource;
/**
* Default constructor.
* Default no-argument constructor.
*/
public Product() {}
/**
* Retrieves the unique identifier of the product.
*
* @return the product identifier string.
*/
public String getIdentifier() {
return this.identifier;
}
/**
* Retrieves the type or category of the product.
*
* @return the product type.
*/
public String getType() {
return this.type;
}
/**
* Retrieves the display name of the product.
*
* @return the product name.
*/
public String getName() {
return this.name;
}
/**
* Retrieves the description of the product.
*
* @return the product description.
*/
public String getDescription() {
return this.description;
}
/**
* Retrieves the resource location for this product.
*
* @return a string representing the resource URI or path.
*/
public String getResource() {
return this.resource;
}
/**
* Returns a string representation of the Product.
*
* @return a string containing the identifier, type, name, description, and resource.
*/
@Override
public String toString() {
return "Product{" +

View File

@ -5,10 +5,33 @@ import com.wazuh.contentmanager.cti.console.client.ClosableHttpClient;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.cti.console.model.Subscription;
/**
* Service interface for handling authentication with the CTI Console.
* Manages the retrieval of permanent tokens and resource-specific tokens.
*/
public interface AuthService extends ClosableHttpClient {
/**
* Retrieves a permanent authentication token based on the provided subscription details.
*
* @param subscription The subscription details containing client ID and device code.
* @return The permanent {@link Token}, or null if retrieval fails.
*/
Token getToken(Subscription subscription);
/**
* Exchanges a permanent token for a resource-specific token.
*
* @param token The permanent authentication token.
* @param resource The identifier of the resource to access.
* @return A resource-specific {@link Token}, or null if retrieval fails.
*/
Token getResourceToken(Token token, String resource);
/**
* Registers a listener to receive updates when the token changes.
*
* @param listener The {@link TokenListener} to add.
*/
void addListener(TokenListener listener);
}

View File

@ -6,7 +6,16 @@ import com.wazuh.contentmanager.cti.console.model.Token;
import java.util.List;
/**
* Service interface definition for managing CTI Plans.
*/
public interface PlansService extends ClosableHttpClient {
/**
* Retrieves the list of available CTI plans authorized by the provided token.
*
* @param token the authentication {@link Token} required to validate the request.
* @return a {@link List} of {@link Plan} objects available to the user. Returns an empty list if no plans are found.
*/
List<Plan> getPlans(Token token);
}

View File

@ -1,324 +0,0 @@
/*
* Copyright (C) 2024, Wazuh Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.index;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.transport.client.Client;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.mapper.StrictDynamicMappingException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.wazuh.contentmanager.cti.catalog.model.Changes;
import com.wazuh.contentmanager.cti.catalog.model.Offset;
import com.wazuh.contentmanager.cti.catalog.model.Operation;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.JsonPatch;
import com.wazuh.contentmanager.utils.XContentUtils;
/** Manages operations for a content index. */
public class ContentIndex {
private static final String JSON_NAME_KEY = "name";
private static final String JSON_OFFSET_KEY = "offset";
private static final Logger log = LogManager.getLogger(ContentIndex.class);
/** Content index name. */
public static final String INDEX_NAME = "wazuh-ruleset";
private final Client client;
private final PluginSettings pluginSettings;
private final Semaphore semaphore;
/**
* Constructor for the ContentIndex class.
*
* @param client the OpenSearch Client to interact with the cluster
*/
public ContentIndex(Client client) {
this.pluginSettings = PluginSettings.getInstance();
this.semaphore = new Semaphore(pluginSettings.getMaximumConcurrentBulks());
this.client = client;
}
/**
* This constructor is only used on tests.
*
* @param client Client (mocked).
* @param pluginSettings PluginSettings (mocked).
*/
public ContentIndex(Client client, PluginSettings pluginSettings) {
this.pluginSettings = pluginSettings;
this.semaphore = new Semaphore(pluginSettings.getMaximumConcurrentBulks());
this.client = client;
}
/**
* Searches for an element in the {@link ContentIndex#INDEX_NAME} by its ID.
*
* @param resourceId the ID of the element to retrieve.
* @return the element as a JsonObject instance, or null.
* @throws InterruptedException if the operation is interrupted.
* @throws ExecutionException if an error occurs during execution.
* @throws TimeoutException if the operation times out.
* @throws IllegalArgumentException if the content is not found.
*/
public JsonObject getById(String resourceId)
throws InterruptedException, ExecutionException, TimeoutException, IllegalArgumentException {
GetResponse response =
this.client
.get(new GetRequest(ContentIndex.INDEX_NAME, resourceId))
.get(this.pluginSettings.getClientTimeout(), TimeUnit.SECONDS);
if (response.isExists()) {
return JsonParser.parseString(response.getSourceAsString()).getAsJsonObject();
}
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Document with ID [%s] not found in the [%s] index",
resourceId,
ContentIndex.INDEX_NAME));
}
/**
* Indexes a single Offset document.
*
* @param document {@link Offset} document to index.
* @throws StrictDynamicMappingException index operation failed because the document does not
* match the index mappings.
* @throws ExecutionException index operation failed to execute.
* @throws InterruptedException index operation was interrupted.
* @throws TimeoutException index operation timed out.
* @throws IOException operation failed caused by the creation of the JSON builder by the
* XContentFactory.
*/
public void index(Offset document)
throws StrictDynamicMappingException,
ExecutionException,
InterruptedException,
TimeoutException,
IOException {
IndexRequest indexRequest =
new IndexRequest()
.index(ContentIndex.INDEX_NAME)
.source(document.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(document.getResource());
this.client.index(indexRequest).get(this.pluginSettings.getClientTimeout(), TimeUnit.SECONDS);
}
/**
* Indexes a list of JSON documents in bulk.
*
* @param documents list of JSON documents to be indexed.
*/
public void index(List<JsonObject> documents) {
BulkRequest bulkRequest = new BulkRequest(ContentIndex.INDEX_NAME);
for (JsonObject document : documents) {
bulkRequest.add(
new IndexRequest()
.id(document.get(ContentIndex.JSON_NAME_KEY).getAsString())
.source(document.toString(), XContentType.JSON));
}
this.client.bulk(
bulkRequest,
new ActionListener<>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
semaphore.release();
if (bulkResponse.hasFailures()) {
log.error("Bulk index operation failed: {}", bulkResponse.buildFailureMessage());
} else {
log.debug("Bulk index operation succeeded in {} ms", bulkResponse.getTook().millis());
}
}
@Override
public void onFailure(Exception e) {
semaphore.release();
log.error("Bulk index operation failed: {}", e.getMessage(), e);
}
});
}
/**
* Deletes a document from the index.
*
* @param id ID of the document to delete.
*/
public void delete(String id) {
this.client.delete(
new DeleteRequest(ContentIndex.INDEX_NAME, id),
new ActionListener<>() {
@Override
public void onResponse(DeleteResponse response) {
log.info("Deleted CTI Catalog Content {} from index", id);
}
@Override
public void onFailure(Exception e) {
log.error("Failed to delete CTI Catalog Content {}: {}", id, e.getMessage(), e);
}
});
}
/**
* Initializes the index from a local snapshot. The snapshot file (in NDJSON format) is split in
* chunks of {@link PluginSettings#MAX_ITEMS_PER_BULK} elements. These are bulk indexed using
* {@link ContentIndex#index(List)}.
*
* @param path path to the CTI snapshot JSON file to be indexed.
* @return offset number of the last indexed resource of the snapshot. 0 on error.
*/
public long fromSnapshot(String path) {
long startTime = System.currentTimeMillis();
String line;
JsonObject json;
int lineCount = 0;
ArrayList<JsonObject> items = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(path, StandardCharsets.UTF_8))) {
while ((line = reader.readLine()) != null) {
json = JsonParser.parseString(line).getAsJsonObject();
items.add(json);
lineCount++;
// Index items (MAX_DOCUMENTS reached)
if (lineCount == this.pluginSettings.getMaxItemsPerBulk()) {
this.semaphore.acquire();
this.index(items);
lineCount = 0;
items.clear();
}
}
// Index remaining items (> MAX_DOCUMENTS)
if (lineCount > 0) {
this.semaphore.acquire();
this.index(items);
}
} catch (InterruptedException e) {
items.clear();
log.error("Processing snapshot file interrupted {}", e.getMessage());
} catch (Exception e) {
items.clear();
log.error("Generic exception indexing the snapshot: {}", e.getMessage());
}
long estimatedTime = System.currentTimeMillis() - startTime;
log.info("Snapshot indexing finished successfully in {} ms", estimatedTime);
return items.isEmpty()
? 0
: items.get(items.size() - 1).get(ContentIndex.JSON_OFFSET_KEY).getAsLong();
}
/**
* Applies a set of changes (create, update, delete) to the content index.
*
* @param changes content changes to apply.
*/
public void patch(Changes changes) {
ArrayList<Offset> offsets = changes.get();
if (offsets.isEmpty()) {
log.info("No changes to apply");
return;
}
log.info(
"Patching [{}] from offset [{}] to [{}]",
ContentIndex.INDEX_NAME,
changes.getFirst().getOffset(),
changes.getLast().getOffset());
for (Offset change : offsets) {
String id = change.getResource();
try {
log.debug("Processing offset [{}]", change.getOffset());
switch (change.getType()) {
case CREATE:
log.debug("Creating new resource with ID [{}]", id);
this.index(change);
break;
case UPDATE:
log.debug("Updating resource with ID [{}]", id);
JsonObject content = this.getById(id);
for (Operation op : change.getOperations()) {
JsonPatch.applyOperation(content, XContentUtils.xContentObjectToJson(op));
}
try (XContentParser parser = XContentUtils.createJSONParser(content)) {
this.index(Offset.parse(parser));
}
break;
case DELETE:
log.debug("Deleting resource with ID [{}]", id);
this.delete(id);
break;
default:
throw new IllegalArgumentException("Unknown change type: " + change.getType());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while patching", e);
} catch (Exception e) {
log.error("Failed to patch [{}] due to {}", id, e.getMessage());
throw new RuntimeException("Patch operation failed", e);
}
}
}
/** Clears all documents from the {@link ContentIndex#INDEX_NAME} index. */
public void clear() {
try {
DeleteByQueryRequestBuilder deleteByQuery =
new DeleteByQueryRequestBuilder(this.client, DeleteByQueryAction.INSTANCE);
deleteByQuery.source(ContentIndex.INDEX_NAME).filter(QueryBuilders.matchAllQuery());
BulkByScrollResponse response = deleteByQuery.get();
log.debug(
"[{}] wiped. {} documents were removed", ContentIndex.INDEX_NAME, response.getDeleted());
} catch (OpenSearchTimeoutException e) {
log.error("[{}] delete query timed out: {}", ContentIndex.INDEX_NAME, e.getMessage());
}
}
}

View File

@ -39,6 +39,11 @@ public class ContentJobParameter implements ScheduledJobParameter {
this.enabledTime = enabledTime;
}
/**
* Gets the job type identifier.
*
* @return The job type string (e.g., "hello-world-task").
*/
public String getJobType() {
return this.jobType;
}

View File

@ -1,6 +1,5 @@
package com.wazuh.contentmanager.jobscheduler;
import com.wazuh.contentmanager.jobscheduler.jobs.HelloWorldJob;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
@ -29,7 +28,6 @@ public class ContentJobRunner implements ScheduledJobRunner {
public static synchronized ContentJobRunner getInstance() {
if (INSTANCE == null) {
INSTANCE = new ContentJobRunner();
INSTANCE.init();
}
return INSTANCE;
}
@ -40,7 +38,6 @@ public class ContentJobRunner implements ScheduledJobRunner {
* Initialize default jobs.
*/
private void init() {
this.registerExecutor(HelloWorldJob.JOB_TYPE, new HelloWorldJob());
}
/**

View File

@ -0,0 +1,196 @@
package com.wazuh.contentmanager.jobscheduler.jobs;
import com.wazuh.contentmanager.cti.catalog.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.index.ContentIndex;
import com.wazuh.contentmanager.cti.catalog.model.LocalConsumer;
import com.wazuh.contentmanager.cti.catalog.model.RemoteConsumer;
import com.wazuh.contentmanager.cti.catalog.service.ConsumerService;
import com.wazuh.contentmanager.cti.catalog.service.ConsumerServiceImpl;
import com.wazuh.contentmanager.cti.catalog.service.SnapshotServiceImpl;
import com.wazuh.contentmanager.jobscheduler.JobExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.env.Environment;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
* Job responsible for executing the synchronization logic for Rules and Decoders consumers.
*/
public class CatalogSyncJob implements JobExecutor {
private static final Logger log = LogManager.getLogger(CatalogSyncJob.class);
// Identifier used to route this specific job type
public static final String JOB_TYPE = "consumer-sync-task";
private final Client client;
private final ConsumersIndex consumersIndex;
private final Environment environment;
private final ThreadPool threadPool;
/**
* Constructs a new CatalogSyncJob.
*
* @param client The OpenSearch client used for administrative index operations (create/check).
* @param consumersIndex The wrapper for accessing and managing the internal Consumers index.
* @param environment The OpenSearch environment settings, used for path resolution.
* @param threadPool The thread pool manager, used to offload blocking tasks to the generic executor.
*/
public CatalogSyncJob(Client client, ConsumersIndex consumersIndex, Environment environment, ThreadPool threadPool) {
this.client = client;
this.consumersIndex = consumersIndex;
this.environment = environment;
this.threadPool = threadPool;
}
/**
* Triggers the execution of the synchronization job.
*
* @param context The execution context provided by the Job Scheduler, containing metadata like the Job ID.
*/
@Override
public void execute(JobExecutionContext context) {
// Offload execution to the generic thread pool to allow blocking operations
this.threadPool.generic().execute(() -> {
try {
log.info("Executing Consumer Sync Job (ID: {})", context.getJobId());
this.rulesConsumer();
this.decodersConsumer();
} catch (Exception e) {
log.error("Error executing Consumer Sync Job (ID: {}): {}", context.getJobId(), e.getMessage(), e);
}
});
}
/**
* Orchestrates the synchronization process specifically for the Rules consumer.
*/
private void rulesConsumer() {
String context = "rules_development_0.0.1";
String consumer = "rules_development_0.0.1_test";
Map<String, String> mappings = new HashMap<>();
mappings.put(
"rule", "/mappings/cti-rules-mappings.json"
);
mappings.put(
"integration", "/mappings/cti-rules-integrations-mappings.json"
);
Map<String, String> aliases = new HashMap<>();
aliases.put("rule", ".cti-rules");
aliases.put("integration", ".cti-integration-rules");
this.syncConsumerServices(context, consumer, mappings, aliases);
log.info("Rules Consumer correctly synchronized.");
}
/**
* Orchestrates the synchronization process specifically for the Decoders consumer.
*/
private void decodersConsumer() {
String context = "decoders_development_0.0.1";
String consumer = "decoders_development_0.0.1";
Map<String, String> mappings = new HashMap<>();
mappings.put(
"decoder", "/mappings/cti-decoders-mappings.json"
);
mappings.put(
"kvdb", "/mappings/cti-kvdbs-mappings.json"
);
mappings.put(
"integration", "/mappings/cti-decoders-integrations-mappings.json"
);
Map<String, String> aliases = new HashMap<>();
aliases.put("decoder", ".cti-decoders");
aliases.put("kvdb", ".cti-kvdbs");
aliases.put("integration", ".cti-integration-decoders");
this.syncConsumerServices(context, consumer, mappings, aliases);
log.info("Decoders Consumer correctly synchronized.");
}
/**
* Generates a standardized OpenSearch index name based on the provided parameters.
*
* @param context The context identifier (e.g., version info).
* @param consumer The consumer identifier.
* @param type The specific content type (e.g., "rule", "decoder").
* @return A formatted string representing the system index name.
*/
private String getIndexName(String context, String consumer, String type) {
return String.format(
Locale.ROOT, ".%s-%s-%s",
context,
consumer,
type
);
}
/**
* The core logic for synchronizing consumer services.
*
* This method performs the following actions:
* 1. Retrieve the Local and Remote consumer metadata.
* 2. Iterate through the requested mappings to check if indices exist.
* 3. Create indices using the provided mapping files if they are missing.
* 4. Compare local offsets with remote offsets to determine if a Snapshot initialization is required.
* 5. Triggers a full snapshot download if the local consumer is new or empty.
* 6. Triggers the update process if the offsets from local and remote consumers differ.
*
* @param context The versioned context string.
* @param consumer The specific consumer identifier.
* @param mappings A map associating content types to their JSON mapping file paths.
* @param aliases A map associating content types to their OpenSearch alias names.
*/
private void syncConsumerServices(String context, String consumer, Map<String, String> mappings, Map<String, String> aliases) {
ConsumerService consumerService = new ConsumerServiceImpl(context, consumer, this.consumersIndex);
LocalConsumer localConsumer = consumerService.getLocalConsumer();
RemoteConsumer remoteConsumer = consumerService.getRemoteConsumer();
List<ContentIndex> indices = new ArrayList<>();
for (Map.Entry<String, String> entry : mappings.entrySet()) {
String indexName = this.getIndexName(context, consumer, entry.getKey());
String alias = aliases.get(entry.getKey());
ContentIndex index = new ContentIndex(this.client, indexName, entry.getValue(), alias);
indices.add(index);
// Check if index exists to avoid creation exception
boolean indexExists = this.client.admin().indices().prepareExists(indexName).get().isExists();
if (!indexExists) {
try {
CreateIndexResponse response = index.createIndex();
if (response.isAcknowledged()) {
log.info("Index [{}] created successfully", response.index());
}
} catch (Exception e) {
log.error("Failed to create index [{}]: {}", indexName, e.getMessage());
}
}
}
if (remoteConsumer != null && remoteConsumer.getSnapshotLink() != null && (localConsumer == null || localConsumer.getLocalOffset() == 0)) {
log.info("Initializing snapshot from link: {}", remoteConsumer.getSnapshotLink());
SnapshotServiceImpl snapshotService = new SnapshotServiceImpl(
context,
consumer,
indices,
this.consumersIndex,
this.environment
);
snapshotService.initialize(remoteConsumer);
} else if (remoteConsumer != null && localConsumer.getLocalOffset() != remoteConsumer.getOffset()) {
// TODO: Implement offset based update process
}
}
}

View File

@ -1,31 +0,0 @@
package com.wazuh.contentmanager.jobscheduler.jobs;
import com.wazuh.contentmanager.jobscheduler.JobExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import java.time.Instant;
/**
* A sample implementation of a concrete Job.
* This specific job simply prints a "Hello World" message to the logs.
*/
public class HelloWorldJob implements JobExecutor {
private static final Logger log = LogManager.getLogger(HelloWorldJob.class);
// Identifier used to route this specific job type
public static final String JOB_TYPE = "hello-world-task";
/**
* Executes the logic for the Hello World job.
* * @param context The execution context provided by the Job Scheduler.
*/
@Override
public void execute(JobExecutionContext context) {
log.info("************************************************");
log.info("* Hello World! - Executing Job ID: {}", context.getJobId());
log.info("* Time: {}", Instant.now());
log.info("************************************************");
}
}

View File

@ -32,9 +32,9 @@ public class PluginSettings {
private static final Logger log = LogManager.getLogger(PluginSettings.class);
/** Settings default values */
private static final String DEFAULT_CONSUMER_ID = "vd_4.8.0";
private static final String DEFAULT_CONSUMER_ID = "rules_consumer";
private static final String DEFAULT_CONTEXT_ID = "vd_1.0.0";
private static final String DEFAULT_CONTEXT_ID = "rules_development_0.0.1";
private static final int DEFAULT_CTI_MAX_ATTEMPTS = 3;
private static final int DEFAULT_CTI_SLEEP_TIME = 60;
private static final int DEFAULT_MAX_ITEMS_PER_BULK = 25;
@ -48,7 +48,8 @@ public class PluginSettings {
private static PluginSettings INSTANCE;
/** Base Wazuh CTI URL */
public static final String CTI_URL = "https://cti.wazuh.com";
// https://cti-pre.wazuh.com/api/v1/catalog/contexts/rules_development_0.0.1/consumers/rules_consumer
public static final String CTI_URL = "https://cti-pre.wazuh.com";
/** The CTI API URL from the configuration file */
// TODO: Change to the new CTI_API_URL

View File

@ -27,7 +27,7 @@ import org.opensearch.common.settings.Settings;
import java.util.Locale;
/**
* ClusterInfoHelper provides utility methods for retrieving cluster-related information, such as
* ClusterInfo provides utility methods for retrieving cluster-related information, such as
* security settings and the cluster base URL.
*/
public class ClusterInfo {
@ -42,7 +42,7 @@ public class ClusterInfo {
// Check if security plugins have HTTPS enabled
return settings.getAsBoolean("plugins.security.ssl.http.enabled", false)
|| settings.getAsBoolean("xpack.security.http.ssl.enabled", false);
|| settings.getAsBoolean("xpack.security.http.ssl.enabled", false);
}
/**
@ -71,7 +71,7 @@ public class ClusterInfo {
*/
public static boolean indexStatusCheck(Client client, String index) {
ClusterHealthResponse response =
client.admin().cluster().prepareHealth().setIndices(index).setWaitForYellowStatus().get();
client.admin().cluster().prepareHealth().setIndices(index).setWaitForYellowStatus().get();
return response.getStatus() != ClusterHealthStatus.RED;
}

View File

@ -1,221 +0,0 @@
/*
* Copyright (C) 2024, Wazuh Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.utils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.env.Environment;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;
import java.util.concurrent.Semaphore;
import com.wazuh.contentmanager.client.CTIClient;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.updater.ContentUpdater;
/** Helper class to handle indexing of snapshots */
public class SnapshotManager {
private static final Logger log = LogManager.getLogger(SnapshotManager.class);
private final CTIClient ctiClient;
private final Environment environment;
private final ConsumersIndex consumersIndex;
private final ContentIndex contentIndex;
private final Privileged privileged;
private final Semaphore semaphore = new Semaphore(1);
private final PluginSettings pluginSettings;
/**
* Constructor.
*
* @param environment Needed for snapshot file handling.
* @param consumersIndex Handles context and consumer related metadata.
* @param contentIndex Handles indexed content.
*/
public SnapshotManager(
Environment environment,
ConsumersIndex consumersIndex,
ContentIndex contentIndex,
Privileged privileged) {
this.environment = environment;
this.consumersIndex = consumersIndex;
this.contentIndex = contentIndex;
this.privileged = privileged;
this.ctiClient = privileged.doPrivilegedRequest(CTIClient::getInstance);
this.pluginSettings = PluginSettings.getInstance();
}
/**
* Alternate constructor that allows injecting CTIClient for test purposes. Dependency injection.
*
* @param ctiClient Instance of CTIClient.
* @param environment Needed for snapshot file handling.
* @param consumersIndex Handles context and consumer related metadata.
* @param contentIndex Handles indexed content.
*/
@VisibleForTesting
protected SnapshotManager(
CTIClient ctiClient,
Environment environment,
ConsumersIndex consumersIndex,
ContentIndex contentIndex,
Privileged privileged,
PluginSettings pluginSettings) {
this.ctiClient = ctiClient;
this.environment = environment;
this.consumersIndex = consumersIndex;
this.contentIndex = contentIndex;
this.privileged = privileged;
this.pluginSettings = pluginSettings;
}
/**
* Initializes the content if {@code offset == 0}. This method downloads, decompresses and indexes
* a CTI snapshot.
*/
protected void indexSnapshot(ConsumerInfo consumerInfo) {
if (consumerInfo.getOffset() == 0) {
log.info("Initializing [{}] index from a snapshot", ContentIndex.INDEX_NAME);
// Clears the content of the index
this.contentIndex.clear();
this.privileged.doPrivilegedRequest(
() -> {
// Download snapshot.
Path snapshotZip =
this.ctiClient.download(consumerInfo.getLastSnapshotLink(), this.environment);
Path outputDir = this.environment.tmpDir();
try (DirectoryStream<Path> stream = this.getStream(outputDir)) {
// Unzip snapshot.
this.unzip(snapshotZip, outputDir);
Path snapshotJson = stream.iterator().next();
// Index snapshot.
long offset = this.contentIndex.fromSnapshot(snapshotJson.toString());
// Update the offset.
consumerInfo.setOffset(offset);
this.consumersIndex.index(consumerInfo);
// Remove snapshot.
Files.deleteIfExists(snapshotZip);
Files.deleteIfExists(snapshotJson);
} catch (IOException | NullPointerException e) {
log.error("Failed to index snapshot: {}", e.getMessage());
}
return null;
});
}
}
/**
* Wrapper method to handle unzipping files
*
* @param snapshotZip The Path to the zip file
* @param outputDir The output directory to extract files to
* @throws IOException Risen from unzip()
*/
protected void unzip(Path snapshotZip, Path outputDir) throws IOException {
Unzip.unzip(snapshotZip, outputDir);
}
/**
* Wrapper method to make newDirectoryStream() stubbable
*
* @param outputDir The output directory
* @return A DirectoryStream Path
* @throws IOException rethrown from newDirectoryStream()
*/
protected DirectoryStream<Path> getStream(Path outputDir) throws IOException {
return Files.newDirectoryStream(
outputDir,
String.format(
Locale.ROOT,
"%s_%s_*.json",
this.pluginSettings.getContextId(),
this.pluginSettings.getConsumerId()));
}
/**
* Updates the context index with data from the CTI API
*
* @throws IOException thrown when indexing failed
*/
protected ConsumerInfo initConsumer() throws IOException {
ConsumerInfo current =
this.consumersIndex.get(
this.pluginSettings.getContextId(), this.pluginSettings.getConsumerId());
ConsumerInfo latest = this.ctiClient.getConsumerInfo();
log.debug("Current consumer info: {}", current);
log.debug("Latest consumer info: {}", latest);
// Consumer is not yet initialized. Initialize to latest.
if (current == null || current.getOffset() == 0) {
log.debug("Initializing consumer: {}", latest);
if (this.consumersIndex.index(latest)) {
log.info(
"Successfully initialized consumer [{}][{}]", latest.getContext(), latest.getName());
} else {
throw new IOException(
String.format(
Locale.ROOT,
"Failed to initialize consumer [%s][%s]",
latest.getContext(),
latest.getName()));
}
current = latest;
}
// Consumer is initialized and up-to-date.
else if (current.getOffset() == latest.getLastOffset()) {
log.info(
"Consumer is up-to-date (offset {} == {}). Skipping...",
current.getOffset(),
latest.getLastOffset());
}
// Consumer is initialized but out-of-date.
else {
log.info("Consumer already initialized (offset {} != 0). Skipping...", current.getOffset());
current.setLastOffset(latest.getLastOffset());
current.setLastSnapshotLink(latest.getLastSnapshotLink());
this.consumersIndex.index(current);
// Start content update.
ContentUpdater updater =
new ContentUpdater(
this.ctiClient,
this.consumersIndex,
this.contentIndex,
this.privileged);
updater.update();
}
return current;
}
/** Trigger method for content initialization */
public void initialize() {
try {
this.semaphore.acquire();
ConsumerInfo consumerInfo = this.initConsumer();
this.indexSnapshot(consumerInfo);
semaphore.release();
} catch (IOException | InterruptedException e) {
log.error("Failed to initialize: {}", e.getMessage());
}
}
}

View File

@ -1,8 +1,19 @@
{
"properties": {
"name": { "type": "keyword" },
"local_offset": { "type": "long" },
"remote_offset": { "type": "long" },
"snapshot_link": { "type": "keyword" }
"context": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"local_offset": {
"type": "long"
},
"remote_offset": {
"type": "long"
},
"snapshot_link": {
"type": "keyword"
}
}
}

View File

@ -0,0 +1,46 @@
{
"dynamic": "strict",
"properties": {
"type": {
"type": "keyword"
},
"document": {
"properties": {
"id": {
"type": "keyword"
},
"title": {
"type": "keyword"
},
"category": {
"type": "keyword"
},
"description": {
"type": "text"
},
"author": {
"type": "keyword"
},
"date": {
"type": "date",
"format": "yyyy-MM-dd"
},
"documentation": {
"type": "keyword"
},
"enable_decoders": {
"type": "boolean"
},
"references": {
"type": "keyword"
},
"decoders": {
"type": "keyword"
},
"kvdbs": {
"type": "keyword"
}
}
}
}
}

View File

@ -0,0 +1,99 @@
{
"dynamic": "strict_allow_templates",
"dynamic_templates": [
{
"parse_fields": {
"match": "parse|*",
"mapping": {
"type": "keyword"
}
}
}
],
"properties": {
"type": {
"type": "keyword"
},
"integration_id": {
"type": "keyword"
},
"decoder": {
"type": "keyword"
},
"document": {
"properties": {
"name": {
"type": "keyword"
},
"enabled": {
"type": "boolean"
},
"id": {
"type": "keyword"
},
"metadata": {
"type": "object",
"properties": {
"module": {
"type": "keyword"
},
"title": {
"type": "text"
},
"description": {
"type": "text"
},
"compatibility": {
"type": "keyword"
},
"versions": {
"type": "keyword"
},
"author": {
"type": "object",
"properties": {
"name": {
"type": "keyword"
},
"email": {
"type": "keyword"
},
"url": {
"type": "keyword"
},
"date": {
"type": "keyword"
}
}
},
"references": {
"type": "keyword"
}
}
},
"definitions": {
"type": "object",
"enabled": false
},
"check": {
"type": "object",
"enabled": false
},
"map": {
"type": "object",
"enabled": false
},
"parents": {
"type": "keyword"
},
"normalize": {
"type": "object",
"enabled": false
},
"parse|event.original": {
"type": "keyword"
}
}
}
}
}

View File

@ -0,0 +1,38 @@
{
"dynamic": "strict",
"properties": {
"type": {
"type": "keyword"
},
"integration_id": {
"type": "keyword"
},
"document": {
"properties": {
"title": {
"type": "keyword"
},
"id": {
"type": "keyword"
},
"date": {
"type": "date",
"format": "yyyy-MM-dd'T'HH:mm:ss'Z'||yyyy-MM-dd'T'HH:mm:ss||epoch_millis"
},
"author": {
"type": "keyword"
},
"enabled": {
"type": "boolean"
},
"references": {
"type": "keyword"
},
"content": {
"type": "object",
"enabled": false
}
}
}
}
}

View File

@ -0,0 +1,41 @@
{
"dynamic": "strict",
"properties": {
"type": {
"type": "keyword"
},
"document": {
"type": "object",
"properties": {
"id": {
"type": "keyword"
},
"title": {
"type": "keyword"
},
"description": {
"type": "text"
},
"author": {
"type": "keyword"
},
"date": {
"type": "date",
"format": "yyyy-MM-dd"
},
"documentation": {
"type": "keyword"
},
"enable_rules": {
"type": "boolean"
},
"references": {
"type": "keyword"
},
"rules": {
"type": "keyword"
}
}
}
}
}

View File

@ -0,0 +1,121 @@
{
"dynamic": "strict",
"dynamic_templates": [
{
"detection_fields": {
"path_match": "document.detection.*",
"path_unmatch": "document.detection.condition",
"mapping": {
"type": "object",
"enabled": false
}
}
}
],
"properties": {
"integration_id": {
"type": "keyword"
},
"type": {
"type": "keyword"
},
"document": {
"type": "object",
"properties": {
"id": {
"type": "keyword"
},
"sigma_id": {
"type": "keyword"
},
"title": {
"type": "keyword"
},
"description": {
"type": "text"
},
"author": {
"type": "keyword"
},
"date": {
"type": "date",
"format": "yyyy-MM-dd"
},
"modified": {
"type": "date",
"format": "yyyy-MM-dd"
},
"status": {
"type": "keyword"
},
"level": {
"type": "keyword"
},
"enabled": {
"type": "boolean"
},
"falsepositives": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"references": {
"type": "keyword"
},
"logsource": {
"type": "object",
"properties": {
"category": {
"type": "keyword"
},
"product": {
"type": "keyword"
},
"service": {
"type": "keyword"
},
"definition": {
"type": "text"
}
}
},
"detection": {
"type": "object",
"dynamic": true,
"properties": {
"condition": {
"type": "keyword"
}
}
},
"related": {
"type": "nested",
"properties": {
"type": {
"type": "keyword"
},
"id": {
"type": "keyword"
}
}
},
"fields": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"taxonomy": {
"type": "keyword"
},
"license": {
"type": "keyword"
},
"scope": {
"type": "keyword"
}
}
}
}
}

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.index;
package com.wazuh.contentmanager.cti.catalog.index;
import com.google.gson.JsonObject;
import org.opensearch.action.get.GetResponse;

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.updater;
package com.wazuh.contentmanager.cti.catalog.service;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
@ -22,6 +22,7 @@ import com.wazuh.contentmanager.cti.catalog.model.Changes;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
import com.wazuh.contentmanager.cti.catalog.model.Offset;
import com.wazuh.contentmanager.cti.catalog.model.Operation;
import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.transport.client.Client;
@ -46,8 +47,8 @@ import java.util.concurrent.TimeoutException;
import com.wazuh.contentmanager.ContentManagerPlugin;
import com.wazuh.contentmanager.client.CTIClient;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.index.ContentIndex;
import com.wazuh.contentmanager.cti.catalog.index.ConsumersIndex;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.Privileged;
import org.mockito.InjectMocks;
@ -56,6 +57,7 @@ import org.mockito.Mockito;
import static org.mockito.Mockito.*;
@AwaitsFix(bugUrl = "https://github.com/wazuh/wazuh-indexer/issues/1250")
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE)
public class ContentUpdaterIT extends OpenSearchIntegTestCase {

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.updater;
package com.wazuh.contentmanager.cti.catalog.service;
import com.wazuh.contentmanager.cti.catalog.model.Changes;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
@ -30,8 +30,8 @@ import java.util.ArrayList;
import java.util.List;
import com.wazuh.contentmanager.client.CTIClient;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.index.ContentIndex;
import com.wazuh.contentmanager.cti.catalog.index.ConsumersIndex;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.Privileged;
import org.mockito.InjectMocks;

View File

@ -0,0 +1,360 @@
/*
* Copyright (C) 2024, Wazuh Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.cti.catalog.service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wazuh.contentmanager.cti.catalog.client.SnapshotClient;
import com.wazuh.contentmanager.cti.catalog.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.index.ContentIndex;
import com.wazuh.contentmanager.cti.catalog.model.LocalConsumer;
import com.wazuh.contentmanager.cti.catalog.model.RemoteConsumer;
import com.wazuh.contentmanager.settings.PluginSettings;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;
public class SnapshotServiceImplTests extends OpenSearchTestCase {
private SnapshotServiceImpl snapshotService;
private Path tempDir;
@Mock private SnapshotClient snapshotClient;
@Mock private ConsumersIndex consumersIndex;
@Mock private ContentIndex contentIndexMock;
@Mock private Environment environment;
@Mock private ClusterService clusterService;
@Mock private RemoteConsumer remoteConsumer;
private AutoCloseable closeable;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
this.closeable = MockitoAnnotations.openMocks(this);
this.tempDir = OpenSearchTestCase.createTempDir();
// Setup Environment mock to return our temp dir
Settings settings = Settings.builder()
.put("path.home", this.tempDir.toString())
.build();
when(this.environment.tmpDir()).thenReturn(this.tempDir);
when(this.environment.settings()).thenReturn(settings);
PluginSettings.getInstance(settings, this.clusterService);
List<ContentIndex> contentIndices = Collections.singletonList(this.contentIndexMock);
String context = "test-context";
String consumer = "test-consumer";
this.snapshotService = new SnapshotServiceImpl(context, consumer, contentIndices, consumersIndex, environment);
this.snapshotService.setSnapshotClient(this.snapshotClient);
}
@After
@Override
public void tearDown() throws Exception {
if (this.closeable != null) {
this.closeable.close();
}
super.tearDown();
}
/**
* Tests that the initialization aborts gracefully if the snapshot URL is missing.
*/
public void testInitialize_EmptyUrl() throws IOException, URISyntaxException {
when(remoteConsumer.getSnapshotLink()).thenReturn("");
this.snapshotService.initialize(remoteConsumer);
verify(snapshotClient, never()).downloadFile(anyString());
verify(contentIndexMock, never()).clear();
}
/**
* Tests that the initialization aborts if the download fails (returns null).
*/
public void testInitialize_DownloadFails() throws IOException, URISyntaxException {
String url = "http://example.com/snapshot.zip";
when(remoteConsumer.getSnapshotLink()).thenReturn(url);
when(snapshotClient.downloadFile(url)).thenReturn(null);
this.snapshotService.initialize(remoteConsumer);
verify(snapshotClient).downloadFile(url);
verify(contentIndexMock, never()).clear();
}
/**
* Tests a successful initialization flow:
* 1. Download succeeds.
* 2. Unzip succeeds.
* 3. Files are parsed and indexed.
* 4. Consumer index is updated (check using the local consumer)
*/
public void testInitialize_Success() throws IOException, ExecutionException, InterruptedException, TimeoutException, URISyntaxException {
// Mock
String url = "http://example.com/snapshot.zip";
long offset = 100L;
when(remoteConsumer.getSnapshotLink()).thenReturn(url);
when(remoteConsumer.getOffset()).thenReturn(offset);
Path zipPath = createZipFileWithContent("data.json",
"{\"payload\": {\"type\": \"kvdb\", \"document\": {\"id\": \"12345678\", \"title\": \"Test Kvdb\"}}}"
);
when(snapshotClient.downloadFile(url)).thenReturn(zipPath);
// Act
this.snapshotService.initialize(remoteConsumer);
// Assert
verify(contentIndexMock).clear();
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(contentIndexMock, atLeastOnce()).executeBulk(bulkCaptor.capture());
BulkRequest request = bulkCaptor.getValue();
assertEquals(1, request.numberOfActions());
IndexRequest indexRequest = (IndexRequest) request.requests().getFirst();
assertEquals(".test-context-test-consumer-kvdb", indexRequest.index());
assertEquals("12345678", indexRequest.id());
ArgumentCaptor<LocalConsumer> consumerCaptor = ArgumentCaptor.forClass(LocalConsumer.class);
verify(consumersIndex).setConsumer(consumerCaptor.capture());
assertEquals(offset, consumerCaptor.getValue().getLocalOffset());
}
/**
* Tests that documents with type "policy" are skipped.
*/
public void testInitialize_SkipPolicyType() throws IOException, URISyntaxException {
// Mock
String url = "http://example.com/policy.zip";
when(remoteConsumer.getSnapshotLink()).thenReturn(url);
Path zipPath = createZipFileWithContent("policy.json",
"{\"payload\": {\"type\": \"policy\", \"document\": {\"id\": \"p1\"}}}"
);
when(snapshotClient.downloadFile(url)).thenReturn(zipPath);
// Act
this.snapshotService.initialize(remoteConsumer);
// Assert
verify(contentIndexMock, never()).executeBulk(any(BulkRequest.class));
}
/**
* Tests that type "decoder" documents are enriched with a YAML field.
*/
public void testInitialize_EnrichDecoderWithYaml() throws IOException, URISyntaxException {
// Mock
String url = "http://example.com/decoder.zip";
when(remoteConsumer.getSnapshotLink()).thenReturn(url);
String jsonContent = "{\"payload\": {\"type\": \"decoder\", \"document\": {\"name\": \"syslog\", \"parent\": \"root\"}}}";
Path zipPath = createZipFileWithContent("decoder.json", jsonContent);
when(snapshotClient.downloadFile(url)).thenReturn(zipPath);
// Act
this.snapshotService.initialize(remoteConsumer);
// Assert
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(contentIndexMock).executeBulk(bulkCaptor.capture());
IndexRequest request = (IndexRequest) bulkCaptor.getValue().requests().getFirst();
String source = request.source().utf8ToString();
assertTrue("Should contain 'decoder' field", source.contains("\"decoder\":"));
}
/**
* Tests preprocessing: 'related.sigma_id' should be renamed to 'related.id'.
*/
public void testInitialize_PreprocessSigmaId() throws IOException, URISyntaxException {
// Mock
String url = "http://example.com/sigma.zip";
when(remoteConsumer.getSnapshotLink()).thenReturn(url);
String jsonContent = "{\"payload\": {\"type\": \"rule\", \"document\": {\"id\": \"R1\", \"related\": {\"sigma_id\": \"S-123\", \"type\": \"test-value\"}}}}";
Path zipPath = createZipFileWithContent("sigma.json", jsonContent);
when(snapshotClient.downloadFile(url)).thenReturn(zipPath);
// Act
this.snapshotService.initialize(remoteConsumer);
// Assert
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(contentIndexMock).executeBulk(bulkCaptor.capture());
IndexRequest request = (IndexRequest) bulkCaptor.getValue().requests().get(0);
String source = request.source().utf8ToString();
assertFalse("Should not contain sigma_id", source.contains("\"sigma_id\""));
assertTrue("Should contain id with value S-123", source.contains("\"id\":\"S-123\""));
}
/**
* Tests that files without 'payload', 'type', or 'document' are skipped.
*/
public void testInitialize_InvalidJsonStructure() throws IOException, URISyntaxException {
// Mock
String url = "http://example.com/invalid.zip";
when(remoteConsumer.getSnapshotLink()).thenReturn(url);
String jsonContent =
"{}\n" + // Missing payload
"{\"payload\": {}}\n" + // Missing type
"{\"payload\": {\"type\": \"valid\", \"no_doc\": {}}}"; // Missing document
Path zipPath = createZipFileWithContent("invalid.json", jsonContent);
when(snapshotClient.downloadFile(url)).thenReturn(zipPath);
// Act
this.snapshotService.initialize(remoteConsumer);
// Assert
verify(contentIndexMock, never()).executeBulk(any(BulkRequest.class));
}
/**
* Tests preprocessing with related array, objects inside array should also be sanitized.
*/
public void testInitialize_PreprocessSigmaIdInArray() throws IOException, URISyntaxException {
// Mock
String url = "http://example.com/sigma_array.zip";
when(remoteConsumer.getSnapshotLink()).thenReturn(url);
String jsonContent = "{\"payload\": {\"type\": \"rule\", \"document\": {\"id\": \"R2\", \"related\": [{\"sigma_id\": \"999\"}]}}}";
Path zipPath = createZipFileWithContent("sigma_array.json", jsonContent);
when(snapshotClient.downloadFile(url)).thenReturn(zipPath);
// Act
this.snapshotService.initialize(remoteConsumer);
// Assert
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(contentIndexMock).executeBulk(bulkCaptor.capture());
String source = ((IndexRequest) bulkCaptor.getValue().requests().getFirst()).source().utf8ToString();
assertFalse("Should not contain sigma_id", source.contains("\"sigma_id\""));
assertTrue("Should contain id with value 999", source.contains("999"));
}
/**
* Tests that if a file contains a mix of valid JSON and corrupt lines (parsing errors),
* the service logs the error, skips the bad line, and continues indexing the valid ones.
*/
public void testInitialize_SkipInvalidJson() throws IOException, URISyntaxException {
// Mock
String url = "http://example.com/corrupt.zip";
when(remoteConsumer.getSnapshotLink()).thenReturn(url);
String jsonContent =
"{\"payload\": {\"type\": \"reputation\", \"document\": {\"id\": \"1\", \"ip\": \"1.1.1.1\"}}}\n" +
"THIS_IS_NOT_JSON_{{}}\n" +
"{\"payload\": {\"type\": \"reputation\", \"document\": {\"id\": \"2\", \"ip\": \"2.2.2.2\"}}}";
Path zipPath = createZipFileWithContent("mixed.json", jsonContent);
when(snapshotClient.downloadFile(url)).thenReturn(zipPath);
// Act
this.snapshotService.initialize(remoteConsumer);
// Assert
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(contentIndexMock, atLeastOnce()).executeBulk(bulkCaptor.capture());
// We expect exactly 2 valid actions (Line 1 and Line 3), skipping Line 2
int totalActions = bulkCaptor.getAllValues().stream()
.mapToInt(BulkRequest::numberOfActions)
.sum();
assertEquals("Should index the 2 valid documents and skip the corrupt one", 2, totalActions);
}
/**
* Tests that the generated YAML for decoders strictly respects the order defined in DECODER_ORDER_KEYS.
*/
public void testInitialize_DecoderYamlKeyOrdering() throws IOException, URISyntaxException {
// Mock
String url = "http://example.com/decoder_order.zip";
when(remoteConsumer.getSnapshotLink()).thenReturn(url);
String jsonContent = "{\"payload\": {\"type\": \"decoder\", \"document\": " +
"{\"check\": \"some_regex\", \"name\": \"ssh-decoder\", \"parents\": [\"root\"]}}}";
Path zipPath = createZipFileWithContent("decoder_order.json", jsonContent);
when(snapshotClient.downloadFile(url)).thenReturn(zipPath);
// Act
this.snapshotService.initialize(remoteConsumer);
// Assert
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(contentIndexMock).executeBulk(bulkCaptor.capture());
IndexRequest request = (IndexRequest) bulkCaptor.getValue().requests().getFirst();
String source = request.source().utf8ToString();
String yamlContent = new ObjectMapper().readTree(source).path("decoder").asText();
assertTrue("YAML content should contain 'name'", yamlContent.contains("name"));
assertTrue("Field 'name' should appear before 'parents'",
yamlContent.indexOf("name") < yamlContent.indexOf("parents"));
assertTrue("Field 'parents' should appear before 'check'",
yamlContent.indexOf("parents") < yamlContent.indexOf("check"));
}
/**
* Helper to create a temporary ZIP file containing a single file with specific content.
*/
private Path createZipFileWithContent(String fileName, String content) throws IOException {
Path zipPath = this.tempDir.resolve("test_" + System.nanoTime() + ".zip");
try (ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(zipPath))) {
ZipEntry entry = new ZipEntry(fileName);
zos.putNextEntry(entry);
zos.write(content.getBytes(StandardCharsets.UTF_8));
zos.closeEntry();
}
return zipPath;
}
}

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.utils;
package com.wazuh.contentmanager.cti.catalog.utils;
import com.google.gson.JsonObject;
import org.opensearch.test.OpenSearchIntegTestCase;

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.utils;
package com.wazuh.contentmanager.cti.catalog.utils;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
@ -29,8 +29,6 @@ import java.nio.file.Path;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import static org.mockito.Mockito.*;
/** Class to handle unzip tests */
public class UnzipTests extends OpenSearchTestCase {
private Path tempDestinationDirectory;

View File

@ -1,24 +0,0 @@
/*
* Copyright (C) 2024, Wazuh Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.updater;
import org.opensearch.test.OpenSearchTestCase;
/** Class for the Content Manager tests */
public class ContentManagerTests extends OpenSearchTestCase {
// Add unit tests for your plugin
}

View File

@ -1,153 +0,0 @@
/*
* Copyright (C) 2024, Wazuh Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.utils;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Path;
import java.util.Iterator;
import com.wazuh.contentmanager.client.CTIClient;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
import com.wazuh.contentmanager.settings.PluginSettings;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import static org.mockito.Mockito.*;
/** Class to handle unzip tests */
public class SnapshotManagerTests extends OpenSearchTestCase {
private ConsumersIndex consumersIndex;
private ContentIndex contentIndex;
private CTIClient ctiClient;
private Privileged privilegedSpy;
private ConsumerInfo consumerInfo;
private SnapshotManager snapshotManager;
@Mock private Environment mockEnvironment;
@Mock private ClusterService mockClusterService;
@InjectMocks private PluginSettings pluginSettings;
@Before
public void setUp() throws Exception {
super.setUp();
Path envDir = createTempDir();
Settings settings =
Settings.builder()
.put("path.home", envDir.toString()) // Required by OpenSearch
.putList("path.repo", envDir.toString())
.put("content_manager.max_changes", 1000)
.build();
this.mockEnvironment = spy(new Environment(settings, envDir));
when(this.mockEnvironment.settings()).thenReturn(settings);
this.pluginSettings =
PluginSettings.getInstance(this.mockEnvironment.settings(), this.mockClusterService);
this.ctiClient = mock(CTIClient.class);
this.consumersIndex = mock(ConsumersIndex.class);
this.contentIndex = mock(ContentIndex.class);
this.privilegedSpy = Mockito.spy(Privileged.class);
this.snapshotManager =
Mockito.spy(
new SnapshotManager(
this.ctiClient,
this.mockEnvironment,
this.consumersIndex,
this.contentIndex,
this.privilegedSpy,
this.pluginSettings));
this.consumerInfo = mock(ConsumerInfo.class);
}
/**
* Test that updating the context index works
*
* @throws IOException Rethrown from updateContextIndex()
*/
public void testSuccessfulConsumerIndexing() throws IOException {
// Fixtures
ConsumerInfo consumerInfo =
new ConsumerInfo("test-name", "test-context", 1L, 1L, "http://example.com");
IndexResponse response = mock(IndexResponse.class, "SuccessfulResponse");
// Mocks
doReturn(consumerInfo).when(this.ctiClient).getConsumerInfo();
doReturn(this.consumerInfo).when(this.consumersIndex).get(anyString(), anyString());
doReturn(true).when(this.consumersIndex).index(consumerInfo);
doReturn(DocWriteResponse.Result.CREATED).when(response).getResult();
// Act &6 Assert
this.snapshotManager.initConsumer();
verify(this.consumersIndex).index(any(ConsumerInfo.class));
}
/**
* Ensure IOException is thrown when updateContextIndex() fails
*
* @throws IOException error parsing CTI response.
*/
public void testFailedConsumerIndexing() throws IOException {
// Fixtures
ConsumerInfo consumerInfo =
new ConsumerInfo("test-name", "test-context", 1L, 1L, "http://example.com");
IndexResponse response = mock(IndexResponse.class, "FailedResponse");
// Mocks
doReturn(consumerInfo).when(this.ctiClient).getConsumerInfo();
doReturn(this.consumerInfo).when(this.consumersIndex).get(anyString(), anyString());
doReturn(false).when(this.consumersIndex).index(consumerInfo);
doReturn(DocWriteResponse.Result.NOT_FOUND).when(response).getResult();
// Act && Assert
assertThrows(IOException.class, () -> this.snapshotManager.initConsumer());
}
/**
* Check that the fromSnapshot() method is being executed
*
* @throws IOException rethrown from unzip()
*/
public void testSuccessfulIndexSnapshot() throws IOException {
doReturn(this.consumerInfo).when(this.consumersIndex).get(anyString(), anyString());
Path snapshotZip = mock(Path.class);
doReturn("http://example.com/file.zip").when(this.consumerInfo).getLastSnapshotLink();
doReturn(snapshotZip).when(this.ctiClient).download(anyString(), any(Environment.class));
Path outputDir = mock(Path.class);
doReturn(outputDir).when(this.mockEnvironment).tmpDir();
DirectoryStream<Path> stream = mock(DirectoryStream.class);
Path jsonPath = mock(Path.class);
Iterator<Path> iterator = mock(Iterator.class);
doReturn(iterator).when(stream).iterator();
doReturn(jsonPath).when(iterator).next();
doReturn(stream).when(this.snapshotManager).getStream(any(Path.class));
doNothing().when(this.snapshotManager).unzip(any(Path.class), any(Path.class));
this.snapshotManager.indexSnapshot(this.consumerInfo);
verify(this.contentIndex).fromSnapshot(anyString());
}
}