Add hash of hashes calculation

Jorge Sanchez 2025-12-09 11:57:43 +01:00 committed by Alex Ruiz
parent 807ef9d124
commit ac69e50eba
3 changed files with 217 additions and 3 deletions

View File

@@ -4,5 +4,5 @@ package com.wazuh.contentmanager.cti.catalog.model;
 * Enum class that describes all the possible spaces that the content manager will manage
 */
public enum Space {
-    FREE, PAID, CUSTOM
+    FREE, PAID, CUSTOM, DRAFT, TESTING
}
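
For context on how these constants are consumed: the sync job in CatalogSyncJob compares each policy's space.name (stored lowercase) against DRAFT and TESTING to decide whether to skip hash calculation. A minimal standalone sketch of that check; the helper name and sample values are illustrative and not part of this change:

import java.util.Locale;

public class SpaceCheckSketch {
    enum Space { FREE, PAID, CUSTOM, DRAFT, TESTING }

    // True when a policy's space should be excluded from hash calculation
    // (mirrors the DRAFT/TESTING comparison in CatalogSyncJob).
    static boolean skipsHashCalculation(String spaceName) {
        return Space.DRAFT.toString().toLowerCase(Locale.ROOT).equals(spaceName)
            || Space.TESTING.toString().toLowerCase(Locale.ROOT).equals(spaceName);
    }

    public static void main(String[] args) {
        System.out.println(skipsHashCalculation("draft")); // true  -> skipped
        System.out.println(skipsHashCalculation("paid"));  // false -> hashed
    }
}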

View File

@@ -5,6 +5,7 @@ import com.wazuh.contentmanager.cti.catalog.index.ConsumersIndex;
import com.wazuh.contentmanager.cti.catalog.index.ContentIndex;
import com.wazuh.contentmanager.cti.catalog.model.LocalConsumer;
import com.wazuh.contentmanager.cti.catalog.model.RemoteConsumer;
+import com.wazuh.contentmanager.cti.catalog.model.Space;
import com.wazuh.contentmanager.cti.catalog.service.ConsumerService;
import com.wazuh.contentmanager.cti.catalog.service.ConsumerServiceImpl;
import com.wazuh.contentmanager.cti.catalog.service.SnapshotServiceImpl;
@@ -13,11 +14,21 @@ import com.wazuh.contentmanager.jobscheduler.JobExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.env.Environment;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.search.SearchHit;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -176,7 +187,29 @@ public class CatalogSyncJob implements JobExecutor {
        aliases.put("integration", ".cti-integration-decoders");
        aliases.put("policy", ".cti-policies");
-        this.syncConsumerServices(context, consumer, mappings, aliases);
+        boolean updated = this.syncConsumerServices(context, consumer, mappings, aliases);

        // Only calculate hashes if there was an update
        if (updated) {
            log.info("Changes detected in Decoders Consumer. Refreshing indices and calculating hashes...");
            // Refresh so the searches and gets below see the documents just written by the sync
            try {
                this.client.admin().indices().prepareRefresh(
                    this.getIndexName(context, consumer, "decoder"),
                    this.getIndexName(context, consumer, "kvdb"),
                    this.getIndexName(context, consumer, "integration"),
                    this.getIndexName(context, consumer, "policy")
                ).get();
            } catch (Exception e) {
                log.warn("Error refreshing indices before hash calculation: {}", e.getMessage());
            }
            // Calculate and update hash of hashes
            this.calculateAndStorePolicyHashes(context, consumer);
        } else {
            log.info("No changes in Decoders Consumer. Skipping hash calculation.");
        }
        log.info("Decoders Consumer correctly synchronized.");
    }
@@ -212,8 +245,9 @@ public class CatalogSyncJob implements JobExecutor {
     * @param consumer The specific consumer identifier.
     * @param mappings A map associating content types to their JSON mapping file paths.
     * @param aliases A map associating content types to their OpenSearch alias names.
+     * @return true if an update or initialization occurred, false otherwise.
     */
-    private void syncConsumerServices(String context, String consumer, Map<String, String> mappings, Map<String, String> aliases) {
+    private boolean syncConsumerServices(String context, String consumer, Map<String, String> mappings, Map<String, String> aliases) {
        ConsumerService consumerService = new ConsumerServiceImpl(context, consumer, this.consumersIndex);
        LocalConsumer localConsumer = consumerService.getLocalConsumer();
        RemoteConsumer remoteConsumer = consumerService.getRemoteConsumer();
@@ -253,6 +287,7 @@ public class CatalogSyncJob implements JobExecutor {
                this.environment
            );
            snapshotService.initialize(remoteConsumer);
+            return true;
        } else if (remoteConsumer != null && localConsumer.getLocalOffset() != remoteConsumer.getOffset()) {
            log.info("Starting offset-based update for consumer [{}]", consumer);
            UpdateServiceImpl updateService = new UpdateServiceImpl(
@@ -264,6 +299,177 @@ public class CatalogSyncJob implements JobExecutor {
            );
            updateService.update(localConsumer.getLocalOffset(), remoteConsumer.getOffset());
            updateService.close();
+            return true;
        }
+        return false;
    }

    /**
     * Calculates the aggregate hash (hash of hashes) for each policy and updates the policy documents.
     *
     * @param context The context identifier.
     * @param consumer The specific consumer identifier.
     */
    private void calculateAndStorePolicyHashes(String context, String consumer) {
        try {
            String policyIndex = this.getIndexName(context, consumer, "policy");
            String integrationIndex = this.getIndexName(context, consumer, "integration");
            String decoderIndex = this.getIndexName(context, consumer, "decoder");
            String kvdbIndex = this.getIndexName(context, consumer, "kvdb");
            String ruleIndex = this.getIndexName(context, consumer, "rule");

            // Verify policy index exists
            if (!this.client.admin().indices().prepareExists(policyIndex).get().isExists()) {
                log.warn("Policy index [{}] does not exist. Skipping hash calculation.", policyIndex);
                return;
            }

            // Fetch all policies
            SearchRequest searchRequest = new SearchRequest(policyIndex);
            searchRequest.source().query(QueryBuilders.matchAllQuery()).size(5); // One policy for each space
            SearchResponse response = this.client.search(searchRequest).actionGet();

            BulkRequest bulkUpdateRequest = new BulkRequest();
            for (SearchHit hit : response.getHits().getHits()) {
                Map<String, Object> source = hit.getSourceAsMap();

                Map<String, Object> currentSpace = (Map<String, Object>) source.get("space");
                if (currentSpace != null) {
                    String spaceName = (String) currentSpace.get("name");
                    if (Space.DRAFT.toString().toLowerCase(Locale.ROOT).equals(spaceName)
                            || Space.TESTING.toString().toLowerCase(Locale.ROOT).equals(spaceName)) {
                        log.info("Skipping hash calculation for policy [{}] because it is in space [{}]", hit.getId(), spaceName);
                        continue;
                    }
                }

                List<String> accumulatedHashes = new ArrayList<>();
                // 1. Policy Hash
                accumulatedHashes.add(this.extractHash(source));

                Map<String, Object> document = (Map<String, Object>) source.get("document");
                if (document != null && document.containsKey("integrations")) {
                    List<String> integrationIds = (List<String>) document.get("integrations");
                    for (String integrationId : integrationIds) {
                        Map<String, Object> integrationSource = this.getDocumentSource(integrationIndex, integrationId);
                        if (integrationSource == null) continue;
                        // 2. Integration Hash
                        accumulatedHashes.add(this.extractHash(integrationSource));

                        Map<String, Object> intDoc = (Map<String, Object>) integrationSource.get("document");
                        if (intDoc != null) {
                            // 3. Decoders Hash
                            if (intDoc.containsKey("decoders")) {
                                List<String> decoderIds = (List<String>) intDoc.get("decoders");
                                for (String id : decoderIds) {
                                    Map<String, Object> s = this.getDocumentSource(decoderIndex, id);
                                    if (s != null) accumulatedHashes.add(this.extractHash(s));
                                }
                            }
                            // 4. KVDBs Hash
                            if (intDoc.containsKey("kvdbs")) {
                                List<String> kvdbIds = (List<String>) intDoc.get("kvdbs");
                                for (String id : kvdbIds) {
                                    Map<String, Object> s = this.getDocumentSource(kvdbIndex, id);
                                    if (s != null) accumulatedHashes.add(this.extractHash(s));
                                }
                            }
                            // 5. Rules Hash
                            if (intDoc.containsKey("rules")) {
                                List<String> ruleIds = (List<String>) intDoc.get("rules");
                                for (String id : ruleIds) {
                                    Map<String, Object> s = this.getDocumentSource(ruleIndex, id);
                                    if (s != null) accumulatedHashes.add(this.extractHash(s));
                                }
                            }
                        }
                    }
                }

                // Calculate Final Hash
                String finalHash = this.calculateSha256(accumulatedHashes);

                // Prepare Update
                Map<String, Object> updateMap = new HashMap<>();
                Map<String, Object> spaceMap = (Map<String, Object>) source.getOrDefault("space", new HashMap<>());
                Map<String, Object> hashMap = (Map<String, Object>) spaceMap.getOrDefault("hash", new HashMap<>());
                hashMap.put("sha256", finalHash);
                spaceMap.put("hash", hashMap);
                updateMap.put("space", spaceMap);

                bulkUpdateRequest.add(new UpdateRequest(policyIndex, hit.getId()).doc(updateMap, XContentType.JSON));
            }

            if (bulkUpdateRequest.numberOfActions() > 0) {
                this.client.bulk(bulkUpdateRequest).actionGet();
                log.info("Updated policy hashes for consumer [{}]", consumer);
            }
        } catch (Exception e) {
            log.error("Error calculating policy hashes: {}", e.getMessage(), e);
        }
    }
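
    // Note (illustrative semantics, not part of the diff): UpdateRequest.doc() merges the
    // provided fields into the existing document, so the bulk update above rewrites only
    // the "space" branch of each policy; "document" and other top-level fields are untouched.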

    /**
     * Helper to get a document's source by ID from an index.
     * Returns null if the index or the document does not exist.
     */
    private Map<String, Object> getDocumentSource(String index, String id) {
        try {
            GetResponse response = this.client.prepareGet(index, id).get();
            if (response.isExists()) {
                return response.getSourceAsMap();
            } else {
                log.info("Document [{}] not found in index [{}]", id, index);
            }
        } catch (Exception e) {
            log.info("Error retrieving document [{}] from index [{}]: {}", id, index, e.getMessage());
        }
        return null;
    }

    /**
     * Helper to extract the sha256 hash from a document source.
     */
    private String extractHash(Map<String, Object> source) {
        if (source.containsKey("hash")) {
            Map<String, Object> hashObj = (Map<String, Object>) source.get("hash");
            return (String) hashObj.getOrDefault("sha256", "");
        }
        return "";
    }

    /**
     * Computes the SHA-256 hash of the concatenation of a list of strings.
     */
    private String calculateSha256(List<String> inputs) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            StringBuilder combined = new StringBuilder();
            for (String s : inputs) {
                if (s != null) combined.append(s);
            }
            byte[] encodedHash = digest.digest(combined.toString().getBytes(StandardCharsets.UTF_8));
            StringBuilder hexString = new StringBuilder(2 * encodedHash.length);
            for (byte b : encodedHash) {
                String hex = Integer.toHexString(0xff & b);
                if (hex.length() == 1) {
                    hexString.append('0');
                }
                hexString.append(hex);
            }
            return hexString.toString();
        } catch (Exception e) {
            log.error("Error calculating SHA-256", e);
            return "";
        }
    }
}
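
To make the aggregation concrete: the final value is a single SHA-256 over the plain concatenation of the collected per-document sha256 strings, in traversal order (policy first, then each integration followed by its decoders, KVDBs and rules), which also makes the result order-sensitive. A standalone sketch with made-up hash values, mirroring the steps of calculateSha256() above:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.List;

public class HashOfHashesSketch {
    public static void main(String[] args) throws Exception {
        // Made-up per-document hashes, in traversal order.
        List<String> hashes = List.of("aaa111", "bbb222", "ccc333");

        // Concatenate, digest, hex-encode: the same steps as calculateSha256().
        StringBuilder combined = new StringBuilder();
        for (String h : hashes) combined.append(h);
        byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(combined.toString().getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder(2 * digest.length);
        for (byte b : digest) hex.append(String.format("%02x", b));
        System.out.println(hex); // 64-character lowercase hex string
    }
}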

View File

@@ -45,6 +45,14 @@
        "properties": {
          "name": {
            "type": "keyword"
-          }
+          },
+          "hash": {
+            "type": "object",
+            "properties": {
+              "sha256": {
+                "type": "keyword"
+              }
+            }
+          }
        }
      }
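
With this mapping addition, each non-draft, non-testing policy document carries its aggregate under space.hash.sha256 as a 64-character hex string. An illustrative document shape (values are made up, not real catalog data):

{
  "space": {
    "name": "free",
    "hash": {
      "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
    }
  }
}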