Space-wide content management (#684)

* Update mappings and services to generate hash and space.name

* Add .cti-policies index

* Update tests

* Add changelog entry

* Apply changes from code review

* Disable generation of .rules_development_0.0.1-rules_development_0.0.1_test-policy index

* Add missing 'this'

* Clean-up code

* Update changelog

* Clean-up code again

---------

Co-authored-by: Alex Ruiz <alejandro.ruiz.becerra@wazuh.com>
This commit is contained in:
Jorge Sánchez 2025-12-09 19:12:42 +01:00 committed by GitHub
parent 26c006f6bf
commit 807ef9d124
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 713 additions and 327 deletions

View File

@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add job scheduler basic logic [(#671)](https://github.com/wazuh/wazuh-indexer-plugins/pull/671)
- Init content from snapshot [(#670)](https://github.com/wazuh/wazuh-indexer-plugins/pull/670)
- Add scheduled content update [(#682)](https://github.com/wazuh/wazuh-indexer-plugins/pull/682)
- Add space-wide content management [(#684)](https://github.com/wazuh/wazuh-indexer-plugins/pull/684)
### Dependencies
- Upgrade to Gradle 8.14.3 [(#649)](https://github.com/wazuh/wazuh-indexer-plugins/pull/649)

View File

@ -1,6 +1,6 @@
package com.wazuh.contentmanager.cti.catalog.client;
import com.wazuh.contentmanager.client.HttpResponseCallback;
import com.wazuh.contentmanager.cti.catalog.utils.HttpResponseCallback;
import com.wazuh.contentmanager.settings.PluginSettings;
import org.apache.hc.client5.http.async.methods.*;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;

View File

@ -16,14 +16,12 @@
*/
package com.wazuh.contentmanager.cti.catalog.index;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.wazuh.contentmanager.cti.catalog.model.Decoder;
import com.wazuh.contentmanager.cti.catalog.model.Operation;
import com.wazuh.contentmanager.cti.catalog.model.Resource;
import com.wazuh.contentmanager.cti.catalog.utils.JsonPatch;
import com.wazuh.contentmanager.settings.PluginSettings;
import org.apache.logging.log4j.LogManager;
@ -54,19 +52,12 @@ import org.opensearch.transport.client.Client;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Manages operations for a specific Wazuh CTI Content Index.
* <p>
* This class handles the lifecycle of the index (creation, deletion) as well as
* CRUD operations for documents, including specialized logic for parsing
* and sanitizing CTI content payloads.
*/
public class ContentIndex {
private static final Logger log = LogManager.getLogger(ContentIndex.class);
@ -78,11 +69,9 @@ public class ContentIndex {
private final String alias;
private final ObjectMapper jsonMapper;
private final ObjectMapper yamlMapper;
private static final List<String> DECODER_ORDER_KEYS = Arrays.asList(
"name", "metadata", "parents", "definitions", "check",
"parse|event.original", "parse|message", "normalize"
);
private static final String JSON_TYPE_KEY = "type";
private static final String JSON_DECODER_KEY = "decoder";
/**
* Constructs a new ContentIndex manager.
@ -111,7 +100,14 @@ public class ContentIndex {
this.mappingsPath = mappingsPath;
this.alias = alias;
this.jsonMapper = new ObjectMapper();
this.yamlMapper = new ObjectMapper(new YAMLFactory());
}
/**
* Returns the name of the index managed by this instance.
* @return The index name.
*/
public String getIndexName() {
return this.indexName;
}
/**
@ -170,10 +166,10 @@ public class ContentIndex {
* @throws IOException If the indexing operation fails.
*/
public void create(String id, JsonObject payload) throws IOException {
this.processPayload(payload);
JsonObject processedPayload = this.processPayload(payload);
IndexRequest request = new IndexRequest(this.indexName)
.id(id)
.source(payload.toString(), XContentType.JSON);
.source(processedPayload.toString(), XContentType.JSON);
try {
this.client.index(request).get(this.pluginSettings.getClientTimeout(), TimeUnit.SECONDS);
@ -207,12 +203,12 @@ public class ContentIndex {
}
// 3. Process
this.processPayload(currentDoc);
JsonObject processedDoc = this.processPayload(currentDoc);
// 4. Index
IndexRequest request = new IndexRequest(this.indexName)
.id(id)
.source(currentDoc.toString(), XContentType.JSON);
.source(processedDoc.toString(), XContentType.JSON);
this.client.index(request).get(this.pluginSettings.getClientTimeout(), TimeUnit.SECONDS);
}
@ -277,89 +273,29 @@ public class ContentIndex {
}
/**
* Orchestrates the enrichment and sanitization of a payload.
* Orchestrates the enrichment and sanitization of a payload using Domain Models.
*
* @param payload The JSON payload to process.
* @return A new JsonObject containing the processed payload.
*/
private void processPayload(JsonObject payload) {
if (payload.has("type") && "decoder".equalsIgnoreCase(payload.get("type").getAsString())) {
this.enrichDecoderWithYaml(payload);
}
if (payload.has("document")) {
this.preprocessDocument(payload.getAsJsonObject("document"));
}
}
/**
* Generates a YAML representation for decoder documents.
*
* @param payload The payload containing the decoder definition.
*/
private void enrichDecoderWithYaml(JsonObject payload) {
public JsonObject processPayload(JsonObject payload) {
try {
if (!payload.has("document")) return;
JsonNode docNode = this.jsonMapper.readTree(payload.get("document").toString());
Resource resource;
if (docNode != null && docNode.isObject()) {
Map<String, Object> orderedDecoderMap = new LinkedHashMap<>();
for (String key : DECODER_ORDER_KEYS) {
if (docNode.has(key)) orderedDecoderMap.put(key, docNode.get(key));
}
Iterator<Map.Entry<String, JsonNode>> fields = docNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
if (!DECODER_ORDER_KEYS.contains(field.getKey())) {
orderedDecoderMap.put(field.getKey(), field.getValue());
}
}
payload.addProperty("decoder", this.yamlMapper.writeValueAsString(orderedDecoderMap));
// 1. Delegate parsing logic to the appropriate Model
if (payload.has(JSON_TYPE_KEY) && JSON_DECODER_KEY.equalsIgnoreCase(payload.get(JSON_TYPE_KEY).getAsString())) {
resource = Decoder.fromPayload(payload);
} else {
resource = Resource.fromPayload(payload);
}
// 2. Convert Model back to JsonObject for OpenSearch indexing
String jsonString = this.jsonMapper.writeValueAsString(resource);
return JsonParser.parseString(jsonString).getAsJsonObject();
} catch (IOException e) {
log.error("Failed to convert decoder payload to YAML: {}", e.getMessage(), e);
}
}
/**
* Sanitizes the document by removing internal or unnecessary fields.
* <p>
* This removes fields like 'date', 'enabled', and internal metadata, and
* normalizes 'related' objects.
*
* @param document The document object to preprocess.
*/
private void preprocessDocument(JsonObject document) {
if (document.has("metadata") && document.get("metadata").isJsonObject()) {
JsonObject metadata = document.getAsJsonObject("metadata");
if (metadata.has("custom_fields")) {
metadata.remove("custom_fields");
}
if (metadata.has("dataset")) {
metadata.remove("dataset");
}
}
if (document.has("related")) {
JsonElement relatedElement = document.get("related");
if (relatedElement.isJsonObject()) {
this.sanitizeRelatedObject(relatedElement.getAsJsonObject());
} else if (relatedElement.isJsonArray()) {
JsonArray relatedArray = relatedElement.getAsJsonArray();
for (JsonElement element : relatedArray) {
if (element.isJsonObject()) this.sanitizeRelatedObject(element.getAsJsonObject());
}
}
}
}
/**
* Normalizes a "related" object.
*
* @param relatedObj The related object to sanitize.
*/
private void sanitizeRelatedObject(JsonObject relatedObj) {
if (relatedObj.has("sigma_id")) {
relatedObj.add("id", relatedObj.get("sigma_id"));
relatedObj.remove("sigma_id");
log.error("Failed to process payload via models: {}", e.getMessage(), e);
return new JsonObject();
}
}
}

View File

@ -19,7 +19,7 @@ public abstract class AbstractConsumer {
* @return The context string.
*/
public String getContext() {
return context;
return this.context;
}
/**
@ -28,6 +28,6 @@ public abstract class AbstractConsumer {
* @return The name string.
*/
public String getName() {
return name;
return this.name;
}
}

View File

@ -0,0 +1,118 @@
package com.wazuh.contentmanager.cti.catalog.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.*;
/**
* Model representing a Decoder resource.
*/
public class Decoder extends Resource {
private static final Logger log = LogManager.getLogger(Decoder.class);
// Tools for YAML generation
private static final ObjectMapper jsonMapper = new ObjectMapper();
private static final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
private static final List<String> DECODER_ORDER_KEYS = Arrays.asList(
"name", "metadata", "parents", "definitions", "check",
"parse|event.original", "parse|message", "normalize"
);
@JsonProperty("decoder")
private String decoder;
/**
* Default constructor.
*/
public Decoder() {
super();
}
/**
* Factory method to create a {@link Decoder} instance from a raw JSON payload.
*
* @param payload The raw JSON object containing the decoder data.
* @return A populated {@link Decoder} object with the generated YAML string.
*/
public static Decoder fromPayload(JsonObject payload) {
Decoder decoder = new Decoder();
// 1. Basic logic for every resource
Resource.populateResource(decoder, payload);
// 2. Decoder-specific logic (YAML generation)
if (payload.has("document")) {
decoder.setDecoder(toYamlString(payload));
}
return decoder;
}
/**
* Generates a YAML representation for decoder documents.
*
* @param payload The source JSON object.
* @return A string containing the formatted YAML, or {@code null} if the "document" key is missing or an error occurs.
*/
private static String toYamlString(JsonObject payload) {
try {
if (!payload.has("document")) return null;
JsonNode docNode = jsonMapper.readTree(payload.get("document").toString());
if (docNode != null && docNode.isObject()) {
Map<String, Object> orderedDecoderMap = new LinkedHashMap<>();
// Add keys in order
for (String key : DECODER_ORDER_KEYS) {
if (docNode.has(key)) orderedDecoderMap.put(key, docNode.get(key));
}
// Add remaining keys
Iterator<Map.Entry<String, JsonNode>> fields = docNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
if (!DECODER_ORDER_KEYS.contains(field.getKey())) {
orderedDecoderMap.put(field.getKey(), field.getValue());
}
}
return yamlMapper.writeValueAsString(orderedDecoderMap);
}
} catch (IOException e) {
log.error("Failed to convert decoder payload to YAML: {}", e.getMessage(), e);
}
return null;
}
/**
* Gets the decoder YAML string.
*
* @return The decoder content in YAML format.
*/
public String getDecoder() {
return this.decoder;
}
/**
* Sets the decoder YAML string.
*
* @param decoder The decoder content in YAML format.
*/
public void setDecoder(String decoder) {
this.decoder = decoder;
}
@Override
public String toString() {
return "Decoder{" +
"decoder='" + this.decoder + '\'' +
", " + super.toString() +
'}';
}
}

View File

@ -69,7 +69,7 @@ public class LocalConsumer extends AbstractConsumer implements ToXContent {
* @return The sequence number of the last processed item.
*/
public long getLocalOffset() {
return localOffset;
return this.localOffset;
}
/**
@ -78,7 +78,7 @@ public class LocalConsumer extends AbstractConsumer implements ToXContent {
* @return The sequence number of the latest item available upstream.
*/
public long getRemoteOffset() {
return remoteOffset;
return this.remoteOffset;
}
/**
@ -87,17 +87,17 @@ public class LocalConsumer extends AbstractConsumer implements ToXContent {
* @return A string containing the URL, or empty if not set.
*/
public String getSnapshotLink() {
return snapshotLink;
return this.snapshotLink;
}
@Override
public String toString() {
return "LocalConsumer{" +
"localOffset=" + localOffset +
", remoteOffset=" + remoteOffset +
", snapshotLink='" + snapshotLink + '\'' +
", context='" + context + '\'' +
", name='" + name + '\'' +
"localOffset=" + this.localOffset +
", remoteOffset=" + this.remoteOffset +
", snapshotLink='" + this.snapshotLink + '\'' +
", context='" + this.context + '\'' +
", name='" + this.name + '\'' +
'}';
}

View File

@ -120,35 +120,35 @@ public class Offset implements ToXContentObject {
*
* @return The resource ID string.
*/
public String getResource() { return resource; }
public String getResource() { return this.resource; }
/**
* Gets the type of modification performed.
*
* @return The {@link Type} enum value (CREATE, UPDATE, DELETE).
*/
public Type getType() { return type; }
public Type getType() { return this.type; }
/**
* Gets the list of patch operations associated with this change.
*
* @return A list of {@link Operation} objects, or an empty list if none exist.
*/
public List<Operation> getOperations() { return operations; }
public List<Operation> getOperations() { return this.operations; }
/**
* Gets the sequential offset ID of this change event.
*
* @return The offset value as a long.
*/
public long getOffset() { return offset; }
public long getOffset() { return this.offset; }
/**
* Gets the full content payload of the resource.
*
* @return A Map representing the resource JSON, or null if not present.
*/
public Map<String, Object> getPayload() { return payload; }
public Map<String, Object> getPayload() { return this.payload; }
/**
* Serializes this object into an {@link XContentBuilder}.
@ -161,17 +161,27 @@ public class Offset implements ToXContentObject {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (context != null) builder.field(CONTEXT, context);
builder.field(OFFSET, offset);
if (resource != null) builder.field(RESOURCE, resource);
if (type != null) builder.field(TYPE, type);
builder.field(VERSION, version);
if (operations != null) {
if (this.context != null) {
builder.field(CONTEXT, this.context);
}
builder.field(OFFSET, this.offset);
if (this.resource != null) {
builder.field(RESOURCE, this.resource);
}
if (this.type != null) {
builder.field(TYPE, this.type);
}
builder.field(VERSION, this.version);
if (this.operations != null) {
builder.startArray(OPERATIONS);
for (Operation op : operations) op.toXContent(builder, params);
for (Operation op : this.operations) {
op.toXContent(builder, params);
}
builder.endArray();
}
if (payload != null) builder.field(PAYLOAD, payload);
if (this.payload != null) {
builder.field(PAYLOAD, this.payload);
}
return builder.endObject();
}
}

View File

@ -39,8 +39,6 @@ public class Operation implements ToXContentObject {
private final String from;
private final Object value;
private static final Logger log = LogManager.getLogger(Operation.class);
/**
* Constructs a new JSON Patch Operation.
*

View File

@ -28,7 +28,7 @@ public class RemoteConsumer extends AbstractConsumer {
* @return The offset value.
*/
public long getOffset() {
return offset;
return this.offset;
}
/**
@ -37,7 +37,7 @@ public class RemoteConsumer extends AbstractConsumer {
* @return The snapshot URL string.
*/
public String getSnapshotLink() {
return snapshotLink;
return this.snapshotLink;
}
/**
@ -48,10 +48,10 @@ public class RemoteConsumer extends AbstractConsumer {
@Override
public String toString() {
return "RemoteConsumer{" +
"offset=" + offset +
", snapshotLink='" + snapshotLink + '\'' +
", context='" + context + '\'' +
", name='" + name + '\'' +
"offset=" + this.offset +
", snapshotLink='" + this.snapshotLink + '\'' +
", context='" + this.context + '\'' +
", name='" + this.name + '\'' +
'}';
}
}

View File

@ -0,0 +1,223 @@
package com.wazuh.contentmanager.cti.catalog.model;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
/**
* Base model representing a generic catalog resource within the CTI context.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class Resource {
private static final Logger log = LogManager.getLogger(Resource.class);
private static final Gson GSON = new Gson();
// JSON Key Constants
private static final String JSON_DOCUMENT_KEY = "document";
private static final String JSON_METADATA_KEY = "metadata";
private static final String JSON_CUSTOM_FIELDS_KEY = "custom_fields";
private static final String JSON_DATASET_KEY = "dataset";
private static final String JSON_RELATED_KEY = "related";
private static final String JSON_SIGMA_ID_KEY = "sigma_id";
@JsonProperty("document")
private Map<String, Object> document;
@JsonProperty("hash")
private Map<String, String> hash;
@JsonProperty("space")
private Map<String, String> space;
/**
* Default constructor.
*/
public Resource() {
}
/**
* Factory method to create a {@link Resource} instance from a raw Gson {@link JsonObject}.
*
* @param payload The raw JSON object containing the resource data.
* @return A fully populated {@link Resource} instance.
*/
public static Resource fromPayload(JsonObject payload) {
Resource resource = new Resource();
Resource.populateResource(resource, payload);
return resource;
}
/**
* Populates the common fields of a Resource instance.
*
* @param resource The resource instance to populate.
* @param payload The source JSON payload.
*/
protected static void populateResource(Resource resource, JsonObject payload) {
// 1. Process Document
if (payload.has(JSON_DOCUMENT_KEY) && payload.get(JSON_DOCUMENT_KEY).isJsonObject()) {
JsonObject rawDoc = payload.getAsJsonObject(JSON_DOCUMENT_KEY).deepCopy();
Resource.preprocessDocument(rawDoc);
resource.setDocument(GSON.fromJson(rawDoc, Map.class));
// 2. Calculate Hash
String hashStr = Resource.calculateSha256(rawDoc);
if (hashStr != null) {
Map<String, String> hashMap = new HashMap<>();
hashMap.put("sha256", hashStr);
resource.setHash(hashMap);
}
}
// 3. Set Space
// TODO: Change To the real logic once CTI is ready
Map<String, String> spaceMap = new HashMap<>();
String spaceName = Space.FREE.toString().toLowerCase(Locale.ROOT);
spaceMap.put("name", spaceName);
resource.setSpace(spaceMap);
}
/**
* Sanitizes the document by removing internal or unnecessary fields.
*
* @param document The JSON object representing the document content.
*/
protected static void preprocessDocument(JsonObject document) {
if (document.has(JSON_METADATA_KEY) && document.get(JSON_METADATA_KEY).isJsonObject()) {
JsonObject metadata = document.getAsJsonObject(JSON_METADATA_KEY);
if (metadata.has(JSON_CUSTOM_FIELDS_KEY)) {
metadata.remove(JSON_CUSTOM_FIELDS_KEY);
}
if (metadata.has(JSON_DATASET_KEY)) {
metadata.remove(JSON_DATASET_KEY);
}
}
if (document.has(JSON_RELATED_KEY)) {
JsonElement relatedElement = document.get(JSON_RELATED_KEY);
if (relatedElement.isJsonObject()) {
Resource.sanitizeRelatedObject(relatedElement.getAsJsonObject());
} else if (relatedElement.isJsonArray()) {
JsonArray relatedArray = relatedElement.getAsJsonArray();
for (JsonElement element : relatedArray) {
if (element.isJsonObject()) {
Resource.sanitizeRelatedObject(element.getAsJsonObject());
}
}
}
}
}
/**
* Helper method to sanitize a single "related" object.
*
* @param relatedObj The JSON object inside the "related" field.
*/
private static void sanitizeRelatedObject(JsonObject relatedObj) {
if (relatedObj.has(JSON_SIGMA_ID_KEY)) {
relatedObj.add("id", relatedObj.get(JSON_SIGMA_ID_KEY));
relatedObj.remove(JSON_SIGMA_ID_KEY);
}
}
/**
* Calculates the SHA-256 checksum of a JSON Object.
*
* @param json The JSON object to hash.
* @return The Hexadecimal string representation of the SHA-256 hash, or {@code null} if calculation fails.
*/
protected static String calculateSha256(JsonObject json) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] encodedhash = digest.digest(json.toString().getBytes(StandardCharsets.UTF_8));
StringBuilder hexString = new StringBuilder(2 * encodedhash.length);
for (byte b : encodedhash) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex);
}
return hexString.toString();
} catch (Exception e) {
log.error("Failed to calculate SHA-256 hash", e);
return null;
}
}
/**
* Gets the document content.
*
* @return A Map representing the document.
*/
public Map<String, Object> getDocument() {
return this.document;
}
/**
* Sets the document content.
*
* @param document A Map representing the document.
*/
public void setDocument(Map<String, Object> document) {
this.document = document;
}
/**
* Gets the hash map containing checksums.
*
* @return A Map containing hash algorithms and values.
*/
public Map<String, String> getHash() {
return this.hash;
}
/**
* Sets the hash map.
*
* @param hash A Map containing hash algorithms and values.
*/
public void setHash(Map<String, String> hash) {
this.hash = hash;
}
/**
* Gets the space definition.
*
* @return A Map containing space details.
*/
public Map<String, String> getSpace() {
return this.space;
}
/**
* Sets the space definition.
*
* @param space A Map containing space details.
*/
public void setSpace(Map<String, String> space) {
this.space = space;
}
@Override
public String toString() {
return "Resource{" +
"document=" + this.document +
", hash=" + this.hash +
", space=" + this.space +
'}';
}
}

View File

@ -0,0 +1,8 @@
package com.wazuh.contentmanager.cti.catalog.model;
/**
* Enum class that describes all the possible spaces that the content manager will manage
*/
public enum Space {
FREE, PAID, CUSTOM
}

View File

@ -16,11 +16,6 @@
*/
package com.wazuh.contentmanager.cti.catalog.service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.wazuh.contentmanager.cti.catalog.client.SnapshotClient;
@ -50,15 +45,6 @@ import java.util.*;
public class SnapshotServiceImpl implements SnapshotService {
private static final Logger log = LogManager.getLogger(SnapshotServiceImpl.class);
// Mappers and Keys for YAML enrichment
private final ObjectMapper jsonMapper;
private final ObjectMapper yamlMapper;
private static final List<String> DECODER_ORDER_KEYS = Arrays.asList(
"name", "metadata", "parents", "definitions", "check",
"parse|event.original", "parse|message", "normalize"
);
// Keys to navigate the JSON structure
private static final String JSON_PAYLOAD_KEY = "payload";
private static final String JSON_TYPE_KEY = "type";
@ -84,8 +70,6 @@ public class SnapshotServiceImpl implements SnapshotService {
this.consumersIndex = consumersIndex;
this.environment = environment;
this.pluginSettings = PluginSettings.getInstance();
this.jsonMapper = new ObjectMapper();
this.yamlMapper = new ObjectMapper(new YAMLFactory());
this.snapshotClient = new SnapshotClient(this.environment);
}
@ -189,34 +173,33 @@ public class SnapshotServiceImpl implements SnapshotService {
}
String type = payload.get(JSON_TYPE_KEY).getAsString();
// Skip policy type documents
if ("policy".equalsIgnoreCase(type)) {
log.debug("Skipping document with type {}.", type);
// TODO: Delete once the consumer is changed
if (this.context.equals("rules_development_0.0.1") && this.consumer.equals("rules_development_0.0.1_test") && "policy".equals(type)) {
continue;
}
if ("decoder".equalsIgnoreCase(type)) {
this.enrichDecoderWithYaml(payload);
// 3. Delegate Processing to ContentIndex
// We use the first index instance to process the payload because logic is stateless/shared.
JsonObject processedPayload;
if (!this.contentIndex.isEmpty()) {
processedPayload = this.contentIndex.getFirst().processPayload(payload);
} else {
log.error("No ContentIndex available to process payload.");
return;
}
String indexName = this.getIndexName(type);
// 3. Extract the inner 'document' object for ID retrieval and Preprocessing
if (!payload.has(JSON_DOCUMENT_KEY)) {
log.warn("Payload missing '{}'. Skipping.", JSON_DOCUMENT_KEY);
continue;
}
JsonObject innerDocument = payload.getAsJsonObject(JSON_DOCUMENT_KEY);
// Preprocess the inner document
this.preprocessDocument(innerDocument);
// 4. Create Index Request
IndexRequest indexRequest = new IndexRequest(indexName)
.source(payload.toString(), XContentType.JSON);
.source(processedPayload.toString(), XContentType.JSON);
if (innerDocument.has(JSON_ID_KEY)) {
indexRequest.id(innerDocument.get(JSON_ID_KEY).getAsString());
// Determine ID
if (processedPayload.has(JSON_DOCUMENT_KEY)) {
JsonObject innerDocument = processedPayload.getAsJsonObject(JSON_DOCUMENT_KEY);
if (innerDocument.has(JSON_ID_KEY)) {
indexRequest.id(innerDocument.get(JSON_ID_KEY).getAsString());
}
}
bulkRequest.add(indexRequest);
@ -244,88 +227,10 @@ public class SnapshotServiceImpl implements SnapshotService {
}
}
/**
* Parses the decoder payload, reorders keys based on a predefined list,
* and adds a YAML representation of the decoder to the payload.
*
* @param payload The JSON object containing the decoder data.
*/
private void enrichDecoderWithYaml(JsonObject payload) {
try {
JsonNode docNode = this.jsonMapper.readTree(payload.toString()).get(JSON_DOCUMENT_KEY);
if (docNode != null && docNode.isObject()) {
Map<String, Object> orderedDecoderMap = new LinkedHashMap<>();
// Add JSON nodes in the expected order, if they exist.
for (String key : DECODER_ORDER_KEYS) {
if (docNode.has(key)) {
orderedDecoderMap.put(key, docNode.get(key));
}
}
// Add remaining JSON nodes.
Iterator<Map.Entry<String, JsonNode>> fields = docNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
if (!DECODER_ORDER_KEYS.contains(field.getKey())) {
orderedDecoderMap.put(field.getKey(), field.getValue());
}
}
// Add YAML representation to the document
String yamlContent = this.yamlMapper.writeValueAsString(orderedDecoderMap);
payload.addProperty("decoder", yamlContent);
}
} catch (IOException e) {
log.error("Failed to convert decoder payload to YAML: {}", e.getMessage(), e);
}
}
private String getIndexName(String type) {
return String.format(Locale.ROOT, ".%s-%s-%s", this.context, this.consumer, type);
}
/**
* Preprocesses the document to handle field transformations.
* Specifically, renames 'related.sigma_id' to 'related.id' to avoid StrictDynamicMappingException.
*
* @param document The document object to process.
*/
private void preprocessDocument(JsonObject document) {
if (!document.has("related")) {
return;
}
JsonElement relatedElement = document.get("related");
if (relatedElement.isJsonObject()) {
this.sanitizeRelatedObject(relatedElement.getAsJsonObject());
} else if (relatedElement.isJsonArray()) {
JsonArray relatedArray = relatedElement.getAsJsonArray();
for (JsonElement element : relatedArray) {
if (element.isJsonObject()) {
this.sanitizeRelatedObject(element.getAsJsonObject());
}
}
}
}
/**
* Helper method to perform the actual rename/delete logic on a specific related object.
*
* @param relatedObj The specific related object (either standalone or from an array).
*/
private void sanitizeRelatedObject(JsonObject relatedObj) {
if (relatedObj.has("sigma_id")) {
JsonElement sigmaIdValue = relatedObj.get("sigma_id");
// Move value to 'id'
relatedObj.add("id", sigmaIdValue);
// Remove the original 'sigma_id' field
relatedObj.remove("sigma_id");
}
}
/**
* Deletes temporary files and directories used during the process.
*/

View File

@ -77,7 +77,7 @@ public class UpdateServiceImpl extends AbstractService implements UpdateService
* Implementation details:
* 1. Fetches the changes JSON from the API for the given range.
* 2. Parses the response into {@link Changes} and {@link Offset} objects.
* 3. Iterates through offsets, skipping specific internal resources ("policy").
* 3. Iterates through offsets.
* 4. Delegates specific operations to {@link #applyOffset(Offset)}.
* 5. Updates the {@link LocalConsumer} record in the index with the last successfully applied offset.
*
@ -102,11 +102,6 @@ public class UpdateServiceImpl extends AbstractService implements UpdateService
long lastAppliedOffset = fromOffset;
for (Offset offset : changes.get()) {
if ("policy".equals(offset.getResource())) {
lastAppliedOffset = offset.getOffset();
continue;
}
this.applyOffset(offset);
lastAppliedOffset = offset.getOffset();
}
@ -147,6 +142,12 @@ public class UpdateServiceImpl extends AbstractService implements UpdateService
JsonObject payload = this.gson.toJsonTree(offset.getPayload()).getAsJsonObject();
if (payload.has("type")) {
String type = payload.get("type").getAsString();
// TODO: Delete once the consumer is changed
if (this.context.equals("rules_development_0.0.1") && this.consumer.equals("rules_development_0.0.1_test") && "policy".equals(type)) {
break;
}
index = this.indices.get(type);
if (index != null) {
index.create(id, payload);

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.client;
package com.wazuh.contentmanager.cti.catalog.utils;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
@ -50,18 +50,17 @@ public class HttpResponseCallback implements FutureCallback<SimpleHttpResponse>
@Override
public void completed(SimpleHttpResponse response) {
log.debug("{}->{}", httpRequest, new StatusLine(response));
log.debug("{}->{}", this.httpRequest, new StatusLine(response));
log.debug("Got response: {} {}", response.getCode(), response.getBodyText());
}
@Override
public void failed(Exception ex) {
log.error("{}->{}", httpRequest, ex);
// throw new HttpException(errorMessage, ex);
log.error("{}->{}", this.httpRequest, ex);
}
@Override
public void cancelled() {
log.debug("{} cancelled", httpRequest);
log.debug("{} cancelled", this.httpRequest);
}
}

View File

@ -1,7 +1,7 @@
package com.wazuh.contentmanager.cti.console.client;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.client.HttpResponseCallback;
import com.wazuh.contentmanager.cti.catalog.utils.HttpResponseCallback;
import org.apache.hc.client5.http.async.methods.*;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;

View File

@ -136,10 +136,15 @@ public class CatalogSyncJob implements JobExecutor {
mappings.put(
"integration", "/mappings/cti-rules-integrations-mappings.json"
);
// TODO: Delete once the consumer is changed
// mappings.put(
// "policy", "/mappings/cti-policies-mappings.json"
// );
Map<String, String> aliases = new HashMap<>();
aliases.put("rule", ".cti-rules");
aliases.put("integration", ".cti-integration-rules");
// aliases.put("policy", ".cti-policies");
this.syncConsumerServices(context, consumer, mappings, aliases);
log.info("Rules Consumer correctly synchronized.");
@ -161,11 +166,15 @@ public class CatalogSyncJob implements JobExecutor {
mappings.put(
"integration", "/mappings/cti-decoders-integrations-mappings.json"
);
mappings.put(
"policy", "/mappings/cti-policies-mappings.json"
);
Map<String, String> aliases = new HashMap<>();
aliases.put("decoder", ".cti-decoders");
aliases.put("kvdb", ".cti-kvdbs");
aliases.put("integration", ".cti-integration-decoders");
aliases.put("policy", ".cti-policies");
this.syncConsumerServices(context, consumer, mappings, aliases);
log.info("Decoders Consumer correctly synchronized.");

View File

@ -1,14 +1,14 @@
{
"dynamic": "true",
"properties": {
"type": {
"type": "keyword"
},
"document": {
"properties": {
"id": {
"type": "keyword"
},
"parent_decoder": {
"type": "keyword"
},
"title": {
"type": "keyword"
},
@ -40,6 +40,22 @@
"type": "keyword"
}
}
},
"hash": {
"type": "object",
"properties": {
"sha256": {
"type": "keyword"
}
}
},
"space": {
"type": "object",
"properties": {
"name": {
"type": "keyword"
}
}
}
}
}

View File

@ -11,15 +11,6 @@
}
],
"properties": {
"type": {
"type": "keyword"
},
"integration_id": {
"type": "keyword"
},
"decoder": {
"type": "keyword"
},
"document": {
"properties": {
"name": {
@ -94,6 +85,22 @@
"type": "keyword"
}
}
},
"hash": {
"type": "object",
"properties": {
"sha256": {
"type": "keyword"
}
}
},
"space": {
"type": "object",
"properties": {
"name": {
"type": "keyword"
}
}
}
}
}

View File

@ -1,12 +1,6 @@
{
"dynamic": "true",
"properties": {
"type": {
"type": "keyword"
},
"integration_id": {
"type": "keyword"
},
"document": {
"properties": {
"title": {
@ -32,6 +26,22 @@
"enabled": false
}
}
},
"hash": {
"type": "object",
"properties": {
"sha256": {
"type": "keyword"
}
}
},
"space": {
"type": "object",
"properties": {
"name": {
"type": "keyword"
}
}
}
}
}

View File

@ -0,0 +1,52 @@
{
"dynamic": "true",
"properties": {
"document": {
"properties": {
"root_decoder": {
"type": "keyword"
},
"date": {
"type": "date"
},
"modified": {
"type": "date"
},
"author": {
"type": "keyword"
},
"description": {
"type": "text"
},
"references": {
"type": "keyword"
},
"documentation": {
"type": "keyword"
},
"integrations": {
"type": "keyword"
},
"title": {
"type": "keyword"
}
}
},
"hash": {
"type": "object",
"properties": {
"sha256": {
"type": "keyword"
}
}
},
"space": {
"type": "object",
"properties": {
"name": {
"type": "keyword"
}
}
}
}
}

View File

@ -1,9 +1,6 @@
{
"dynamic": "true",
"properties": {
"type": {
"type": "keyword"
},
"document": {
"type": "object",
"properties": {
@ -35,6 +32,22 @@
"type": "keyword"
}
}
},
"hash": {
"type": "object",
"properties": {
"sha256": {
"type": "keyword"
}
}
},
"space": {
"type": "object",
"properties": {
"name": {
"type": "keyword"
}
}
}
}
}

View File

@ -13,12 +13,6 @@
}
],
"properties": {
"integration_id": {
"type": "keyword"
},
"type": {
"type": "keyword"
},
"document": {
"type": "object",
"properties": {
@ -114,6 +108,22 @@
"type": "keyword"
}
}
},
"hash": {
"type": "object",
"properties": {
"sha256": {
"type": "keyword"
}
}
},
"space": {
"type": "object",
"properties": {
"name": {
"type": "keyword"
}
}
}
}
}

View File

@ -147,6 +147,90 @@ public class ContentIndexTests extends OpenSearchTestCase {
assertTrue(yaml.contains("check: \"starts_with($event.original, \\\"8:syscheck:\\\")\""));
}
/**
* Test creating a Rule with Sigma ID.
* Validates that sigma_id is renamed to id in related object.
*/
public void testCreate_Rule_SigmaIdProcessing() {
// Mock
PlainActionFuture<IndexResponse> future = PlainActionFuture.newFuture();
future.onResponse(this.indexResponse);
when(this.client.index(any(IndexRequest.class))).thenReturn(future);
String jsonPayload = "{" +
"\"type\": \"rule\"," +
"\"document\": {" +
" \"id\": \"R1\"," +
" \"related\": {" +
" \"sigma_id\": \"S-123\"," +
" \"type\": \"test-value\"" +
" }" +
"}" +
"}";
JsonObject payload = JsonParser.parseString(jsonPayload).getAsJsonObject();
String id = "R1";
// Act
try {
this.contentIndex.create(id, payload);
} catch (Exception e) {
fail("Create should not throw exception: " + e.getMessage());
}
// Assert
ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);
verify(this.client).index(captor.capture());
JsonObject source = JsonParser.parseString(captor.getValue().source().utf8ToString()).getAsJsonObject();
JsonObject related = source.getAsJsonObject("document").getAsJsonObject("related");
assertFalse("Should not contain sigma_id", related.has("sigma_id"));
assertTrue("Should contain id", related.has("id"));
assertEquals("S-123", related.get("id").getAsString());
}
/**
* Test creating a Rule with Sigma ID in related array.
* Validates that sigma_id is renamed to id in related array objects.
*/
public void testCreate_Rule_SigmaIdArrayProcessing() {
// Mock
PlainActionFuture<IndexResponse> future = PlainActionFuture.newFuture();
future.onResponse(this.indexResponse);
when(this.client.index(any(IndexRequest.class))).thenReturn(future);
String jsonPayload = "{" +
"\"type\": \"rule\"," +
"\"document\": {" +
" \"id\": \"R2\"," +
" \"related\": [{" +
" \"sigma_id\": \"999\"" +
" }]" +
"}" +
"}";
JsonObject payload = JsonParser.parseString(jsonPayload).getAsJsonObject();
String id = "R2";
// Act
try {
this.contentIndex.create(id, payload);
} catch (Exception e) {
fail("Create should not throw exception: " + e.getMessage());
}
// Assert
ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);
verify(this.client).index(captor.capture());
JsonObject source = JsonParser.parseString(captor.getValue().source().utf8ToString()).getAsJsonObject();
JsonObject relatedItem = source.getAsJsonObject("document")
.getAsJsonArray("related").get(0).getAsJsonObject();
assertFalse("Should not contain sigma_id", relatedItem.has("sigma_id"));
assertTrue("Should contain id", relatedItem.has("id"));
assertEquals("999", relatedItem.get("id").getAsString());
}
/**
* Test updating a document.
* Simulates fetching an existing document, applying operations, and re-indexing.

View File

@ -16,7 +16,7 @@
*/
package com.wazuh.contentmanager.cti.catalog.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonObject;
import com.wazuh.contentmanager.cti.catalog.client.SnapshotClient;
import com.wazuh.contentmanager.cti.catalog.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.index.ContentIndex;
@ -83,6 +83,9 @@ public class SnapshotServiceImplTests extends OpenSearchTestCase {
String consumer = "test-consumer";
this.snapshotService = new SnapshotServiceImpl(context, consumer, contentIndices, this.consumersIndex, this.environment);
this.snapshotService.setSnapshotClient(this.snapshotClient);
when(this.contentIndexMock.processPayload(any(JsonObject.class)))
.thenAnswer(invocation -> invocation.getArgument(0));
}
@After
@ -143,6 +146,7 @@ public class SnapshotServiceImplTests extends OpenSearchTestCase {
// Assert
verify(this.contentIndexMock).clear();
verify(this.contentIndexMock).processPayload(any(JsonObject.class));
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(this.contentIndexMock, atLeastOnce()).executeBulk(bulkCaptor.capture());
@ -159,9 +163,9 @@ public class SnapshotServiceImplTests extends OpenSearchTestCase {
}
/**
* Tests that documents with type "policy" are skipped.
* Tests that documents with type "policy" are indexed correctly.
*/
public void testInitialize_SkipPolicyType() throws IOException, URISyntaxException {
public void testInitialize_IndexesPolicyType() throws IOException, URISyntaxException {
// Mock
String url = "http://example.com/policy.zip";
when(this.remoteConsumer.getSnapshotLink()).thenReturn(url);
@ -175,11 +179,18 @@ public class SnapshotServiceImplTests extends OpenSearchTestCase {
this.snapshotService.initialize(this.remoteConsumer);
// Assert
verify(this.contentIndexMock, never()).executeBulk(any(BulkRequest.class));
verify(this.contentIndexMock).processPayload(any(JsonObject.class));
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(this.contentIndexMock).executeBulk(bulkCaptor.capture());
IndexRequest request = (IndexRequest) bulkCaptor.getValue().requests().getFirst();
assertEquals(".test-context-test-consumer-policy", request.index());
assertEquals("p1", request.id());
}
/**
* Tests that type "decoder" documents are enriched with a YAML field.
* Tests that type "decoder" documents are delegated to ContentIndex for processing.
*/
public void testInitialize_EnrichDecoderWithYaml() throws IOException, URISyntaxException {
// Mock
@ -194,17 +205,13 @@ public class SnapshotServiceImplTests extends OpenSearchTestCase {
this.snapshotService.initialize(this.remoteConsumer);
// Assert
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(this.contentIndexMock).executeBulk(bulkCaptor.capture());
IndexRequest request = (IndexRequest) bulkCaptor.getValue().requests().getFirst();
String source = request.source().utf8ToString();
assertTrue("Should contain 'decoder' field", source.contains("\"decoder\":"));
// Verify delegation to ContentIndex.processPayload
verify(this.contentIndexMock).processPayload(any(JsonObject.class));
verify(this.contentIndexMock).executeBulk(any(BulkRequest.class));
}
/**
* Tests preprocessing: 'related.sigma_id' should be renamed to 'related.id'.
* Tests preprocessing: Verifies that payload processing is delegated.
*/
public void testInitialize_PreprocessSigmaId() throws IOException, URISyntaxException {
// Mock
@ -219,14 +226,8 @@ public class SnapshotServiceImplTests extends OpenSearchTestCase {
this.snapshotService.initialize(this.remoteConsumer);
// Assert
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(this.contentIndexMock).executeBulk(bulkCaptor.capture());
IndexRequest request = (IndexRequest) bulkCaptor.getValue().requests().getFirst();
String source = request.source().utf8ToString();
assertFalse("Should not contain sigma_id", source.contains("\"sigma_id\""));
assertTrue("Should contain id with value S-123", source.contains("\"id\":\"S-123\""));
verify(this.contentIndexMock).processPayload(any(JsonObject.class));
verify(this.contentIndexMock).executeBulk(any(BulkRequest.class));
}
/**
@ -238,9 +239,8 @@ public class SnapshotServiceImplTests extends OpenSearchTestCase {
when(this.remoteConsumer.getSnapshotLink()).thenReturn(url);
String jsonContent =
"{}\n" + // Missing payload
"{\"payload\": {}}\n" + // Missing type
"{\"payload\": {\"type\": \"valid\", \"no_doc\": {}}}"; // Missing document
"{}\n" +
"{\"payload\": {}}";
Path zipPath = createZipFileWithContent("invalid.json", jsonContent);
when(this.snapshotClient.downloadFile(url)).thenReturn(zipPath);
@ -253,7 +253,7 @@ public class SnapshotServiceImplTests extends OpenSearchTestCase {
}
/**
* Tests preprocessing with related array, objects inside array should also be sanitized.
* Tests preprocessing with related array: Verifies that payload processing is delegated.
*/
public void testInitialize_PreprocessSigmaIdInArray() throws IOException, URISyntaxException {
// Mock
@ -268,13 +268,8 @@ public class SnapshotServiceImplTests extends OpenSearchTestCase {
this.snapshotService.initialize(this.remoteConsumer);
// Assert
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(this.contentIndexMock).executeBulk(bulkCaptor.capture());
String source = ((IndexRequest) bulkCaptor.getValue().requests().getFirst()).source().utf8ToString();
assertFalse("Should not contain sigma_id", source.contains("\"sigma_id\""));
assertTrue("Should contain id with value 999", source.contains("999"));
verify(this.contentIndexMock).processPayload(any(JsonObject.class));
verify(this.contentIndexMock).executeBulk(any(BulkRequest.class));
}
/**
@ -298,6 +293,7 @@ public class SnapshotServiceImplTests extends OpenSearchTestCase {
this.snapshotService.initialize(this.remoteConsumer);
// Assert
verify(this.contentIndexMock, atLeastOnce()).processPayload(any(JsonObject.class));
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(this.contentIndexMock, atLeastOnce()).executeBulk(bulkCaptor.capture());
@ -310,9 +306,9 @@ public class SnapshotServiceImplTests extends OpenSearchTestCase {
}
/**
* Tests that the generated YAML for decoders strictly respects the order defined in DECODER_ORDER_KEYS.
* Tests delegation for decoder YAML processing.
*/
public void testInitialize_DecoderYamlKeyOrdering() throws IOException, URISyntaxException {
public void testInitialize_DecoderYamlDelegation() throws IOException, URISyntaxException {
// Mock
String url = "http://example.com/decoder_order.zip";
when(this.remoteConsumer.getSnapshotLink()).thenReturn(url);
@ -327,18 +323,8 @@ public class SnapshotServiceImplTests extends OpenSearchTestCase {
this.snapshotService.initialize(this.remoteConsumer);
// Assert
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(this.contentIndexMock).executeBulk(bulkCaptor.capture());
IndexRequest request = (IndexRequest) bulkCaptor.getValue().requests().getFirst();
String source = request.source().utf8ToString();
String yamlContent = new ObjectMapper().readTree(source).path("decoder").asText();
assertTrue("YAML content should contain 'name'", yamlContent.contains("name"));
assertTrue("Field 'name' should appear before 'parents'",
yamlContent.indexOf("name") < yamlContent.indexOf("parents"));
assertTrue("Field 'parents' should appear before 'check'",
yamlContent.indexOf("parents") < yamlContent.indexOf("check"));
verify(this.contentIndexMock).processPayload(any(JsonObject.class));
verify(this.contentIndexMock).executeBulk(any(BulkRequest.class));
}
/**