mirror of
https://github.com/wazuh/wazuh-indexer-plugins.git
synced 2025-12-11 02:29:20 -06:00
Implement retry mechanism (#541)
* Implement retry mechanism * Add changelog entry * Fix bug I was passing the index name as parameter to the method to create the index template * Add configurable backoff * Add default constructor To silence the JavaDocs warning * Register settings * Add back handling of ResourceAlreadyExistsException exceptions
This commit is contained in:
parent
65922e6ace
commit
46fd782aaf
@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
- Add repository bumper [(#500)](https://github.com/wazuh/wazuh-indexer-plugins/pull/500)
|
||||
- Add documentation for the setup plugin [(#498)](https://github.com/wazuh/wazuh-indexer-plugins/pull/498)
|
||||
- Add documentation for default users and roles (RBAC) [(#535)](https://github.com/wazuh/wazuh-indexer-plugins/pull/535)
|
||||
- Implement retry mechanism to the initialization plugin [(#541)](https://github.com/wazuh/wazuh-indexer-plugins/pull/541)
|
||||
|
||||
### Dependencies
|
||||
-
|
||||
|
||||
@ -1,5 +1,32 @@
|
||||
# Configuration Files
|
||||
|
||||
## Initialization plugin settings
|
||||
|
||||
#### Timeout for the OpenSearch client
|
||||
- **Key**: `plugins.setup.timeout`
|
||||
- **Type**: Integer
|
||||
- **Default**: `30`
|
||||
- **Minimum**: `5`
|
||||
- **Maximum**: `120`
|
||||
- **Description**: Timeout in seconds for index and search operations.
|
||||
|
||||
#### Backoff (delay) for the retry mechanism
|
||||
- **Key**: `plugins.setup.backoff`
|
||||
- **Type**: Integer
|
||||
- **Default**: `15`
|
||||
- **Minimum**: `5`
|
||||
- **Maximum**: `60`
|
||||
- **Description**: Delay in seconds for the retry mechanism involving initialization tasks.
|
||||
|
||||
### Example
|
||||
|
||||
Below, there is an example of custom values for these settings within the `opensearch.yml` file:
|
||||
|
||||
```yaml
|
||||
plugins.setup.timeout: 60
|
||||
plugins.setup.backoff: 30
|
||||
```
|
||||
|
||||
## Security - Access Control
|
||||
|
||||
Wazuh Indexer uses the [OpenSearch Security plugin](https://docs.opensearch.org/docs/latest/security/) to manage access control and security features.
|
||||
|
||||
@ -1 +0,0 @@
|
||||
# Configuration files
|
||||
@ -8,6 +8,12 @@ The `SetupPlugin` class holds the list of indices to create. The logic for the c
|
||||
|
||||
By design, the plugin will overwrite any existing index template under the same name.
|
||||
|
||||
### Retry mechanism
|
||||
|
||||
The plugin features a retry mechanism to handle transient faults. In case of a temporal failure (timeouts or similar) during the initialization of the indices, the task is retried after a given amount of time (backoff). If two consecutive faults occur during the initialization of the same index, the initialization process is halted, and the node is shut down. Proper logging is in place to notify administrators before the shutdown occurs.
|
||||
|
||||
The backoff time is configurable. Head to [Configuration Files](/ref/configuration/configuration-files.md#initialization-plugin-settings) for more information.
|
||||
|
||||
## Class diagram
|
||||
|
||||
```mermaid
|
||||
|
||||
@ -83,12 +83,10 @@ configurations {
|
||||
dependencies {
|
||||
implementation "org.apache.logging.log4j:log4j-slf4j-impl:2.23.1"
|
||||
implementation "org.slf4j:slf4j-api:1.7.36"
|
||||
//implementation "org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}"
|
||||
|
||||
// Job Scheduler stuff
|
||||
// Job Scheduler & ISM stuff (required for the initialization of ISM policies)
|
||||
zipArchive group: 'org.opensearch.plugin', name: 'opensearch-job-scheduler', version: opensearch_build
|
||||
zipArchive group: 'org.opensearch.plugin', name: 'opensearch-index-management', version: opensearch_build
|
||||
// implementation "org.opensearch:opensearch:${opensearch_version}"
|
||||
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
|
||||
|
||||
}
|
||||
|
||||
@ -19,7 +19,7 @@ package com.wazuh.setup;
|
||||
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.opensearch.cluster.node.DiscoveryNode;
|
||||
import org.opensearch.cluster.service.ClusterService;
|
||||
import org.opensearch.common.unit.TimeValue;
|
||||
import org.opensearch.common.settings.Setting;
|
||||
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
|
||||
import org.opensearch.core.xcontent.NamedXContentRegistry;
|
||||
import org.opensearch.env.Environment;
|
||||
@ -36,13 +36,13 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import com.wazuh.setup.index.Index;
|
||||
import com.wazuh.setup.index.IndexStateManagement;
|
||||
import com.wazuh.setup.index.StateIndex;
|
||||
import com.wazuh.setup.index.StreamIndex;
|
||||
import com.wazuh.setup.settings.PluginSettings;
|
||||
import com.wazuh.setup.utils.IndexUtils;
|
||||
|
||||
/**
|
||||
@ -51,7 +51,6 @@ import com.wazuh.setup.utils.IndexUtils;
|
||||
*/
|
||||
public class SetupPlugin extends Plugin implements ClusterPlugin {
|
||||
|
||||
public static final TimeValue TIMEOUT = new TimeValue(30L, TimeUnit.SECONDS);
|
||||
private final List<Index> indices = new ArrayList<>();
|
||||
|
||||
/** Default constructor */
|
||||
@ -116,4 +115,9 @@ public class SetupPlugin extends Plugin implements ClusterPlugin {
|
||||
this.indices.forEach(Index::initialize);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return List.of(PluginSettings.TIMEOUT, PluginSettings.BACKOFF);
|
||||
}
|
||||
}
|
||||
|
||||
@ -30,7 +30,7 @@ import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.wazuh.setup.SetupPlugin;
|
||||
import com.wazuh.setup.settings.PluginSettings;
|
||||
import com.wazuh.setup.utils.IndexUtils;
|
||||
|
||||
/**
|
||||
@ -51,6 +51,9 @@ public abstract class Index implements IndexInitializer {
|
||||
String index;
|
||||
String template;
|
||||
|
||||
boolean retry_index_creation;
|
||||
boolean retry_template_creation;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
@ -60,6 +63,9 @@ public abstract class Index implements IndexInitializer {
|
||||
Index(String index, String template) {
|
||||
this.index = index;
|
||||
this.template = template;
|
||||
|
||||
this.retry_index_creation = true;
|
||||
this.retry_template_creation = true;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -109,7 +115,11 @@ public abstract class Index implements IndexInitializer {
|
||||
if (!this.indexExists(index)) {
|
||||
CreateIndexRequest request = new CreateIndexRequest(index);
|
||||
CreateIndexResponse createIndexResponse =
|
||||
this.client.admin().indices().create(request).actionGet(SetupPlugin.TIMEOUT);
|
||||
this.client
|
||||
.admin()
|
||||
.indices()
|
||||
.create(request)
|
||||
.actionGet(PluginSettings.getTimeout(this.clusterService.getSettings()));
|
||||
log.info(
|
||||
"Index created successfully: {} {}",
|
||||
createIndexResponse.index(),
|
||||
@ -117,6 +127,18 @@ public abstract class Index implements IndexInitializer {
|
||||
}
|
||||
} catch (ResourceAlreadyExistsException e) {
|
||||
log.info("Index {} already exists. Skipping.", index);
|
||||
} catch (
|
||||
Exception
|
||||
e) { // TimeoutException may be raised by actionGet(), but we cannot catch that one.
|
||||
// Exit condition. Re-attempt to create the index also failed. Original exception is rethrown.
|
||||
if (!this.retry_index_creation) {
|
||||
log.error("Initialization of index [{}] finally failed. The node will shut down.", index);
|
||||
throw e;
|
||||
}
|
||||
log.warn("Operation to create the index [{}] timed out. Retrying...", index);
|
||||
this.retry_index_creation = false;
|
||||
this.indexUtils.sleep(PluginSettings.getBackoff(this.clusterService.getSettings()));
|
||||
this.createIndex(index);
|
||||
}
|
||||
}
|
||||
|
||||
@ -141,7 +163,7 @@ public abstract class Index implements IndexInitializer {
|
||||
.admin()
|
||||
.indices()
|
||||
.putTemplate(putIndexTemplateRequest)
|
||||
.actionGet(SetupPlugin.TIMEOUT);
|
||||
.actionGet(PluginSettings.getTimeout(this.clusterService.getSettings()));
|
||||
|
||||
log.info(
|
||||
"Index template created successfully: {} {}",
|
||||
@ -152,6 +174,21 @@ public abstract class Index implements IndexInitializer {
|
||||
log.error("Error reading index template from filesystem {}", template);
|
||||
} catch (ResourceAlreadyExistsException e) {
|
||||
log.info("Index template {} already exists. Skipping.", template);
|
||||
} catch (
|
||||
Exception
|
||||
e) { // TimeoutException may be raised by actionGet(), but we cannot catch that one.
|
||||
// Exit condition. Re-attempt to create the index template also failed. Original exception is
|
||||
// rethrown.
|
||||
if (!this.retry_template_creation) {
|
||||
log.error(
|
||||
"Initialization of index template [{}] finally failed. The node will shut down.",
|
||||
template);
|
||||
throw e;
|
||||
}
|
||||
log.warn("Operation to create the index template [{}] timed out. Retrying...", template);
|
||||
this.retry_template_creation = false;
|
||||
this.indexUtils.sleep(PluginSettings.getBackoff(this.clusterService.getSettings()));
|
||||
this.createTemplate(template);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -29,7 +29,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.wazuh.setup.SetupPlugin;
|
||||
import com.wazuh.setup.settings.PluginSettings;
|
||||
|
||||
/**
|
||||
* Initializes the Index State Management internal index <code>.opendistro-ism-config</code>.
|
||||
@ -80,12 +80,26 @@ public class IndexStateManagement extends Index {
|
||||
.id(ALERTS_ROLLOVER_POLICY)
|
||||
.source(policyFile, MediaTypeRegistry.JSON);
|
||||
|
||||
client.index(indexRequest).actionGet(SetupPlugin.TIMEOUT);
|
||||
this.client
|
||||
.index(indexRequest)
|
||||
.actionGet(PluginSettings.getTimeout(this.clusterService.getSettings()));
|
||||
log.info("ISM policy [{}] created", policy);
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to load the ISM policy from file: {}", e.getMessage());
|
||||
} catch (ResourceAlreadyExistsException e) {
|
||||
log.error("Policy already exists, skipping creation: {}", e.getMessage());
|
||||
} catch (
|
||||
Exception
|
||||
e) { // TimeoutException may be raised by actionGet(), but we cannot catch that one.
|
||||
// Exit condition. Re-attempt to create the index also failed. Original exception is rethrown.
|
||||
if (!this.retry_index_creation) {
|
||||
log.error("Initialization of policy [{}] finally failed. The node will shut down.", policy);
|
||||
throw e;
|
||||
}
|
||||
log.warn("Operation to create the policy [{}] timed out. Retrying...", policy);
|
||||
this.retry_index_creation = false;
|
||||
this.indexUtils.sleep(PluginSettings.getBackoff(this.clusterService.getSettings()));
|
||||
this.indexPolicy(policy);
|
||||
}
|
||||
}
|
||||
|
||||
@ -108,7 +122,11 @@ public class IndexStateManagement extends Index {
|
||||
.mapping(this.indexUtils.get(templateFile, "mappings"))
|
||||
.settings(this.indexUtils.get(templateFile, "settings"));
|
||||
CreateIndexResponse createIndexResponse =
|
||||
this.client.admin().indices().create(request).actionGet(SetupPlugin.TIMEOUT);
|
||||
this.client
|
||||
.admin()
|
||||
.indices()
|
||||
.create(request)
|
||||
.actionGet(PluginSettings.getTimeout(this.clusterService.getSettings()));
|
||||
log.info(
|
||||
"Index created successfully: {} {}",
|
||||
createIndexResponse.index(),
|
||||
@ -118,6 +136,18 @@ public class IndexStateManagement extends Index {
|
||||
log.error("Error reading index template from filesystem {}", this.template);
|
||||
} catch (ResourceAlreadyExistsException e) {
|
||||
log.info("Index {} already exists. Skipping.", index);
|
||||
} catch (
|
||||
Exception
|
||||
e) { // TimeoutException may be raised by actionGet(), but we cannot catch that one.
|
||||
// Exit condition. Re-attempt to create the index also failed. Original exception is rethrown.
|
||||
if (!this.retry_index_creation) {
|
||||
log.error("Initialization of index [{}] finally failed. The node will shut down.", index);
|
||||
throw e;
|
||||
}
|
||||
log.warn("Operation to create the index [{}] timed out. Retrying...", index);
|
||||
this.retry_index_creation = false;
|
||||
this.indexUtils.sleep(PluginSettings.getBackoff(this.clusterService.getSettings()));
|
||||
this.createIndex(index);
|
||||
}
|
||||
}
|
||||
|
||||
@ -125,6 +155,7 @@ public class IndexStateManagement extends Index {
|
||||
@Override
|
||||
public void initialize() {
|
||||
this.createIndex(this.index);
|
||||
this.retry_index_creation = true; // Re-used variable to retry initialization of ISM policies.
|
||||
this.createPolicies();
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,7 +23,7 @@ import org.opensearch.action.admin.indices.alias.Alias;
|
||||
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
|
||||
|
||||
import com.wazuh.setup.SetupPlugin;
|
||||
import com.wazuh.setup.settings.PluginSettings;
|
||||
|
||||
/**
|
||||
* Class to represent a Stream index. Stream indices contain time-based events of any kind (alerts,
|
||||
@ -60,7 +60,11 @@ public class StreamIndex extends WazuhIndex {
|
||||
CreateIndexRequest request =
|
||||
new CreateIndexRequest(index).alias(new Alias(this.alias).writeIndex(true));
|
||||
CreateIndexResponse createIndexResponse =
|
||||
this.client.admin().indices().create(request).actionGet(SetupPlugin.TIMEOUT);
|
||||
this.client
|
||||
.admin()
|
||||
.indices()
|
||||
.create(request)
|
||||
.actionGet(PluginSettings.getTimeout(this.clusterService.getSettings()));
|
||||
log.info(
|
||||
"Index created successfully: {} {}",
|
||||
createIndexResponse.index(),
|
||||
@ -68,6 +72,18 @@ public class StreamIndex extends WazuhIndex {
|
||||
}
|
||||
} catch (ResourceAlreadyExistsException e) {
|
||||
log.info("Index {} already exists. Skipping.", index);
|
||||
} catch (
|
||||
Exception
|
||||
e) { // TimeoutException may be raised by actionGet(), but we cannot catch that one.
|
||||
// Exit condition. Re-attempt to create the index also failed. Original exception is rethrown.
|
||||
if (!this.retry_index_creation) {
|
||||
log.error("Initialization of index [{}] finally failed. The node will shut down.", index);
|
||||
throw e;
|
||||
}
|
||||
log.warn("Operation to create the index [{}] timed out. Retrying...", index);
|
||||
this.retry_index_creation = false;
|
||||
this.indexUtils.sleep(PluginSettings.getBackoff(this.clusterService.getSettings()));
|
||||
this.createIndex(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Copyright (C) 2024, Wazuh Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package com.wazuh.setup.settings;
|
||||
|
||||
import org.opensearch.common.settings.Setting;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.common.unit.TimeValue;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/** Settings class for this plugin. */
|
||||
public class PluginSettings {
|
||||
|
||||
/** Default constructor. */
|
||||
public PluginSettings() {}
|
||||
|
||||
/** Default timeout in seconds for operations involving the OpenSearch client. */
|
||||
public static final int DEFAULT_TIMEOUT = 30;
|
||||
|
||||
/**
|
||||
* Default backoff (delay) time in seconds for the retry mechanism involving initialization tasks.
|
||||
*/
|
||||
public static final int DEFAULT_BACKOFF = 15;
|
||||
|
||||
/** Timeout setting definition. */
|
||||
public static final Setting<Integer> TIMEOUT =
|
||||
Setting.intSetting(
|
||||
"plugins.setup.timeout", DEFAULT_TIMEOUT, 5, 120, Setting.Property.NodeScope);
|
||||
|
||||
/** Backoff setting definition. */
|
||||
public static final Setting<Integer> BACKOFF =
|
||||
Setting.intSetting(
|
||||
"plugins.setup.backoff", DEFAULT_BACKOFF, 5, 60, Setting.Property.NodeScope);
|
||||
|
||||
/**
|
||||
* {@link PluginSettings#TIMEOUT} getter.
|
||||
*
|
||||
* @param settings settings of this node.
|
||||
* @return returns the value for the {@link PluginSettings#TIMEOUT} in millis.
|
||||
*/
|
||||
public static long getTimeout(Settings settings) {
|
||||
return new TimeValue(TIMEOUT.get(settings), TimeUnit.SECONDS).millis();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link PluginSettings#BACKOFF} getter.
|
||||
*
|
||||
* @param settings settings of this node.
|
||||
* @return returns the value for the {@link PluginSettings#BACKOFF} in millis.
|
||||
*/
|
||||
public static long getBackoff(Settings settings) {
|
||||
return new TimeValue(BACKOFF.get(settings), TimeUnit.SECONDS).millis();
|
||||
}
|
||||
}
|
||||
@ -76,4 +76,17 @@ public class IndexUtils {
|
||||
public Map<String, Object> get(Map<String, Object> map, String key) {
|
||||
return (Map<String, Object>) map.get(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to wrap up the call to {@link Thread#sleep(long)} on a try-catch block.
|
||||
*
|
||||
* @param millis sleep interval in milliseconds.
|
||||
*/
|
||||
public void sleep(long millis) {
|
||||
try {
|
||||
Thread.sleep(millis);
|
||||
} catch (InterruptedException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,6 +20,7 @@ import org.opensearch.ResourceAlreadyExistsException;
|
||||
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.opensearch.action.index.IndexRequest;
|
||||
import org.opensearch.cluster.service.ClusterService;
|
||||
import org.opensearch.common.action.ActionFuture;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.test.OpenSearchTestCase;
|
||||
@ -31,7 +32,6 @@ import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import com.wazuh.setup.SetupPlugin;
|
||||
import com.wazuh.setup.utils.IndexUtils;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
@ -48,17 +48,23 @@ public class IndexStateManagementTests extends OpenSearchTestCase {
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
client = mock(Client.class);
|
||||
this.client = mock(Client.class);
|
||||
AdminClient adminClient = mock(AdminClient.class);
|
||||
indicesAdminClient = mock(IndicesAdminClient.class);
|
||||
indexUtils = mock(IndexUtils.class);
|
||||
this.indicesAdminClient = mock(IndicesAdminClient.class);
|
||||
this.indexUtils = mock(IndexUtils.class);
|
||||
|
||||
doReturn(adminClient).when(client).admin();
|
||||
doReturn(indicesAdminClient).when(adminClient).indices();
|
||||
// Default settings
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
Settings settings = Settings.builder().build();
|
||||
doReturn(settings).when(clusterService).getSettings();
|
||||
|
||||
ismIndex = spy(new IndexStateManagement(".opendistro-ism-config", "ism-template"));
|
||||
ismIndex.setClient(client);
|
||||
ismIndex.setIndexUtils(indexUtils);
|
||||
doReturn(adminClient).when(this.client).admin();
|
||||
doReturn(this.indicesAdminClient).when(adminClient).indices();
|
||||
|
||||
this.ismIndex = spy(new IndexStateManagement(".opendistro-ism-config", "ism-template"));
|
||||
this.ismIndex.setClient(this.client);
|
||||
this.ismIndex.setIndexUtils(this.indexUtils);
|
||||
this.ismIndex.setClusterService(clusterService);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -72,28 +78,28 @@ public class IndexStateManagementTests extends OpenSearchTestCase {
|
||||
template.put("settings", Settings.builder().build());
|
||||
template.put("mappings", Map.of());
|
||||
|
||||
doReturn(false).when(ismIndex).indexExists(".opendistro-ism-config");
|
||||
doReturn(template).when(indexUtils).fromFile("ism-template.json");
|
||||
doReturn(template.get("mappings")).when(indexUtils).get(template, "mappings");
|
||||
doReturn(false).when(this.ismIndex).indexExists(".opendistro-ism-config");
|
||||
doReturn(template).when(this.indexUtils).fromFile("ism-template.json");
|
||||
doReturn(template.get("mappings")).when(this.indexUtils).get(template, "mappings");
|
||||
|
||||
CreateIndexResponse createResponse = mock(CreateIndexResponse.class);
|
||||
doReturn(".opendistro-ism-config").when(createResponse).index();
|
||||
|
||||
ActionFuture actionFuture = mock(ActionFuture.class);
|
||||
|
||||
doReturn(actionFuture).when(indicesAdminClient).create(any(CreateIndexRequest.class));
|
||||
doReturn(actionFuture).when(this.indicesAdminClient).create(any(CreateIndexRequest.class));
|
||||
|
||||
Map<String, Object> policyFile = Map.of("policy", "definition");
|
||||
doReturn(policyFile).when(indexUtils).fromFile("wazuh-alerts-rollover-policy.json");
|
||||
doReturn(policyFile).when(this.indexUtils).fromFile("wazuh-alerts-rollover-policy.json");
|
||||
|
||||
doReturn(actionFuture).when(client).index(any(IndexRequest.class));
|
||||
doReturn(actionFuture).when(this.client).index(any(IndexRequest.class));
|
||||
|
||||
doReturn(createResponse).when(actionFuture).actionGet(SetupPlugin.TIMEOUT);
|
||||
doReturn(createResponse).when(actionFuture).actionGet(anyLong());
|
||||
|
||||
ismIndex.initialize();
|
||||
this.ismIndex.initialize();
|
||||
|
||||
verify(indicesAdminClient).create(any(CreateIndexRequest.class));
|
||||
verify(client).index(any(IndexRequest.class));
|
||||
verify(this.indicesAdminClient).create(any(CreateIndexRequest.class));
|
||||
verify(this.client).index(any(IndexRequest.class));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -101,13 +107,13 @@ public class IndexStateManagementTests extends OpenSearchTestCase {
|
||||
* index creation.
|
||||
*/
|
||||
public void testIndexAlreadyExists_SkipsCreation() {
|
||||
doReturn(true).when(ismIndex).indexExists(".opendistro-ism-config");
|
||||
doReturn(true).when(this.ismIndex).indexExists(".opendistro-ism-config");
|
||||
|
||||
doReturn(mock(ActionFuture.class)).when(client).index(any(IndexRequest.class));
|
||||
doReturn(mock(ActionFuture.class)).when(this.client).index(any(IndexRequest.class));
|
||||
|
||||
ismIndex.initialize();
|
||||
this.ismIndex.initialize();
|
||||
|
||||
verify(indicesAdminClient, never()).create(any());
|
||||
verify(this.indicesAdminClient, never()).create(any());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -117,12 +123,12 @@ public class IndexStateManagementTests extends OpenSearchTestCase {
|
||||
* @throws IOException if there is an error reading the policy file
|
||||
*/
|
||||
public void testPolicyFileMissing_LogsError() throws IOException {
|
||||
doReturn(true).when(ismIndex).indexExists(".opendistro-ism-config");
|
||||
doReturn(true).when(this.ismIndex).indexExists(".opendistro-ism-config");
|
||||
doThrow(new IOException("file not found"))
|
||||
.when(indexUtils)
|
||||
.fromFile("wazuh-alerts-rollover-policy.json");
|
||||
|
||||
ismIndex.initialize();
|
||||
this.ismIndex.initialize();
|
||||
|
||||
// Verifies that exception is caught and logged
|
||||
}
|
||||
@ -135,15 +141,15 @@ public class IndexStateManagementTests extends OpenSearchTestCase {
|
||||
* @throws IOException if there is an error reading the policy file
|
||||
*/
|
||||
public void testPolicyAlreadyExists_LogsInfo() throws IOException {
|
||||
doReturn(true).when(ismIndex).indexExists(".opendistro-ism-config");
|
||||
doReturn(true).when(this.ismIndex).indexExists(".opendistro-ism-config");
|
||||
|
||||
Map<String, Object> policyFile = Map.of("policy", "definition");
|
||||
doReturn(policyFile).when(indexUtils).fromFile("wazuh-alerts-rollover-policy.json");
|
||||
doThrow(new ResourceAlreadyExistsException("already exists"))
|
||||
.when(client)
|
||||
.when(this.client)
|
||||
.index(any(IndexRequest.class));
|
||||
|
||||
ismIndex.initialize();
|
||||
this.ismIndex.initialize();
|
||||
|
||||
// Verifies that exception is caught and logged
|
||||
}
|
||||
|
||||
@ -16,7 +16,6 @@
|
||||
*/
|
||||
package com.wazuh.setup.index;
|
||||
|
||||
import org.opensearch.ResourceAlreadyExistsException;
|
||||
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
||||
@ -35,7 +34,6 @@ import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.wazuh.setup.SetupPlugin;
|
||||
import com.wazuh.setup.utils.IndexUtils;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
@ -54,46 +52,50 @@ public class IndexTests extends OpenSearchTestCase {
|
||||
|
||||
Client client = mock(Client.class);
|
||||
AdminClient adminClient = mock(AdminClient.class);
|
||||
indicesAdminClient = mock(IndicesAdminClient.class);
|
||||
this.indicesAdminClient = mock(IndicesAdminClient.class);
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
routingTable = mock(RoutingTable.class);
|
||||
this.routingTable = mock(RoutingTable.class);
|
||||
ClusterState clusterState = mock(ClusterState.class);
|
||||
indexUtils = mock(IndexUtils.class);
|
||||
this.indexUtils = mock(IndexUtils.class);
|
||||
|
||||
// Default settings
|
||||
Settings settings = Settings.builder().build();
|
||||
doReturn(settings).when(clusterService).getSettings();
|
||||
|
||||
// Concrete implementation of abstract class
|
||||
index = new Index("test-index", "test-template") {};
|
||||
index.setClient(client);
|
||||
index.setClusterService(clusterService);
|
||||
index.setIndexUtils(indexUtils);
|
||||
this.index = new Index("test-index", "test-template") {};
|
||||
this.index.setClient(client);
|
||||
this.index.setClusterService(clusterService);
|
||||
this.index.setIndexUtils(indexUtils);
|
||||
|
||||
doReturn(adminClient).when(client).admin();
|
||||
doReturn(indicesAdminClient).when(adminClient).indices();
|
||||
doReturn(this.indicesAdminClient).when(adminClient).indices();
|
||||
doReturn(clusterState).when(clusterService).state();
|
||||
doReturn(routingTable).when(clusterState).getRoutingTable();
|
||||
doReturn(this.routingTable).when(clusterState).getRoutingTable();
|
||||
}
|
||||
|
||||
/** Verifies that index creation is attempted when index does not exist. */
|
||||
public void testCreateIndexWhenIndexDoesNotExist() {
|
||||
doReturn(false).when(routingTable).hasIndex("test-index");
|
||||
doReturn(false).when(this.routingTable).hasIndex("test-index");
|
||||
|
||||
CreateIndexResponse response = mock(CreateIndexResponse.class);
|
||||
doReturn("test-index").when(response).index();
|
||||
ActionFuture actionFuture = mock(ActionFuture.class);
|
||||
doReturn(response).when(actionFuture).actionGet(SetupPlugin.TIMEOUT);
|
||||
doReturn(actionFuture).when(indicesAdminClient).create(any(CreateIndexRequest.class));
|
||||
doReturn(response).when(actionFuture).actionGet(anyLong());
|
||||
doReturn(actionFuture).when(this.indicesAdminClient).create(any(CreateIndexRequest.class));
|
||||
|
||||
index.createIndex("test-index");
|
||||
this.index.createIndex("test-index");
|
||||
|
||||
verify(indicesAdminClient).create(any(CreateIndexRequest.class));
|
||||
verify(this.indicesAdminClient).create(any(CreateIndexRequest.class));
|
||||
}
|
||||
|
||||
/** Verifies that index creation is skipped when index already exists. */
|
||||
public void testCreateIndexWhenAlreadyExists() {
|
||||
doReturn(true).when(routingTable).hasIndex("test-index");
|
||||
doReturn(true).when(this.routingTable).hasIndex("test-index");
|
||||
|
||||
index.createIndex("test-index");
|
||||
this.index.createIndex("test-index");
|
||||
|
||||
verify(indicesAdminClient, never()).create(any());
|
||||
verify(this.indicesAdminClient, never()).create(any());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -108,16 +110,18 @@ public class IndexTests extends OpenSearchTestCase {
|
||||
"mappings", Map.of(),
|
||||
"index_patterns", List.of("test-*"));
|
||||
|
||||
doReturn(templateMap).when(indexUtils).fromFile("test-template.json");
|
||||
doReturn(templateMap.get("mappings")).when(indexUtils).get(templateMap, "mappings");
|
||||
doReturn(templateMap).when(this.indexUtils).fromFile("test-template.json");
|
||||
doReturn(templateMap.get("mappings")).when(this.indexUtils).get(templateMap, "mappings");
|
||||
|
||||
AcknowledgedResponse ackResponse = mock(AcknowledgedResponse.class);
|
||||
ActionFuture actionFuture = mock(ActionFuture.class);
|
||||
doReturn(ackResponse).when(actionFuture).actionGet(SetupPlugin.TIMEOUT);
|
||||
doReturn(actionFuture).when(indicesAdminClient).putTemplate(any(PutIndexTemplateRequest.class));
|
||||
index.createTemplate("test-template");
|
||||
doReturn(ackResponse).when(actionFuture).actionGet(anyLong());
|
||||
doReturn(actionFuture)
|
||||
.when(this.indicesAdminClient)
|
||||
.putTemplate(any(PutIndexTemplateRequest.class));
|
||||
this.index.createTemplate("test-template");
|
||||
|
||||
verify(indicesAdminClient).putTemplate(any(PutIndexTemplateRequest.class));
|
||||
verify(this.indicesAdminClient).putTemplate(any(PutIndexTemplateRequest.class));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -126,40 +130,16 @@ public class IndexTests extends OpenSearchTestCase {
|
||||
* @throws IOException if there is an error reading the template file
|
||||
*/
|
||||
public void testCreateTemplateIOException() throws IOException {
|
||||
doThrow(new IOException("test")).when(indexUtils).fromFile("test-template.json");
|
||||
doThrow(new IOException("test")).when(this.indexUtils).fromFile("test-template.json");
|
||||
|
||||
index.createTemplate("test-template");
|
||||
this.index.createTemplate("test-template");
|
||||
|
||||
// Expect error to be logged but not thrown
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that ResourceAlreadyExistsException while creating template is caught and logged.
|
||||
*
|
||||
* @throws IOException if there is an error reading the template file
|
||||
*/
|
||||
public void testCreateTemplateAlreadyExists() throws IOException {
|
||||
Map<String, Object> templateMap =
|
||||
Map.of(
|
||||
"settings", Settings.builder().build(),
|
||||
"mappings", Map.of(),
|
||||
"index_patterns", List.of("test-*"));
|
||||
|
||||
doReturn(templateMap).when(indexUtils).fromFile("test-template.json");
|
||||
doReturn(templateMap.get("mappings")).when(indexUtils).get(templateMap, "mappings");
|
||||
|
||||
doThrow(new ResourceAlreadyExistsException("already exists"))
|
||||
.when(indicesAdminClient)
|
||||
.putTemplate(any(PutIndexTemplateRequest.class));
|
||||
|
||||
index.createTemplate("test-template");
|
||||
|
||||
// Expect log statement but not exception
|
||||
}
|
||||
|
||||
/** Verifies that initialize() invokes both createTemplate and createIndex in order. */
|
||||
public void testInitializeInvokesTemplateAndIndex() {
|
||||
Index spyIndex = spy(index);
|
||||
Index spyIndex = spy(this.index);
|
||||
|
||||
doNothing().when(spyIndex).createTemplate("test-template");
|
||||
doNothing().when(spyIndex).createIndex("test-index");
|
||||
@ -172,10 +152,10 @@ public class IndexTests extends OpenSearchTestCase {
|
||||
|
||||
/** Verifies indexExists() returns true/false depending on cluster state. */
|
||||
public void testIndexExists() {
|
||||
doReturn(true).when(routingTable).hasIndex("test-index");
|
||||
assertTrue(index.indexExists("test-index"));
|
||||
doReturn(true).when(this.routingTable).hasIndex("test-index");
|
||||
assertTrue(this.index.indexExists("test-index"));
|
||||
|
||||
doReturn(false).when(routingTable).hasIndex("test-index");
|
||||
assertFalse(index.indexExists("test-index"));
|
||||
doReturn(false).when(this.routingTable).hasIndex("test-index");
|
||||
assertFalse(this.index.indexExists("test-index"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -16,7 +16,6 @@
|
||||
*/
|
||||
package com.wazuh.setup.index;
|
||||
|
||||
import org.opensearch.ResourceAlreadyExistsException;
|
||||
import org.opensearch.action.admin.indices.alias.Alias;
|
||||
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
|
||||
@ -24,12 +23,13 @@ import org.opensearch.cluster.ClusterState;
|
||||
import org.opensearch.cluster.routing.RoutingTable;
|
||||
import org.opensearch.cluster.service.ClusterService;
|
||||
import org.opensearch.common.action.ActionFuture;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.test.OpenSearchTestCase;
|
||||
import org.opensearch.transport.client.AdminClient;
|
||||
import org.opensearch.transport.client.Client;
|
||||
import org.opensearch.transport.client.IndicesAdminClient;
|
||||
|
||||
import com.wazuh.setup.SetupPlugin;
|
||||
import com.wazuh.setup.utils.IndexUtils;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.*;
|
||||
@ -47,19 +47,24 @@ public class StreamIndexTests extends OpenSearchTestCase {
|
||||
|
||||
Client client = mock(Client.class);
|
||||
AdminClient adminClient = mock(AdminClient.class);
|
||||
indicesAdminClient = mock(IndicesAdminClient.class);
|
||||
this.indicesAdminClient = mock(IndicesAdminClient.class);
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
routingTable = mock(RoutingTable.class);
|
||||
this.routingTable = mock(RoutingTable.class);
|
||||
ClusterState clusterState = mock(ClusterState.class);
|
||||
|
||||
streamIndex = new StreamIndex("stream-index", "stream-template", "stream-alias");
|
||||
streamIndex.setClient(client);
|
||||
streamIndex.setClusterService(clusterService);
|
||||
// Default settings
|
||||
Settings settings = Settings.builder().build();
|
||||
doReturn(settings).when(clusterService).getSettings();
|
||||
|
||||
this.streamIndex = new StreamIndex("stream-index", "stream-template", "stream-alias");
|
||||
this.streamIndex.setClient(client);
|
||||
this.streamIndex.setClusterService(clusterService);
|
||||
this.streamIndex.setIndexUtils(mock(IndexUtils.class));
|
||||
|
||||
doReturn(adminClient).when(client).admin();
|
||||
doReturn(indicesAdminClient).when(adminClient).indices();
|
||||
doReturn(this.indicesAdminClient).when(adminClient).indices();
|
||||
doReturn(clusterState).when(clusterService).state();
|
||||
doReturn(routingTable).when(clusterState).getRoutingTable();
|
||||
doReturn(this.routingTable).when(clusterState).getRoutingTable();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -67,17 +72,17 @@ public class StreamIndexTests extends OpenSearchTestCase {
|
||||
* exist.
|
||||
*/
|
||||
public void testCreateIndexWithAlias() {
|
||||
doReturn(false).when(routingTable).hasIndex("stream-index");
|
||||
doReturn(false).when(this.routingTable).hasIndex("stream-index");
|
||||
|
||||
CreateIndexResponse response = mock(CreateIndexResponse.class);
|
||||
doReturn("stream-index").when(response).index();
|
||||
ActionFuture actionFuture = mock(ActionFuture.class);
|
||||
doReturn(response).when(actionFuture).actionGet(SetupPlugin.TIMEOUT);
|
||||
doReturn(actionFuture).when(indicesAdminClient).create(any(CreateIndexRequest.class));
|
||||
doReturn(response).when(actionFuture).actionGet(anyLong());
|
||||
doReturn(actionFuture).when(this.indicesAdminClient).create(any(CreateIndexRequest.class));
|
||||
|
||||
streamIndex.createIndex("stream-index");
|
||||
this.streamIndex.createIndex("stream-index");
|
||||
|
||||
verify(indicesAdminClient)
|
||||
verify(this.indicesAdminClient)
|
||||
.create(
|
||||
argThat(
|
||||
req -> {
|
||||
@ -91,24 +96,10 @@ public class StreamIndexTests extends OpenSearchTestCase {
|
||||
|
||||
/** Verifies that createIndex skips index creation if the index already exists. */
|
||||
public void testCreateIndexWhenAlreadyExists() {
|
||||
doReturn(true).when(routingTable).hasIndex("stream-index");
|
||||
doReturn(true).when(this.routingTable).hasIndex("stream-index");
|
||||
|
||||
streamIndex.createIndex("stream-index");
|
||||
this.streamIndex.createIndex("stream-index");
|
||||
|
||||
verify(indicesAdminClient, never()).create(any());
|
||||
}
|
||||
|
||||
/** Verifies that createIndex handles ResourceAlreadyExistsException gracefully. */
|
||||
public void testCreateIndexAlreadyExistsException() {
|
||||
doReturn(false).when(routingTable).hasIndex("stream-index");
|
||||
|
||||
doThrow(new ResourceAlreadyExistsException("already exists"))
|
||||
.when(indicesAdminClient)
|
||||
.create(any(CreateIndexRequest.class));
|
||||
|
||||
streamIndex.createIndex("stream-index");
|
||||
|
||||
// We expect no exception thrown
|
||||
verify(indicesAdminClient).create(any(CreateIndexRequest.class));
|
||||
verify(this.indicesAdminClient, never()).create(any());
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user