diff --git a/plugins/content-manager/src/main/java/com/wazuh/contentmanager/cti/catalog/model/Space.java b/plugins/content-manager/src/main/java/com/wazuh/contentmanager/cti/catalog/model/Space.java
index 29bca1c2..96c0b934 100644
--- a/plugins/content-manager/src/main/java/com/wazuh/contentmanager/cti/catalog/model/Space.java
+++ b/plugins/content-manager/src/main/java/com/wazuh/contentmanager/cti/catalog/model/Space.java
@@ -4,5 +4,5 @@ package com.wazuh.contentmanager.cti.catalog.model;
  * Enum class that describes all the possible spaces that the content manager will manage
  */
 public enum Space {
-    FREE, PAID, CUSTOM
+    FREE, PAID, CUSTOM, DRAFT, TESTING
 }
diff --git a/plugins/content-manager/src/main/java/com/wazuh/contentmanager/jobscheduler/jobs/CatalogSyncJob.java b/plugins/content-manager/src/main/java/com/wazuh/contentmanager/jobscheduler/jobs/CatalogSyncJob.java
index 404c488f..143e89c8 100644
--- a/plugins/content-manager/src/main/java/com/wazuh/contentmanager/jobscheduler/jobs/CatalogSyncJob.java
+++ b/plugins/content-manager/src/main/java/com/wazuh/contentmanager/jobscheduler/jobs/CatalogSyncJob.java
@@ -5,6 +5,7 @@ import com.wazuh.contentmanager.cti.catalog.index.ConsumersIndex;
 import com.wazuh.contentmanager.cti.catalog.index.ContentIndex;
 import com.wazuh.contentmanager.cti.catalog.model.LocalConsumer;
 import com.wazuh.contentmanager.cti.catalog.model.RemoteConsumer;
+import com.wazuh.contentmanager.cti.catalog.model.Space;
 import com.wazuh.contentmanager.cti.catalog.service.ConsumerService;
 import com.wazuh.contentmanager.cti.catalog.service.ConsumerServiceImpl;
 import com.wazuh.contentmanager.cti.catalog.service.SnapshotServiceImpl;
@@ -13,11 +14,21 @@ import com.wazuh.contentmanager.jobscheduler.JobExecutor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.opensearch.action.admin.indices.create.CreateIndexResponse;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.get.GetResponse;
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.action.update.UpdateRequest;
+import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.env.Environment;
+import org.opensearch.index.query.QueryBuilders;
 import org.opensearch.jobscheduler.spi.JobExecutionContext;
+import org.opensearch.search.SearchHit;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.client.Client;
 
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -176,7 +187,29 @@ public class CatalogSyncJob implements JobExecutor {
         aliases.put("integration", ".cti-integration-decoders");
         aliases.put("policy", ".cti-policies");
 
-        this.syncConsumerServices(context, consumer, mappings, aliases);
+        boolean updated = this.syncConsumerServices(context, consumer, mappings, aliases);
+
+        // Only calculate hashes if there was an update
+        if (updated) {
+            log.info("Changes detected in Decoders Consumer. Refreshing indices and calculating hashes...");
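+            // Refresh the indices so documents written by the snapshot or offset update are searchable before hashing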
+            try {
+                this.client.admin().indices().prepareRefresh(
+                    this.getIndexName(context, consumer, "decoder"),
+                    this.getIndexName(context, consumer, "kvdb"),
+                    this.getIndexName(context, consumer, "integration"),
+                    this.getIndexName(context, consumer, "policy")
+                ).get();
+            } catch (Exception e) {
+                log.warn("Error refreshing indices before hash calculation: {}", e.getMessage());
+            }
+
+            // Calculate and update hash of hashes
+            this.calculateAndStorePolicyHashes(context, consumer);
+        } else {
+            log.info("No changes in Decoders Consumer. Skipping hash calculation.");
+        }
+
         log.info("Decoders Consumer correctly synchronized.");
     }
@@ -212,8 +245,9 @@ public class CatalogSyncJob implements JobExecutor {
      * @param consumer The specific consumer identifier.
      * @param mappings A map associating content types to their JSON mapping file paths.
      * @param aliases A map associating content types to their OpenSearch alias names.
+     * @return true if an update or initialization occurred, false otherwise.
      */
-    private void syncConsumerServices(String context, String consumer, Map<String, String> mappings, Map<String, String> aliases) {
+    private boolean syncConsumerServices(String context, String consumer, Map<String, String> mappings, Map<String, String> aliases) {
         ConsumerService consumerService = new ConsumerServiceImpl(context, consumer, this.consumersIndex);
         LocalConsumer localConsumer = consumerService.getLocalConsumer();
         RemoteConsumer remoteConsumer = consumerService.getRemoteConsumer();
@@ -253,6 +287,7 @@ public class CatalogSyncJob implements JobExecutor {
                 this.environment
             );
             snapshotService.initialize(remoteConsumer);
+            return true;
         } else if (remoteConsumer != null && localConsumer.getLocalOffset() != remoteConsumer.getOffset()) {
             log.info("Starting offset-based update for consumer [{}]", consumer);
             UpdateServiceImpl updateService = new UpdateServiceImpl(
@@ -264,6 +299,177 @@ public class CatalogSyncJob implements JobExecutor {
                 this.environment
             );
             updateService.update(localConsumer.getLocalOffset(), remoteConsumer.getOffset());
             updateService.close();
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Calculates the aggregate hash (hash of hashes) and updates the policies.
+     */
+    private void calculateAndStorePolicyHashes(String context, String consumer) {
+        try {
+            String policyIndex = this.getIndexName(context, consumer, "policy");
+            String integrationIndex = this.getIndexName(context, consumer, "integration");
+            String decoderIndex = this.getIndexName(context, consumer, "decoder");
+            String kvdbIndex = this.getIndexName(context, consumer, "kvdb");
+            String ruleIndex = this.getIndexName(context, consumer, "rule");
+
+            // Verify policy index exists
+            if (!this.client.admin().indices().prepareExists(policyIndex).get().isExists()) {
+                log.warn("Policy index [{}] does not exist. Skipping hash calculation.", policyIndex);
+                return;
+            }
+
+            // Fetch all policies
+            SearchRequest searchRequest = new SearchRequest(policyIndex);
+            searchRequest.source().query(QueryBuilders.matchAllQuery()).size(5); // One policy for each space
+            SearchResponse response = this.client.search(searchRequest).actionGet();
+
+            BulkRequest bulkUpdateRequest = new BulkRequest();
+
+            for (SearchHit hit : response.getHits().getHits()) {
+                Map<String, Object> source = hit.getSourceAsMap();
+
+                Map<String, Object> currentSpace = (Map<String, Object>) source.get("space");
+                if (currentSpace != null) {
+                    String spaceName = (String) currentSpace.get("name");
+                    if (Space.DRAFT.toString().toLowerCase(Locale.ROOT).equals(spaceName) ||
+                            Space.TESTING.toString().toLowerCase(Locale.ROOT).equals(spaceName)) {
+                        log.info("Skipping hash calculation for policy [{}] because it is in space [{}]", hit.getId(), spaceName);
+                        continue;
+                    }
+                }
+
+                List<String> accumulatedHashes = new ArrayList<>();
+
+                // 1. Policy Hash
+                accumulatedHashes.add(this.extractHash(source));
+
+                Map<String, Object> document = (Map<String, Object>) source.get("document");
+                if (document != null && document.containsKey("integrations")) {
+                    List<String> integrationIds = (List<String>) document.get("integrations");
+
+                    for (String integrationId : integrationIds) {
+                        Map<String, Object> integrationSource = this.getDocumentSource(integrationIndex, integrationId);
+                        if (integrationSource == null) continue;
+
+                        // 2. Integration Hash
+                        accumulatedHashes.add(this.extractHash(integrationSource));
+
+                        Map<String, Object> intDoc = (Map<String, Object>) integrationSource.get("document");
+                        if (intDoc != null) {
+                            // 3. Decoders Hash
+                            if (intDoc.containsKey("decoders")) {
+                                List<String> decoderIds = (List<String>) intDoc.get("decoders");
+                                for (String id : decoderIds) {
+                                    Map<String, Object> s = this.getDocumentSource(decoderIndex, id);
+                                    if (s != null) accumulatedHashes.add(this.extractHash(s));
+                                }
+                            }
+
+                            // 4. KVDBs Hash
+                            if (intDoc.containsKey("kvdbs")) {
+                                List<String> kvdbIds = (List<String>) intDoc.get("kvdbs");
+                                for (String id : kvdbIds) {
+                                    Map<String, Object> s = this.getDocumentSource(kvdbIndex, id);
+                                    if (s != null) accumulatedHashes.add(this.extractHash(s));
+                                }
+                            }
+
+                            // 5. Rules Hash
+                            if (intDoc.containsKey("rules")) {
+                                List<String> ruleIds = (List<String>) intDoc.get("rules");
+                                for (String id : ruleIds) {
+                                    Map<String, Object> s = this.getDocumentSource(ruleIndex, id);
+                                    if (s != null) accumulatedHashes.add(this.extractHash(s));
+                                }
+                            }
+                        }
+                    }
+                }
+
+                // Calculate Final Hash
+                String finalHash = this.calculateSha256(accumulatedHashes);
+
+                // Prepare Update
+                Map<String, Object> updateMap = new HashMap<>();
+                Map<String, Object> spaceMap = (Map<String, Object>) source.getOrDefault("space", new HashMap<>());
+                Map<String, Object> hashMap = (Map<String, Object>) spaceMap.getOrDefault("hash", new HashMap<>());
+
+                hashMap.put("sha256", finalHash);
+                spaceMap.put("hash", hashMap);
+                updateMap.put("space", spaceMap);
+
+                bulkUpdateRequest.add(new UpdateRequest(policyIndex, hit.getId())
+                        .doc(updateMap, XContentType.JSON));
+            }
+
+            if (bulkUpdateRequest.numberOfActions() > 0) {
+                this.client.bulk(bulkUpdateRequest).actionGet();
+                log.info("Updated policy hashes for consumer [{}]", consumer);
+            }
+
+        } catch (Exception e) {
+            log.error("Error calculating policy hashes: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Helper to get a document source by ID from an index.
+     * Returns null if the index or document does not exist.
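+     *
+     * @param index The index to query.
+     * @param id The ID of the document to fetch.
+     * @return The document source as a map, or null if it could not be retrieved.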
+ */ + private Map getDocumentSource(String index, String id) { + try { + GetResponse response = this.client.prepareGet(index, id).get(); + if (response.isExists()) { + return response.getSourceAsMap(); + } + else{ + log.info("Document [{}] not found in index [{}]", id, index); + } + } catch (Exception e) { + log.info("Error retrieving document [{}] from index [{}]: {}", id, index, e.getMessage()); + } + return null; + } + + /** + * Helper to extract sha256 hash from document source. + */ + private String extractHash(Map source) { + if (source.containsKey("hash")) { + Map hashObj = (Map) source.get("hash"); + return (String) hashObj.getOrDefault("sha256", ""); + } + return ""; + } + + /** + * Computes SHA-256 hash of a list of strings concatenated. + */ + private String calculateSha256(List inputs) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + StringBuilder combined = new StringBuilder(); + for (String s : inputs) { + if (s != null) combined.append(s); + } + byte[] encodedhash = digest.digest(combined.toString().getBytes(StandardCharsets.UTF_8)); + + StringBuilder hexString = new StringBuilder(2 * encodedhash.length); + for (byte b : encodedhash) { + String hex = Integer.toHexString(0xff & b); + if (hex.length() == 1) { + hexString.append('0'); + } + hexString.append(hex); + } + return hexString.toString(); + } catch (Exception e) { + log.error("Error calculating SHA-256", e); + return ""; } } } diff --git a/plugins/content-manager/src/main/resources/mappings/cti-policies-mappings.json b/plugins/content-manager/src/main/resources/mappings/cti-policies-mappings.json index 8000d435..f6aefbba 100644 --- a/plugins/content-manager/src/main/resources/mappings/cti-policies-mappings.json +++ b/plugins/content-manager/src/main/resources/mappings/cti-policies-mappings.json @@ -45,6 +45,14 @@ "properties": { "name": { "type": "keyword" + }, + "hash": { + "type": "object", + "properties": { + "sha256": { + "type": "keyword" + } + } } } }